You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2014/12/19 13:12:44 UTC
[1/8] incubator-ignite git commit: ignite-gg9499 -
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-gg9499 [created] 67df1ce35
ignite-gg9499 -
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4da1d1a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4da1d1a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4da1d1a4
Branch: refs/heads/ignite-gg9499
Commit: 4da1d1a496c3eb8e0e73a397fcd02322f3508416
Parents: c67dcde
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 12 16:34:17 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 12 16:34:17 2014 +0300
----------------------------------------------------------------------
.../query/h2/twostep/GridMergeIndex.java | 162 +++++++++++++++++++
.../query/h2/twostep/GridMergeTable.java | 145 +++++++++++++++++
.../query/h2/twostep/GridNextPageRequest.java | 54 +++++++
.../query/h2/twostep/GridNextPageResponse.java | 47 ++++++
.../query/h2/twostep/GridQueryAck.java | 42 +++++
.../query/h2/twostep/GridQueryRequest.java | 50 ++++++
6 files changed, 500 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4da1d1a4/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
new file mode 100644
index 0000000..163256b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
@@ -0,0 +1,162 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.gridgain.grid.*;
+import org.h2.engine.*;
+import org.h2.index.*;
+import org.h2.message.*;
+import org.h2.result.*;
+import org.h2.table.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Merge index.
+ */
+public class GridMergeIndex extends BaseIndex {
+ /** */
+ private static final int MAX_CAPACITY = 100_000;
+
+ /** */
+ private static final Collection<Row> END = new ArrayList<>(0);
+
+ /** */
+ private Collection<Collection<Row>> fetchedRows = new LinkedBlockingQueue<>();
+
+ /** */
+ private BlockingQueue<Collection<Row>> cursorRows = new LinkedBlockingQueue<>();
+
+ /** */
+ private int fetchedCnt;
+
+ /** */
+ private final AtomicInteger cnt = new AtomicInteger(0);
+
+ /** {@inheritDoc} */
+ @Override public long getRowCount(Session session) {
+ return cnt.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getRowCountApproximation() {
+ return getRowCount(null);
+ }
+
+ /**
+ * @param cnt Count.
+ */
+ public void addCount(int cnt) {
+ this.cnt.addAndGet(cnt);
+ }
+
+ /**
+ * @param rows0 Rows.
+ */
+ public void addRows(Collection<Row> rows0) {
+ assert !rows0.isEmpty();
+
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkRename() {
+ throw DbException.getUnsupportedException("rename");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close(Session session) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void add(Session session, Row row) {
+ throw DbException.getUnsupportedException("add");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(Session session, Row row) {
+ throw DbException.getUnsupportedException("remove row");
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
+ if (fetchedRows == null)
+ throw new GridRuntimeException("Rows were dropped out of result set.");
+
+ return new Cursor0();
+ }
+
+ /** {@inheritDoc} */
+ @Override public double getCost(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) {
+ return getRowCountApproximation() + Constants.COST_ROW_OFFSET;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(Session session) {
+ throw DbException.getUnsupportedException("remove index");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void truncate(Session session) {
+ throw DbException.getUnsupportedException("truncate");
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canGetFirstOrLast() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor findFirstOrLast(Session session, boolean first) {
+ throw DbException.getUnsupportedException("findFirstOrLast");
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean needRebuild() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getDiskSpaceUsed() {
+ return 0;
+ }
+
+ private class Cursor0 implements Cursor {
+ /** */
+ private Row cur;
+
+ /** */
+ private Iterator<Row> curIter;
+
+ /** {@inheritDoc} */
+ @Override public Row get() {
+ return cur;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SearchRow getSearchRow() {
+ return get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean next() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean previous() {
+ throw DbException.getUnsupportedException("previous");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4da1d1a4/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
new file mode 100644
index 0000000..3f059fa
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
@@ -0,0 +1,145 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.h2.command.ddl.*;
+import org.h2.engine.*;
+import org.h2.index.*;
+import org.h2.message.*;
+import org.h2.result.*;
+import org.h2.table.*;
+
+import java.util.*;
+
+/**
+ * Merge table for distributed queries.
+ */
+public class GridMergeTable extends TableBase {
+ /** */
+ private final ArrayList<Index> idxs = new ArrayList<>(1);
+
+ /** */
+ private final GridMergeIndex idx = new GridMergeIndex();
+
+ /**
+ * @param data Data.
+ */
+ public GridMergeTable(CreateTableData data) {
+ super(data);
+
+ idxs.add(idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void lock(Session session, boolean exclusive, boolean force) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close(Session ses) {
+ idx.close(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unlock(Session s) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Index addIndex(Session session, String indexName, int indexId, IndexColumn[] cols,
+ IndexType indexType, boolean create, String indexComment) {
+ throw DbException.getUnsupportedException("addIndex");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeRow(Session session, Row row) {
+ throw DbException.getUnsupportedException("removeRow");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void truncate(Session session) {
+ throw DbException.getUnsupportedException("truncate");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addRow(Session session, Row row) {
+ throw DbException.getUnsupportedException("addRow");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkSupportAlter() {
+ throw DbException.getUnsupportedException("alter");
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getTableType() {
+ return EXTERNAL_TABLE_ENGINE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridMergeIndex getScanIndex(Session session) {
+ return idx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Index getUniqueIndex() {
+ return null; // We don't have a PK.
+ }
+
+ /** {@inheritDoc} */
+ @Override public ArrayList<Index> getIndexes() {
+ return idxs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isLockedExclusively() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMaxDataModificationId() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDeterministic() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canGetRowCount() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canDrop() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getRowCount(Session ses) {
+ return idx.getRowCount(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getRowCountApproximation() {
+ return idx.getRowCountApproximation();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getDiskSpaceUsed() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkRename() {
+ throw DbException.getUnsupportedException("rename");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4da1d1a4/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
new file mode 100644
index 0000000..d550b3b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
@@ -0,0 +1,54 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.gridgain.grid.util.direct.*;
+
+import java.nio.*;
+
+/**
+ * Request to fetch next page.
+ */
+public class GridNextPageRequest extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private long reqId;
+
+ /** */
+ private long qryId;
+
+ /** */
+ private int qry;
+
+ /** */
+ private int offset;
+
+ /** */
+ private int pageSize;
+
+ @Override public boolean writeTo(ByteBuffer buf) {
+ return false;
+ }
+
+ @Override public boolean readFrom(ByteBuffer buf) {
+ return false;
+ }
+
+ @Override public byte directType() {
+ return 0;
+ }
+
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ return null;
+ }
+
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4da1d1a4/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
new file mode 100644
index 0000000..d77215c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
@@ -0,0 +1,47 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.gridgain.grid.util.direct.*;
+import org.h2.value.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * TODO write doc
+ */
+public class GridNextPageResponse extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private long reqId;
+
+ /** */
+ private Collection<Value[]> rows;
+
+ @Override public boolean writeTo(ByteBuffer buf) {
+ return false;
+ }
+
+ @Override public boolean readFrom(ByteBuffer buf) {
+ return false;
+ }
+
+ @Override public byte directType() {
+ return 0;
+ }
+
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ return null;
+ }
+
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4da1d1a4/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
new file mode 100644
index 0000000..10e30ee
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
@@ -0,0 +1,42 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.gridgain.grid.util.direct.*;
+
+import java.nio.*;
+
+/**
+ * TODO write doc
+ */
+public class GridQueryAck extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private long reqId;
+
+ @Override public boolean writeTo(ByteBuffer buf) {
+ return false;
+ }
+
+ @Override public boolean readFrom(ByteBuffer buf) {
+ return false;
+ }
+
+ @Override public byte directType() {
+ return 0;
+ }
+
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ return null;
+ }
+
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4da1d1a4/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
new file mode 100644
index 0000000..7a664fd
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
@@ -0,0 +1,50 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.gridgain.grid.util.direct.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * TODO write doc
+ */
+public class GridQueryRequest extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private long reqId;
+
+ /** */
+ private List<String> sqlQrys;
+
+ /** */
+ private List<Collection<Object>> params;
+
+
+ @Override public boolean writeTo(ByteBuffer buf) {
+ return false;
+ }
+
+ @Override public boolean readFrom(ByteBuffer buf) {
+ return false;
+ }
+
+ @Override public byte directType() {
+ return 0;
+ }
+
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ return null;
+ }
+
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+
+ }
+}
[5/8] incubator-ignite git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-gg9499
Posted by se...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-gg9499
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/29823e67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/29823e67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/29823e67
Branch: refs/heads/ignite-gg9499
Commit: 29823e67eb6d187897219e6fe859e9cffd24658c
Parents: 3e62b4e 3032bee
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 19 09:00:51 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 19 09:00:51 2014 +0300
----------------------------------------------------------------------
.../optimized-classnames.previous.properties | 30621 -----------------
.../optimized/optimized-classnames.properties | 30621 -----------------
.../dht/atomic/GridDhtAtomicCache.java | 2 +-
.../gridgain/grid/kernal/visor/VisorJob.java | 6 +-
.../grid/kernal/visor/cache/VisorCache.java | 4 +-
.../cache/VisorCacheAggregatedMetrics.java | 417 +-
.../kernal/visor/cache/VisorCacheMetrics.java | 176 +-
.../kernal/visor/cache/VisorCacheMetrics2.java | 218 -
.../cache/VisorCacheMetricsCollectorTask.java | 104 +-
.../cache/VisorCacheQueryAggregatedMetrics.java | 129 -
.../visor/cache/VisorCacheQueryMetrics.java | 44 +-
.../visor/node/VisorGridConfiguration.java | 2 +-
.../VisorNodeConfigurationCollectorJob.java | 2 +-
.../grid/kernal/visor/util/VisorTaskUtils.java | 42 +
.../core/src/main/resources/gridgain.properties | 2 +-
.../commands/cache/VisorCacheCommand.scala | 127 +-
.../visor/commands/vvm/VisorVvmCommand.scala | 9 +-
.../gridgain/visor/VisorRuntimeBaseSpec.scala | 1 -
.../org/gridgain/visor/VisorTextTableSpec.scala | 3 +-
.../visor/commands/VisorArgListSpec.scala | 3 +-
.../commands/VisorFileNameCompleterSpec.scala | 4 +-
.../commands/alert/VisorAlertCommandSpec.scala | 5 +-
.../cache/VisorCacheClearCommandSpec.scala | 9 +-
.../commands/cache/VisorCacheCommandSpec.scala | 6 +-
.../cache/VisorCacheCompactCommandSpec.scala | 9 +-
.../config/VisorConfigurationCommandSpec.scala | 6 +-
.../cswap/VisorCacheSwapCommandSpec.scala | 6 +-
.../deploy/VisorDeployCommandSpec.scala | 3 +-
.../disco/VisorDiscoveryCommandSpec.scala | 2 +-
.../events/VisorEventsCommandSpec.scala | 3 +-
.../visor/commands/gc/VisorGcCommandSpec.scala | 3 +-
.../commands/help/VisorHelpCommandSpec.scala | 3 +-
.../commands/kill/VisorKillCommandSpec.scala | 3 +-
.../commands/log/VisorLogCommandSpec.scala | 3 +-
.../commands/mem/VisorMemoryCommandSpec.scala | 3 +-
.../commands/open/VisorOpenCommandSpec.scala | 1 +
.../commands/start/VisorStartCommandSpec.scala | 3 +-
.../commands/tasks/VisorTasksCommandSpec.scala | 11 +-
.../commands/vvm/VisorVvmCommandSpec.scala | 3 +-
.../testsuites/VisorConsoleSelfTestSuite.scala | 13 +-
pom.xml | 7 +-
41 files changed, 521 insertions(+), 62118 deletions(-)
----------------------------------------------------------------------
[8/8] incubator-ignite git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-gg9499
Posted by se...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-gg9499
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/67df1ce3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/67df1ce3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/67df1ce3
Branch: refs/heads/ignite-gg9499
Commit: 67df1ce352c28c0704e3309bb0267d2e0259f9c4
Parents: 50e88a6 47cf475
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 19 15:11:32 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 19 15:11:32 2014 +0300
----------------------------------------------------------------------
ipc/shmem/ggshmem/Makefile.am | 2 +-
ipc/shmem/ggshmem/Makefile.in | 2 +-
.../java/META-INF/native/linux32/libggshmem.so | Bin 136714 -> 0 bytes
.../java/META-INF/native/linux64/libggshmem.so | Bin 145904 -> 161925 bytes
.../java/META-INF/native/osx/libggshmem.dylib | Bin 29312 -> 32940 bytes
5 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[4/8] incubator-ignite git commit: ignite-gg9499 -
Posted by se...@apache.org.
ignite-gg9499 -
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3e62b4eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3e62b4eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3e62b4eb
Branch: refs/heads/ignite-gg9499
Commit: 3e62b4eb7be6b91101cb6c09ea10fd5c5c250600
Parents: 193d9b3
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 19 09:00:29 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 19 09:00:29 2014 +0300
----------------------------------------------------------------------
.../testsuites/GridUtilSelfTestSuite.java | 1 -
.../cache/GridCacheCrossCacheQuerySelfTest.java | 23 ++++++++++++--------
2 files changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3e62b4eb/modules/core/src/test/java/org/gridgain/testsuites/GridUtilSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testsuites/GridUtilSelfTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/GridUtilSelfTestSuite.java
index 5c05824..b3eabd4 100644
--- a/modules/core/src/test/java/org/gridgain/testsuites/GridUtilSelfTestSuite.java
+++ b/modules/core/src/test/java/org/gridgain/testsuites/GridUtilSelfTestSuite.java
@@ -35,7 +35,6 @@ public class GridUtilSelfTestSuite extends TestSuite {
suite.addTestSuite(GridThreadPoolExecutorServiceSelfTest.class);
suite.addTestSuite(GridUtilsSelfTest.class);
suite.addTestSuite(GridSpinReadWriteLockSelfTest.class);
- suite.addTestSuite(GridQueueSelfTest.class);
suite.addTestSuite(GridStringBuilderFactorySelfTest.class);
suite.addTestSuite(GridToStringBuilderSelfTest.class);
suite.addTestSuite(GridByteArrayListSelfTest.class);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3e62b4eb/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 0720c91..bc9dcf8 100644
--- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -103,14 +103,9 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
}
/**
- * Fills the caches with data and executes the query.
- *
- * @param prj Cache projection.
- * @throws Exception If failed.
- * @return Result.
+ * @throws IgniteCheckedException If failed.
*/
- private List<Map.Entry<Integer, FactPurchase>> body(GridCacheProjection<Integer, FactPurchase> prj)
- throws Exception {
+ private void fillCaches() throws IgniteCheckedException {
int idGen = 0;
GridCache<Integer, Object> dimCache = ignite.cache("replicated");
@@ -155,9 +150,19 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
factCache.put(id, new FactPurchase(id, prod.getId(), store.getId()));
}
+ }
- GridCacheQuery<Map.Entry<Integer, FactPurchase>> qry = (prj == null ? factCache : prj).queries().createSqlQuery(
- FactPurchase.class,
+ /**
+ * Fills the caches with data and executes the query.
+ *
+ * @param prj Cache projection.
+ * @throws Exception If failed.
+ * @return Result.
+ */
+ private List<Map.Entry<Integer, FactPurchase>> body(GridCacheProjection<Integer, FactPurchase> prj)
+ throws Exception {
+ GridCacheQuery<Map.Entry<Integer, FactPurchase>> qry = (prj == null ?
+ ignite.<Integer, FactPurchase>cache("partitioned") : prj).queries().createSqlQuery(FactPurchase.class,
"from \"replicated\".DimStore, \"partitioned\".FactPurchase where DimStore.id = FactPurchase.storeId");
List<Map.Entry<Integer, FactPurchase>> res = new ArrayList<>(qry.execute().get());
[7/8] incubator-ignite git commit: ignite-gg9499 - minor
Posted by se...@apache.org.
ignite-gg9499 - minor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/50e88a64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/50e88a64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/50e88a64
Branch: refs/heads/ignite-gg9499
Commit: 50e88a642829b69e4ab9aefec115134913c6f5d8
Parents: 1226b24
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 19 15:11:05 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 19 15:11:05 2014 +0300
----------------------------------------------------------------------
.../grid/kernal/processors/query/h2/twostep/GridMergeIndex.java | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50e88a64/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
index 5d51b32..04a4c39 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
@@ -256,8 +256,6 @@ public abstract class GridMergeIndex extends BaseIndex {
fetched.add(cur);
}
- X.println("__ row: " + Arrays.toString(cur.getValueList()));
-
return true;
}
[3/8] incubator-ignite git commit: ignite-gg9499 -
Posted by se...@apache.org.
ignite-gg9499 -
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/193d9b32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/193d9b32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/193d9b32
Branch: refs/heads/ignite-gg9499
Commit: 193d9b32183cf4b35eb0bdb026c37147942bf15e
Parents: 4da1d1a
Author: S.Vladykin <sv...@gridgain.com>
Authored: Thu Dec 18 19:29:25 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Thu Dec 18 19:29:25 2014 +0300
----------------------------------------------------------------------
.../org/gridgain/grid/kernal/GridTopic.java | 5 +-
.../cache/query/GridCacheQueriesEx.java | 7 +
.../cache/query/GridCacheQueriesImpl.java | 5 +
.../cache/query/GridCacheQueriesProxy.java | 12 +
.../cache/query/GridCacheQueryManager.java | 26 +-
.../cache/query/GridCacheSqlQuery.java | 91 +++++
.../cache/query/GridCacheSqlResult.java | 19 ++
.../cache/query/GridCacheTwoStepQuery.java | 66 ++++
.../processors/query/GridQueryIndexing.java | 10 +
.../processors/query/GridQueryProcessor.java | 17 +
.../java/org/gridgain/grid/util/GridQueue.java | 340 -------------------
.../gridgain/grid/util/GridQueueSelfTest.java | 62 ----
.../processors/query/h2/GridH2Indexing.java | 130 ++++---
.../query/h2/GridH2ResultSetIterator.java | 3 +-
.../query/h2/opt/GridH2IndexBase.java | 5 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 263 ++++++++++++++
.../query/h2/twostep/GridMergeIndex.java | 131 +++++--
.../h2/twostep/GridMergeIndexUnsorted.java | 74 ++++
.../query/h2/twostep/GridMergeTable.java | 33 +-
.../query/h2/twostep/GridNextPageRequest.java | 54 ---
.../query/h2/twostep/GridNextPageResponse.java | 47 ---
.../query/h2/twostep/GridQueryAck.java | 42 ---
.../query/h2/twostep/GridQueryRequest.java | 50 ---
.../h2/twostep/GridReduceQueryExecutor.java | 199 +++++++++++
.../query/h2/twostep/GridResultPage.java | 76 +++++
.../twostep/messages/GridNextPageRequest.java | 59 ++++
.../twostep/messages/GridNextPageResponse.java | 149 ++++++++
.../query/h2/twostep/messages/GridQueryAck.java | 34 ++
.../twostep/messages/GridQueryFailResponse.java | 46 +++
.../h2/twostep/messages/GridQueryRequest.java | 61 ++++
30 files changed, 1402 insertions(+), 714 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java
index 5fedbd9..7ab61d9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java
@@ -77,7 +77,10 @@ public enum GridTopic {
TOPIC_TIME_SYNC,
/** */
- TOPIC_HADOOP;
+ TOPIC_HADOOP,
+
+ /** */
+ TOPIC_QUERY;
/** Enum values. */
private static final GridTopic[] VALS = values();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
index d1732fb..e854367 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
@@ -10,6 +10,7 @@
package org.gridgain.grid.kernal.processors.cache.query;
import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.query.*;
import java.util.*;
@@ -41,4 +42,10 @@ public interface GridCacheQueriesEx<K, V> extends GridCacheQueries<K, V> {
* @return Query.
*/
public <R> GridCacheQuery<R> createSpiQuery();
+
+ /**
+ * @param qry Query.
+ * @return Future.
+ */
+ public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
index 3ba1ceb..f643cb2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
@@ -158,6 +158,11 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry) {
+ return ctx.kernalContext().query().queryTwoStep(qry);
+ }
+
+ /** {@inheritDoc} */
@Override public GridCacheContinuousQuery<K, V> createContinuousQuery() {
return ctx.continuousQueries().createQuery(prj == null ? null : prj.predicate());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
index 9edcf6a..61f7ac7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
@@ -166,6 +166,18 @@ public class GridCacheQueriesProxy<K, V> implements GridCacheQueriesEx<K, V>, Ex
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry) {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.execute(qry);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteFuture<?> rebuildIndexes(Class<?> cls) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
index 25c0668..5fbc366 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
@@ -51,7 +51,7 @@ import static org.gridgain.grid.kernal.processors.cache.query.GridCacheQueryType
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapter<K, V> {
/** */
- protected GridQueryProcessor idxProc;
+ protected GridQueryProcessor qryProc;
/** */
private String space;
@@ -78,7 +78,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/** {@inheritDoc} */
@Override public void start0() throws IgniteCheckedException {
- idxProc = cctx.kernalContext().query();
+ qryProc = cctx.kernalContext().query();
space = cctx.name();
maxIterCnt = cctx.config().getMaximumQueryIteratorCount();
@@ -165,7 +165,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
throw new IllegalStateException("Failed to get size (grid is stopping).");
try {
- return idxProc.size(space, valType);
+ return qryProc.size(space, valType);
}
finally {
leaveBusy();
@@ -193,7 +193,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
throw new IllegalStateException("Failed to rebuild indexes (grid is stopping).");
try {
- return idxProc.rebuildIndexes(space, typeName);
+ return qryProc.rebuildIndexes(space, typeName);
}
finally {
leaveBusy();
@@ -210,7 +210,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
throw new IllegalStateException("Failed to rebuild indexes (grid is stopping).");
try {
- return idxProc.rebuildAllIndexes();
+ return qryProc.rebuildAllIndexes();
}
finally {
leaveBusy();
@@ -262,7 +262,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
return; // Ignore index update when node is stopping.
try {
- idxProc.onSwap(space, key);
+ qryProc.onSwap(space, key);
}
finally {
leaveBusy();
@@ -282,7 +282,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
return; // Ignore index update when node is stopping.
try {
- idxProc.onUnswap(space, key, val, valBytes);
+ qryProc.onUnswap(space, key, val, valBytes);
}
finally {
leaveBusy();
@@ -324,7 +324,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
if (val == null)
val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader());
- idxProc.store(space, key, keyBytes, val, valBytes, CU.versionToBytes(ver), expirationTime);
+ qryProc.store(space, key, keyBytes, val, valBytes, CU.versionToBytes(ver), expirationTime);
}
finally {
invalidateResultCache();
@@ -349,7 +349,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
return; // Ignore index update when node is stopping.
try {
- idxProc.remove(space, key);
+ qryProc.remove(space, key);
}
finally {
invalidateResultCache();
@@ -368,7 +368,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
return; // Ignore index update when node is stopping.
try {
- idxProc.onUndeploy(space, ldr);
+ qryProc.onUndeploy(space, ldr);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -488,7 +488,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
taskName));
}
- iter = idxProc.query(space, qry.clause(), F.asList(args),
+ iter = qryProc.query(space, qry.clause(), F.asList(args),
qry.queryClassName(), filter(qry));
break;
@@ -531,7 +531,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
taskName));
}
- iter = idxProc.queryText(space, qry.clause(), qry.queryClassName(), filter(qry));
+ iter = qryProc.queryText(space, qry.clause(), qry.queryClassName(), filter(qry));
break;
@@ -650,7 +650,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
else {
assert qry.type() == SQL_FIELDS;
- GridQueryFieldsResult qryRes = idxProc.queryFields(space, qry.clause(), F.asList(args), filter(qry));
+ GridQueryFieldsResult qryRes = qryProc.queryFields(space, qry.clause(), F.asList(args), filter(qry));
res.metaData(qryRes.metaData());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
new file mode 100644
index 0000000..025ea29
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
@@ -0,0 +1,91 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.query;
+
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Query.
+ */
+public class GridCacheSqlQuery implements Externalizable {
+ /** */
+ private static final Object[] EMPTY_PARAMS = {};
+
+ /** */
+ String alias;
+
+ /** */
+ String qry;
+
+ /** */
+ Object[] params;
+
+ /**
+ * For {@link Externalizable}.
+ */
+ public GridCacheSqlQuery() {
+ // No-op.
+ }
+
+ /**
+ * @param alias Alias.
+ * @param qry Query.
+ * @param params Query parameters.
+ */
+ GridCacheSqlQuery(String alias, String qry, Object[] params) {
+ A.ensure(!F.isEmpty(qry), "qry must not be empty");
+
+ this.alias = alias;
+ this.qry = qry;
+
+ this.params = F.isEmpty(params) ? EMPTY_PARAMS : params;
+ }
+
+ /**
+ * @return Alias.
+ */
+ public String alias() {
+ return alias;
+ }
+
+ /**
+ * @return Query.
+ */
+ public String query() {
+ return qry;
+ }
+
+ /**
+ * @return Parameters.
+ */
+ public Object[] parameters() {
+ return params;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, alias);
+ U.writeString(out, qry);
+ U.writeArray(out, params);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ alias = U.readString(in);
+ qry = U.readString(in);
+ params = U.readArray(in);
+
+ if (F.isEmpty(params))
+ params = EMPTY_PARAMS;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java
new file mode 100644
index 0000000..ecee21e
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java
@@ -0,0 +1,19 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.query;
+
+import java.util.*;
+
+/**
+ * SQL Query result.
+ */
+public interface GridCacheSqlResult extends AutoCloseable, Iterable<List<?>> {
+ // No-op.
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
new file mode 100644
index 0000000..a7c9a02
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -0,0 +1,66 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.query;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Two step map-reduce style query.
+ */
+public class GridCacheTwoStepQuery implements Serializable {
+ /** */
+ private Map<String, GridCacheSqlQuery> mapQrys;
+
+ /** */
+ private GridCacheSqlQuery reduce;
+
+ /**
+ * @param qry Reduce query.
+ * @param params Reduce query parameters.
+ */
+ public GridCacheTwoStepQuery(String qry, Object ... params) {
+ reduce = new GridCacheSqlQuery(null, qry, params);
+ }
+
+ /**
+ * @param alias Alias.
+ * @param qry SQL Query.
+ * @param params Query parameters.
+ */
+ public void addMapQuery(String alias, String qry, Object ... params) {
+ A.ensure(!F.isEmpty(alias), "alias must not be empty");
+
+ if (mapQrys == null)
+ mapQrys = new GridLeanMap<>();
+
+ if (mapQrys.put(alias, new GridCacheSqlQuery(alias, qry, params)) != null)
+ throw new IgniteException("Failed to add query, alias already exists: " + alias + ".");
+ }
+
+ /**
+ * @return Reduce query.
+ */
+ public GridCacheSqlQuery reduceQuery() {
+ return reduce;
+ }
+
+ /**
+ * @return Map queries.
+ */
+ public Collection<GridCacheSqlQuery> mapQueries() {
+ return mapQrys.values();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
index 18b2832..1b9ec6a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
@@ -13,6 +13,7 @@ import org.apache.ignite.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.spi.indexing.*;
import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
import org.gridgain.grid.util.lang.*;
import org.jetbrains.annotations.*;
@@ -37,6 +38,15 @@ public interface GridQueryIndexing {
*/
public void stop() throws IgniteCheckedException;
+
+ /**
+ * Runs two step query.
+ *
+ * @param qry Query.
+ * @return Future.
+ */
+ public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry);
+
/**
* Queries individual fields (generally used by JDBC drivers).
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
index 6bc0235..e05c425 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
@@ -18,6 +18,7 @@ import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.query.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
import org.gridgain.grid.util.*;
import org.gridgain.grid.util.future.*;
import org.gridgain.grid.util.lang.*;
@@ -428,6 +429,22 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * @param qry Query.
+ * @return Future.
+ */
+ public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry) {
+ if (!busyLock.enterBusy())
+ throw new IllegalStateException("Failed to execute query (grid is stopping).");
+
+ try {
+ return idx.queryTwoStep(qry);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
* @param space Space.
* @param key Key.
* @throws IgniteCheckedException Thrown in case of any errors.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java b/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java
deleted file mode 100644
index 1f6bbe4..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.util;
-
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.grid.util.tostring.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Queue which supports addition at tail and removing at head. This
- * queue also exposes its internal linked list nodes and allows for
- * constant time removal from the middle of the queue.
- * <p>
- * This queue is not thread-safe.
- */
-public class GridQueue<E> extends AbstractCollection<E> implements Queue<E> {
- /** Queue size. */
- private int size;
-
- /** Modification count. */
- private int modCnt;
-
- /** Queue header. */
- private Node<E> hdr = new Node<>(null, null, null);
-
- /**
- * Creates empty queue.
- */
- public GridQueue() {
- hdr.next = hdr.prev = hdr;
- }
-
- /**
- * Handles modification count check.
- *
- * @param match Modification count to match.
- */
- private void checkModCount(int match) {
- if (modCnt != match)
- throw new ConcurrentModificationException("Mod count mismatch [expected=" + match +
- ", actual=" + modCnt + ']');
-
- modCnt++;
- }
-
- /**
- * Adds element before node.
- *
- * @param e Element to add.
- * @param n Node.
- * @return New node.
- */
- private Node<E> addBefore(E e, Node<E> n) {
- A.notNull(e, "e");
-
- assert n != null;
-
- int match = modCnt;
-
- Node<E> newNode = new Node<>(e, n, n.prev);
-
- // Link.
- newNode.prev.next = newNode;
- newNode.next.prev = newNode;
-
- size++;
-
- checkModCount(match);
-
- return newNode;
- }
-
- /**
- * Removes node.
- *
- * @param n Node to remove.
- * @return Removed value.
- */
- private E remove(Node<E> n) {
- assert n != null;
-
- if (n == hdr)
- throw new NoSuchElementException();
-
- assert !n.unlinked();
-
- int match = modCnt;
-
- E res = n.item;
-
- // Relink.
- n.prev.next = n.next;
- n.next.prev = n.prev;
-
- // GC.
- n.next = n.prev = null;
- n.item = null;
-
- size--;
-
- checkModCount(match);
-
- n.unlink();
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean add(E e) {
- offer(e);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean remove(Object o) {
- A.notNull(o, "o");
-
- for (Node<E> n = hdr.next; n != hdr; n = n.next) {
- if (o.equals(n.item)) {
- remove(n);
-
- return true;
- }
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean offer(E e) {
- addBefore(e, hdr);
-
- return true;
- }
-
- /**
- * Same as {@link #offer(Object)}, but returns created node.
- *
- * @param e Element to add.
- * @return New node.
- */
- public Node<E> offerx(E e) {
- return addBefore(e, hdr);
- }
-
- /**
- * Polls element from head of the queue.
- *
- * @return Polled element.
- */
- @Nullable @Override public E poll() {
- if (size == 0)
- return null;
-
- return remove(hdr.next);
- }
-
- /** {@inheritDoc} */
- @Override public E element() {
- Node<E> n = hdr.next;
-
- if (n == null)
- throw new NoSuchElementException();
-
- return n.item;
- }
-
- /** {@inheritDoc} */
- @Override public E remove() {
- E item = poll();
-
- if (item == null)
- throw new NoSuchElementException();
-
- return item;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public E peek() {
- return hdr.next.item;
- }
-
- /**
- * @return Peeks at first node in the queue.
- */
- public Node<E> peekx() {
- return hdr.next == hdr ? null : hdr.next;
- }
-
- /**
- * Unlinks node from the queue.
- *
- * @param n Node to unlink.
- */
- public void unlink(Node<E> n) {
- A.notNull(n, "n");
-
- remove(n);
- }
-
- /**
- * Gets queue size.
- *
- * @return Queue size.
- */
- @Override public int size() {
- return size;
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<E> iterator() {
- return new QueueIterator();
- }
-
- /**
- * Node for internal linked list.
- *
- * @param <E> Queue element.
- */
- @SuppressWarnings( {"PublicInnerClass"})
- public static class Node<E> {
- /** Item. */
- private E item;
-
- /** Next. */
- @GridToStringExclude
- private Node<E> next;
-
- /** Previous. */
- @GridToStringExclude
- private Node<E> prev;
-
- /** Unlinked flag. */
- private boolean unlinked;
-
- /**
- * @param item Item.
- * @param next Next link.
- * @param prev Previous link.
- */
- private Node(E item, Node<E> next, Node<E> prev) {
- this.item = item;
- this.next = next;
- this.prev = prev;
- }
-
- /**
- * Gets this node's item.
- *
- * @return This node's item.
- */
- public E item() {
- return item;
- }
-
- /**
- * Sets unlinked flag.
- */
- void unlink() {
- assert !unlinked;
-
- unlinked = true;
- }
-
- /**
- * Checks if node is unlinked.
- *
- * @return {@code True} if node is unlinked.
- */
- public boolean unlinked() {
- return unlinked;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(Node.class, this);
- }
- }
-
- /**
- * Iterator.
- */
- private class QueueIterator implements Iterator<E> {
- /** Next element. */
- private Node<E> next;
-
- /** Expected modification count. */
- private int expModCnt = modCnt;
-
- /**
- *
- */
- QueueIterator() {
- next = hdr.next;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
- return next != hdr;
- }
-
- /** {@inheritDoc} */
- @Override public E next() {
- checkModCount();
-
- if (next == null)
- throw new NoSuchElementException();
-
- E ret = next.item;
-
- next = next.next;
-
- return ret;
- }
-
- /** {@inheritDoc} */
- @Override public void remove() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Checks modification count.
- */
- private void checkModCount() {
- if (modCnt != expModCnt)
- throw new ConcurrentModificationException("Mod count mismatch [expected=" + expModCnt +
- ", actual=" + modCnt + ']');
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java
deleted file mode 100644
index 4b613f6..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.util;
-
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.testframework.junits.common.*;
-
-/**
- * Grid utils tests.
- */
-@GridCommonTest(group = "Utils")
-public class GridQueueSelfTest extends GridCommonAbstractTest {
- /**
- *
- */
- public void testQueue() {
- GridQueue<String> q = new GridQueue<>();
- for (char c = 'a'; c <= 'z'; c++)
- q.offer(Character.toString(c));
-
- assertEquals('z' - 'a' + 1, q.size());
-
- char ch = 'a';
-
- for (String c = q.poll(); c != null; c = q.poll()) {
- X.println(c);
-
- assertEquals(Character.toString(ch++), c);
- }
-
- assert q.isEmpty();
-
- for (char c = 'A'; c <= 'Z'; c++)
- q.offer(Character.toString(c));
-
- assertEquals('Z' - 'A' + 1, q.size());
-
- ch = 'A';
-
- for (String s : q) {
- X.println(s);
-
- assertEquals(Character.toString(ch++), s);
- }
-
- q.remove("O");
-
- assertEquals('Z' - 'A', q.size());
-
- for (String c = q.poll(); c != null; c = q.poll())
- assert !"O".equals(c);
-
- assert q.isEmpty();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
index 7307676..7ee84f8 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
@@ -16,13 +16,14 @@ import org.apache.ignite.marshaller.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.indexing.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.query.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
import org.gridgain.grid.kernal.processors.query.*;
import org.gridgain.grid.kernal.processors.query.h2.opt.*;
+import org.gridgain.grid.kernal.processors.query.h2.twostep.*;
import org.gridgain.grid.util.*;
import org.gridgain.grid.util.lang.*;
import org.gridgain.grid.util.offheap.unsafe.*;
@@ -112,9 +113,6 @@ public class GridH2Indexing implements GridQueryIndexing {
}
/** */
- private static final ThreadLocal<GridH2Indexing> localSpi = new ThreadLocal<>();
-
- /** */
private volatile String cachedSearchPathCmd;
/** Cache for deserialized offheap rows. */
@@ -148,6 +146,12 @@ public class GridH2Indexing implements GridQueryIndexing {
private final Collection<Connection> conns = Collections.synchronizedCollection(new ArrayList<Connection>());
/** */
+ private GridMapQueryExecutor mapQryExec;
+
+ /** */
+ private GridReduceQueryExecutor rdcQryExec;
+
+ /** */
private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() {
@Nullable @Override public ConnectionWrapper get() {
ConnectionWrapper c = super.get();
@@ -218,6 +222,15 @@ public class GridH2Indexing implements GridQueryIndexing {
private volatile GridKernalContext ctx;
/**
+ * @param space Space.
+ * @return Connection.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Connection connectionForSpace(@Nullable String space) throws IgniteCheckedException {
+ return connectionForThread(schema(space));
+ }
+
+ /**
* Gets DB connection.
*
* @param schema Whether to set schema for connection or not.
@@ -370,22 +383,15 @@ public class GridH2Indexing implements GridQueryIndexing {
if (tbl == null)
return; // Type was rejected.
- localSpi.set(this);
+ removeKey(spaceName, k, tbl);
- try {
- removeKey(spaceName, k, tbl);
-
- if (expirationTime == 0)
- expirationTime = Long.MAX_VALUE;
+ if (expirationTime == 0)
+ expirationTime = Long.MAX_VALUE;
- tbl.tbl.update(k, v, expirationTime);
+ tbl.tbl.update(k, v, expirationTime);
- if (tbl.luceneIdx != null)
- tbl.luceneIdx.store(k, v, ver, expirationTime);
- }
- finally {
- localSpi.remove();
- }
+ if (tbl.luceneIdx != null)
+ tbl.luceneIdx.store(k, v, ver, expirationTime);
}
/** {@inheritDoc} */
@@ -393,23 +399,16 @@ public class GridH2Indexing implements GridQueryIndexing {
if (log.isDebugEnabled())
log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ']');
- localSpi.set(this);
+ for (TableDescriptor tbl : tables(schema(spaceName))) {
+ if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
+ if (tbl.tbl.update(key, null, 0)) {
+ if (tbl.luceneIdx != null)
+ tbl.luceneIdx.remove(key);
- try {
- for (TableDescriptor tbl : tables(schema(spaceName))) {
- if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
- if (tbl.tbl.update(key, null, 0)) {
- if (tbl.luceneIdx != null)
- tbl.luceneIdx.remove(key);
-
- return;
- }
+ return;
}
}
}
- finally {
- localSpi.remove();
- }
}
/** {@inheritDoc} */
@@ -419,47 +418,33 @@ public class GridH2Indexing implements GridQueryIndexing {
if (schema == null)
return;
- localSpi.set(this);
-
- try {
- for (TableDescriptor tbl : schema.values()) {
- if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
- try {
- if (tbl.tbl.onSwap(key))
- return;
- }
- catch (IgniteCheckedException e) {
- throw new IgniteCheckedException(e);
- }
+ for (TableDescriptor tbl : schema.values()) {
+ if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
+ try {
+ if (tbl.tbl.onSwap(key))
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException(e);
}
}
}
- finally {
- localSpi.remove();
- }
}
/** {@inheritDoc} */
@Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes)
throws IgniteCheckedException {
- localSpi.set(this);
-
- try {
- for (TableDescriptor tbl : tables(schema(spaceName))) {
- if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
- try {
- if (tbl.tbl.onUnswap(key, val))
- return;
- }
- catch (IgniteCheckedException e) {
- throw new IgniteCheckedException(e);
- }
+ for (TableDescriptor tbl : tables(schema(spaceName))) {
+ if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
+ try {
+ if (tbl.tbl.onUnswap(key, val))
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException(e);
}
}
}
- finally {
- localSpi.remove();
- }
}
/**
@@ -540,8 +525,6 @@ public class GridH2Indexing implements GridQueryIndexing {
@Override public <K, V> GridQueryFieldsResult queryFields(@Nullable final String spaceName, final String qry,
@Nullable final Collection<Object> params, final GridIndexingQueryFilter filters)
throws IgniteCheckedException {
- localSpi.set(this);
-
setFilters(filters);
try {
@@ -575,8 +558,6 @@ public class GridH2Indexing implements GridQueryIndexing {
}
finally {
setFilters(null);
-
- localSpi.remove();
}
}
@@ -652,7 +633,7 @@ public class GridH2Indexing implements GridQueryIndexing {
* @return Result.
* @throws IgniteCheckedException If failed.
*/
- private ResultSet executeSqlQueryWithTimer(Connection conn, String sql,
+ public ResultSet executeSqlQueryWithTimer(Connection conn, String sql,
@Nullable Collection<Object> params) throws IgniteCheckedException {
long start = U.currentTimeMillis();
@@ -719,7 +700,7 @@ public class GridH2Indexing implements GridQueryIndexing {
* @param params Parameters collection.
* @throws IgniteCheckedException If failed.
*/
- private void bindParameters(PreparedStatement stmt, @Nullable Collection<Object> params) throws IgniteCheckedException {
+ public void bindParameters(PreparedStatement stmt, @Nullable Collection<Object> params) throws IgniteCheckedException {
if (!F.isEmpty(params)) {
int idx = 1;
@@ -751,8 +732,6 @@ public class GridH2Indexing implements GridQueryIndexing {
setFilters(filters);
- localSpi.set(this);
-
try {
ResultSet rs = executeQuery(qry, params, tbl);
@@ -760,11 +739,14 @@ public class GridH2Indexing implements GridQueryIndexing {
}
finally {
setFilters(null);
-
- localSpi.remove();
}
}
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry) {
+ return rdcQryExec.query(qry);
+ }
+
/**
* Sets filters for current thread. Must be set to not null value
* before executeQuery and reset to null after in finally block since it signals
@@ -772,7 +754,7 @@ public class GridH2Indexing implements GridQueryIndexing {
*
* @param filters Filters.
*/
- private void setFilters(@Nullable GridIndexingQueryFilter filters) {
+ public void setFilters(@Nullable GridIndexingQueryFilter filters) {
GridH2IndexBase.setFiltersForThread(filters);
}
@@ -1115,6 +1097,12 @@ public class GridH2Indexing implements GridQueryIndexing {
for (GridCacheConfiguration cacheCfg : ctx.config().getCacheConfiguration())
registerSpace(cacheCfg.getName());
+
+ mapQryExec = new GridMapQueryExecutor();
+ rdcQryExec = new GridReduceQueryExecutor();
+
+ mapQryExec.start(ctx, this);
+ rdcQryExec.start(ctx, this);
}
System.setProperty("h2.serializeJavaObject", "false");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
index 61f1190..90aa454 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
@@ -10,6 +10,7 @@
package org.gridgain.grid.kernal.processors.query.h2;
import org.apache.ignite.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
import org.gridgain.grid.util.*;
import org.gridgain.grid.util.typedef.internal.*;
@@ -20,7 +21,7 @@ import java.util.*;
/**
* Iterator over result set.
*/
-abstract class GridH2ResultSetIterator<T> extends GridCloseableIteratorAdapter<T> {
+public abstract class GridH2ResultSetIterator<T> extends GridCloseableIteratorAdapter<T> {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
index 00cb06d..4dcfd73 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
@@ -49,7 +49,10 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @param fs Filters.
*/
public static void setFiltersForThread(GridIndexingQueryFilter fs) {
- filters.set(fs);
+ if (fs == null)
+ filters.remove();
+ else
+ filters.set(fs);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
new file mode 100644
index 0000000..b86caf1
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -0,0 +1,263 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.indexing.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
+import org.gridgain.grid.kernal.processors.query.h2.*;
+import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*;
+import org.gridgain.grid.util.typedef.*;
+import org.h2.jdbc.*;
+import org.h2.result.*;
+import org.h2.value.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.lang.reflect.*;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Map query executor.
+ */
+public class GridMapQueryExecutor {
+ /** */
+ private static final Field RESULT_FIELD;
+
+ /**
+ * Initialize.
+ */
+ static {
+ try {
+ RESULT_FIELD = JdbcResultSet.class.getDeclaredField("result");
+
+ RESULT_FIELD.setAccessible(true);
+ }
+ catch (NoSuchFieldException e) {
+ throw new IllegalStateException("Check H2 version in classpath.", e);
+ }
+ }
+
+ /** */
+ private IgniteLogger log;
+
+ /** */
+ private GridKernalContext ctx;
+
+ /** */
+ private GridH2Indexing h2;
+
+ /** */
+ private ConcurrentMap<UUID, ConcurrentMap<Long, QueryResults>> qryRess = new ConcurrentHashMap8<>();
+
+ /**
+ * @param ctx Context.
+ * @param h2 H2 Indexing.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void start(final GridKernalContext ctx, GridH2Indexing h2) throws IgniteCheckedException {
+ this.ctx = ctx;
+ this.h2 = h2;
+
+ log = ctx.log(GridMapQueryExecutor.class);
+
+ // TODO handle node failures.
+
+ ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ assert msg != null;
+
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (msg instanceof GridQueryRequest)
+ executeLocalQuery(node, (GridQueryRequest)msg);
+ else if (msg instanceof GridNextPageRequest)
+ sendNextPage(node, (GridNextPageRequest)msg);
+
+ return true;
+ }
+ });
+ }
+
+ /**
+ * @param node Node.
+ * @param req Query request.
+ */
+ private void executeLocalQuery(ClusterNode node, GridQueryRequest req) {
+ h2.setFilters(new GridIndexingQueryFilter() {
+ @Nullable @Override public <K, V> IgniteBiPredicate<K, V> forSpace(String spaceName) {
+ final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(spaceName);
+
+ if (cache.context().isReplicated() || cache.configuration().getBackups() == 0)
+ return null;
+
+ return new IgniteBiPredicate<K, V>() {
+ @Override public boolean apply(K k, V v) {
+ return cache.context().affinity().primary(ctx.discovery().localNode(), k, -1);
+ }
+ };
+ }
+ });
+
+ try {
+ QueryResults qr = new QueryResults(req.requestId(), req.queries().size());
+
+ ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id());
+
+ if (nodeRess == null) {
+ nodeRess = new ConcurrentHashMap8<>();
+
+ ConcurrentMap<Long, QueryResults> old = qryRess.putIfAbsent(node.id(), nodeRess);
+
+ if (old != null)
+ nodeRess = old;
+ }
+
+ QueryResults old = nodeRess.putIfAbsent(req.requestId(), qr);
+
+ assert old == null;
+
+ // Prepare snapshots for all the needed tables before actual run.
+ for (GridCacheSqlQuery qry : req.queries()) {
+ // TODO
+ }
+
+ // Run queries.
+ int i = 0;
+
+ for (GridCacheSqlQuery qry : req.queries()) {
+ ResultSet rs = h2.executeSqlQueryWithTimer(h2.connectionForSpace(null), qry.query(),
+ F.asList(qry.parameters()));
+
+ assert rs instanceof JdbcResultSet : rs.getClass();
+
+ ResultInterface res = (ResultInterface)RESULT_FIELD.get(rs);
+
+ qr.results[i] = res;
+ qr.resultSets[i] = rs;
+
+ // Send the first page.
+ sendNextPage(node, qr, i, req.pageSize(), res.getRowCount());
+
+ i++;
+ }
+ }
+ catch (Throwable e) {
+ sendError(node, req.requestId(), e);
+ }
+ finally {
+ h2.setFilters(null);
+ }
+ }
+
+ /**
+ * @param node Node.
+ * @param qryReqId Query request ID.
+ * @param err Error.
+ */
+ private void sendError(ClusterNode node, long qryReqId, Throwable err) {
+ try {
+ ctx.io().sendUserMessage(F.asList(node), new GridQueryFailResponse(qryReqId, err));
+ }
+ catch (IgniteCheckedException e) {
+ e.addSuppressed(err);
+
+ log.error("Failed to send error message.", e);
+ }
+ }
+
+ /**
+ * @param node Node.
+ * @param req Request.
+ */
+ private void sendNextPage(ClusterNode node, GridNextPageRequest req) {
+ ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id());
+
+ QueryResults qr = nodeRess == null ? null : nodeRess.get(req.queryRequestId());
+
+ if (qr == null)
+ sendError(node, req.queryRequestId(),
+ new IllegalStateException("No query result found for request: " + req));
+ else
+ sendNextPage(node, qr, req.query(), req.pageSize(), -1);
+ }
+
+ /**
+ * @param node Node.
+ * @param qr Query results.
+ * @param qry Query.
+ * @param pageSize Page size.
+ * @param allRows All rows count.
+ */
+ private void sendNextPage(ClusterNode node, QueryResults qr, int qry, int pageSize, int allRows) {
+ int page;
+
+ List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize));
+
+ ResultInterface res = qr.results[qry];
+
+ assert res != null;
+
+ synchronized (res) {
+ page = qr.pages[qry]++;
+
+ for (int i = 0 ; i < pageSize; i++) {
+ if (!res.next())
+ break;
+
+ rows.add(res.currentRow());
+ }
+ }
+
+ try {
+ ctx.io().sendUserMessage(F.asList(node), new GridNextPageResponse(qr.qryReqId, qry, page, allRows, rows));
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed to send message.", e);
+
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class QueryResults {
+ /** */
+ private long qryReqId;
+
+ /** */
+ private ResultInterface[] results;
+
+ /** */
+ private ResultSet[] resultSets;
+
+ /** */
+ private int[] pages;
+
+ /**
+ * @param qryReqId Query request ID.
+ * @param qrys Queries.
+ */
+ private QueryResults(long qryReqId, int qrys) {
+ this.qryReqId = qryReqId;
+
+ results = new ResultInterface[qrys];
+ resultSets = new ResultSet[qrys];
+ pages = new int[qrys];
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
index 163256b..16ba15d 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
@@ -9,7 +9,7 @@
package org.gridgain.grid.kernal.processors.query.h2.twostep;
-import org.gridgain.grid.*;
+import org.apache.ignite.*;
import org.h2.engine.*;
import org.h2.index.*;
import org.h2.message.*;
@@ -18,31 +18,23 @@ import org.h2.table.*;
import org.jetbrains.annotations.*;
import java.util.*;
-import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
/**
* Merge index.
*/
-public class GridMergeIndex extends BaseIndex {
+public abstract class GridMergeIndex extends BaseIndex {
/** */
- private static final int MAX_CAPACITY = 100_000;
-
- /** */
- private static final Collection<Row> END = new ArrayList<>(0);
-
- /** */
- private Collection<Collection<Row>> fetchedRows = new LinkedBlockingQueue<>();
-
- /** */
- private BlockingQueue<Collection<Row>> cursorRows = new LinkedBlockingQueue<>();
-
- /** */
- private int fetchedCnt;
+ private static final int MAX_FETCH_SIZE = 100000;
/** */
private final AtomicInteger cnt = new AtomicInteger(0);
+ /**
+ * Will be r/w from query execution thread only, does not need to be threadsafe.
+ */
+ private ArrayList<Row> fetched = new ArrayList<>();
+
/** {@inheritDoc} */
@Override public long getRowCount(Session session) {
return cnt.get();
@@ -61,12 +53,36 @@ public class GridMergeIndex extends BaseIndex {
}
/**
- * @param rows0 Rows.
+ * @param page Page.
*/
- public void addRows(Collection<Row> rows0) {
- assert !rows0.isEmpty();
+ public abstract void addPage(GridResultPage<?> page);
+
+ /** {@inheritDoc} */
+ @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
+ if (fetched == null)
+ throw new IgniteException("Fetched result set was too large.");
+ if (fetched.size() == cnt.get()) // We've fetched all the rows.
+ return findAllFetched(fetched, first, last);
+ return findInStream(first, last);
+ }
+
+ /**
+ * @param first First row.
+ * @param last Last row.
+ * @return Cursor. Usually it must be {@link FetchingCursor} instance.
+ */
+ protected abstract Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last);
+
+ /**
+ * @param fetched Fetched rows.
+ * @param first First row.
+ * @param last Last row.
+ * @return Cursor.
+ */
+ protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) {
+ return new IteratorCursor(fetched.iterator());
}
/** {@inheritDoc} */
@@ -90,14 +106,6 @@ public class GridMergeIndex extends BaseIndex {
}
/** {@inheritDoc} */
- @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
- if (fetchedRows == null)
- throw new GridRuntimeException("Rows were dropped out of result set.");
-
- return new Cursor0();
- }
-
- /** {@inheritDoc} */
@Override public double getCost(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) {
return getRowCountApproximation() + Constants.COST_ROW_OFFSET;
}
@@ -132,12 +140,24 @@ public class GridMergeIndex extends BaseIndex {
return 0;
}
- private class Cursor0 implements Cursor {
+ /**
+ * Cursor over iterator.
+ */
+ protected class IteratorCursor implements Cursor {
/** */
- private Row cur;
+ private Iterator<Row> iter;
/** */
- private Iterator<Row> curIter;
+ protected Row cur;
+
+ /**
+ * @param iter Iterator.
+ */
+ public IteratorCursor(Iterator<Row> iter) {
+ assert iter != null;
+
+ this.iter = iter;
+ }
/** {@inheritDoc} */
@Override public Row get() {
@@ -151,7 +171,9 @@ public class GridMergeIndex extends BaseIndex {
/** {@inheritDoc} */
@Override public boolean next() {
- return false;
+ cur = iter.hasNext() ? iter.next() : null;
+
+ return cur != null;
}
/** {@inheritDoc} */
@@ -159,4 +181,51 @@ public class GridMergeIndex extends BaseIndex {
throw DbException.getUnsupportedException("previous");
}
}
+
+ /**
+ * Fetching cursor.
+ */
+ protected abstract class FetchingCursor extends IteratorCursor {
+ /** */
+ private boolean canFetch = true;
+
+ /**
+ */
+ public FetchingCursor() {
+ super(fetched == null ? Collections.<Row>emptyIterator() : fetched.iterator());
+ }
+
+ /**
+ * @return Next row or {@code null} if none available.
+ */
+ @Nullable protected abstract Row fetchNext();
+
+ /** {@inheritDoc} */
+ @Override public boolean next() {
+ if (super.next())
+ return true;
+
+ if (!canFetch)
+ return false;
+
+ cur = fetchNext();
+
+ if (cur == null) { // No more results to fetch.
+ assert fetched == null || fetched.size() == cnt.get() : fetched.size() + " <> " + cnt.get();
+
+ canFetch = false;
+
+ return false;
+ }
+
+ if (fetched != null) { // Try to reuse fetched result.
+ fetched.add(cur);
+
+ if (fetched.size() == MAX_FETCH_SIZE)
+ fetched = null; // Throw away fetched result if it is too large.
+ }
+
+ return true;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
new file mode 100644
index 0000000..c6aaea9
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -0,0 +1,74 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.apache.ignite.*;
+import org.h2.index.*;
+import org.h2.result.*;
+import org.h2.value.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Unsorted merge index.
+ */
+public class GridMergeIndexUnsorted extends GridMergeIndex {
+ /** */
+ private final BlockingQueue<GridResultPage<?>> queue = new LinkedBlockingQueue<>();
+
+ /** {@inheritDoc} */
+ @Override public void addPage(GridResultPage<?> page) {
+ queue.add(page);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
+ final GridResultPage<?> p = queue.poll();
+
+ assert p != null; // First page must be already fetched.
+
+ if (p.isEmpty())
+ return new IteratorCursor(Collections.<Row>emptyIterator());
+
+ p.fetchNextPage(); // We always request next page before reading this one.
+
+ return new FetchingCursor() {
+ /** */
+ Iterator<Value[]> iter = p.rows().iterator();
+
+ @Nullable @Override protected Row fetchNext() {
+ if (!iter.hasNext()) {
+ GridResultPage<?> page;
+
+ try {
+ page = queue.take();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException("Query execution was interrupted.", e);
+ }
+
+ if (page.isEmpty()) {
+ assert queue.isEmpty() : "It must be the last page.";
+
+ return null; // Empty page - we are done.
+ }
+
+ page.fetchNextPage();
+
+ iter = page.rows().iterator();
+ }
+
+ return new Row(iter.next(), 0);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
index 3f059fa..a1f2213 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
@@ -9,6 +9,7 @@
package org.gridgain.grid.kernal.processors.query.h2.twostep;
+import org.h2.api.*;
import org.h2.command.ddl.*;
import org.h2.engine.*;
import org.h2.index.*;
@@ -26,7 +27,7 @@ public class GridMergeTable extends TableBase {
private final ArrayList<Index> idxs = new ArrayList<>(1);
/** */
- private final GridMergeIndex idx = new GridMergeIndex();
+ private final GridMergeIndex idx = new GridMergeIndexUnsorted();
/**
* @param data Data.
@@ -142,4 +143,34 @@ public class GridMergeTable extends TableBase {
@Override public void checkRename() {
throw DbException.getUnsupportedException("rename");
}
+
+ /**
+ * Engine.
+ */
+ public static class Engine implements TableEngine {
+ /** */
+ private static ThreadLocal<GridMergeTable> createdTbl = new ThreadLocal<>();
+
+ /**
+ * @return Created table.
+ */
+ public static GridMergeTable getCreated() {
+ GridMergeTable tbl = createdTbl.get();
+
+ assert tbl != null;
+
+ createdTbl.remove();
+
+ return tbl;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Table createTable(CreateTableData data) {
+ GridMergeTable tbl = new GridMergeTable(data);
+
+ createdTbl.set(tbl);
+
+ return tbl;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
deleted file mode 100644
index d550b3b..0000000
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.query.h2.twostep;
-
-import org.gridgain.grid.util.direct.*;
-
-import java.nio.*;
-
-/**
- * Request to fetch next page.
- */
-public class GridNextPageRequest extends GridTcpCommunicationMessageAdapter {
- /** */
- private long reqId;
-
- /** */
- private long qryId;
-
- /** */
- private int qry;
-
- /** */
- private int offset;
-
- /** */
- private int pageSize;
-
- @Override public boolean writeTo(ByteBuffer buf) {
- return false;
- }
-
- @Override public boolean readFrom(ByteBuffer buf) {
- return false;
- }
-
- @Override public byte directType() {
- return 0;
- }
-
- @Override public GridTcpCommunicationMessageAdapter clone() {
- return null;
- }
-
- @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
deleted file mode 100644
index d77215c..0000000
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.query.h2.twostep;
-
-import org.gridgain.grid.util.direct.*;
-import org.h2.value.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- * TODO write doc
- */
-public class GridNextPageResponse extends GridTcpCommunicationMessageAdapter {
- /** */
- private long reqId;
-
- /** */
- private Collection<Value[]> rows;
-
- @Override public boolean writeTo(ByteBuffer buf) {
- return false;
- }
-
- @Override public boolean readFrom(ByteBuffer buf) {
- return false;
- }
-
- @Override public byte directType() {
- return 0;
- }
-
- @Override public GridTcpCommunicationMessageAdapter clone() {
- return null;
- }
-
- @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
deleted file mode 100644
index 10e30ee..0000000
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.query.h2.twostep;
-
-import org.gridgain.grid.util.direct.*;
-
-import java.nio.*;
-
-/**
- * TODO write doc
- */
-public class GridQueryAck extends GridTcpCommunicationMessageAdapter {
- /** */
- private long reqId;
-
- @Override public boolean writeTo(ByteBuffer buf) {
- return false;
- }
-
- @Override public boolean readFrom(ByteBuffer buf) {
- return false;
- }
-
- @Override public byte directType() {
- return 0;
- }
-
- @Override public GridTcpCommunicationMessageAdapter clone() {
- return null;
- }
-
- @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
deleted file mode 100644
index 7a664fd..0000000
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.query.h2.twostep;
-
-import org.gridgain.grid.util.direct.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- * TODO write doc
- */
-public class GridQueryRequest extends GridTcpCommunicationMessageAdapter {
- /** */
- private long reqId;
-
- /** */
- private List<String> sqlQrys;
-
- /** */
- private List<Collection<Object>> params;
-
-
- @Override public boolean writeTo(ByteBuffer buf) {
- return false;
- }
-
- @Override public boolean readFrom(ByteBuffer buf) {
- return false;
- }
-
- @Override public byte directType() {
- return 0;
- }
-
- @Override public GridTcpCommunicationMessageAdapter clone() {
- return null;
- }
-
- @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
new file mode 100644
index 0000000..3e7e12c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -0,0 +1,199 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.managers.communication.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
+import org.gridgain.grid.kernal.processors.query.h2.*;
+import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*;
+import org.gridgain.grid.util.future.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jdk8.backport.*;
+
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Reduce query executor.
+ */
+public class GridReduceQueryExecutor {
+ /** */
+ private GridKernalContext ctx;
+
+ /** */
+ private GridH2Indexing h2;
+
+ /** */
+ private IgniteLogger log;
+
+ /** */
+ private final AtomicLong reqIdGen = new AtomicLong();
+
+ /** */
+ private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();
+
+ /**
+ * @param ctx Context.
+ * @param h2 H2 Indexing.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void start(final GridKernalContext ctx, GridH2Indexing h2) throws IgniteCheckedException {
+ this.ctx = ctx;
+ this.h2 = h2;
+
+ log = ctx.log(GridReduceQueryExecutor.class);
+
+ // TODO handle node failure.
+
+ ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg != null;
+
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (msg instanceof GridNextPageResponse)
+ onNextPage(node, (GridNextPageResponse)msg);
+ else if (msg instanceof GridQueryFailResponse)
+ onFail(node, (GridQueryFailResponse)msg);
+ }
+ });
+ }
+
+ private void onFail(ClusterNode node, GridQueryFailResponse msg) {
+ U.error(log, "Failed to execute query.", msg.error());
+ }
+
+ private void onNextPage(final ClusterNode node, GridNextPageResponse msg) {
+ final long qryReqId = msg.queryRequestId();
+ final int qry = msg.query();
+ final int pageSize = msg.rows().size();
+
+ QueryRun r = runs.get(qryReqId);
+
+ GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null);
+
+ idx.addPage(new GridResultPage<Object>(node.id(), msg.query(), msg.rows()) {
+ @Override public void fetchNextPage() {
+ try {
+ ctx.io().sendUserMessage(F.asList(node), new GridNextPageRequest(qryReqId, qry, pageSize));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+
+ if (msg.allRows() != -1) { // Only the first page contains row count.
+ idx.addCount(msg.allRows());
+
+ r.latch.countDown();
+ }
+ }
+
+ public IgniteFuture<GridCacheSqlResult> query(GridCacheTwoStepQuery qry) {
+ long qryReqId = reqIdGen.incrementAndGet();
+
+ QueryRun r = new QueryRun();
+
+ r.tbls = new ArrayList<>();
+
+ try {
+ r.conn = h2.connectionForSpace(null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ for (GridCacheSqlQuery mapQry : qry.mapQueries())
+ r.tbls.add(createTable(r.conn, mapQry));
+
+ Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO filter nodes somehow?
+
+ r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
+
+ this.runs.put(qryReqId, r);
+
+ try {
+ ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries())); // TODO conf page size
+
+ r.latch.await();
+
+ GridCacheSqlQuery rdc = qry.reduceQuery();
+
+ final ResultSet res = h2.executeSqlQueryWithTimer(r.conn, rdc.query(), F.asList(rdc.parameters()));
+
+ return new GridFinishedFuture(ctx, new Iter(res));
+ }
+ catch (IgniteCheckedException | InterruptedException e) {
+ return new GridFinishedFuture<>(ctx, e);
+ }
+ }
+
+ private GridMergeTable createTable(Connection conn, GridCacheSqlQuery qry) {
+ try {
+ try (PreparedStatement s = conn.prepareStatement(
+ "CREATE LOCAL TEMPORARY TABLE " + qry.alias() +
+ " ENGINE \"" + GridMergeTable.Engine.class.getName() + "\" " +
+ " AS SELECT * FROM (" + qry.query() + ") WHERE FALSE")) {
+ h2.bindParameters(s, F.asList(qry.parameters()));
+
+ s.execute();
+ }
+
+ return GridMergeTable.Engine.getCreated();
+ }
+ catch (SQLException|IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class QueryRun {
+ /** */
+ private List<GridMergeTable> tbls;
+
+ /** */
+ private CountDownLatch latch;
+
+ /** */
+ private Connection conn;
+ }
+
+ /**
+ *
+ */
+ private static class Iter extends GridH2ResultSetIterator<List<?>> implements GridCacheSqlResult {
+ /**
+ * @param data Data array.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected Iter(ResultSet data) throws IgniteCheckedException {
+ super(data);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<?> createRow() {
+ ArrayList<Object> res = new ArrayList<>(row.length);
+
+ Collections.addAll(res, row);
+
+ return res;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
new file mode 100644
index 0000000..6c6ab6a
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
@@ -0,0 +1,76 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.h2.result.*;
+import org.h2.value.*;
+
+import java.util.*;
+
+/**
+ * Page result.
+ */
+public abstract class GridResultPage<S> {
+ /** */
+ private final S src;
+
+ /** */
+ private final Collection<Value[]> rows;
+
+ /** */
+ private final int page;
+
+ /**
+ * @param src Source.
+ * @param page Page.
+ * @param rows Page rows.
+ */
+ protected GridResultPage(S src, int page, Collection<Value[]> rows) {
+ assert src != null;
+ assert rows != null;
+
+ this.src = src;
+ this.page = page;
+ this.rows = rows;
+ }
+
+ /**
+ * @return Result source.
+ */
+ public S source() {
+ return src;
+ }
+
+ /**
+ * @return Page.
+ */
+ public int page() {
+ return page;
+ }
+
+ /**
+ * @return {@code true} If result is empty.
+ */
+ public boolean isEmpty() {
+ return rows.isEmpty();
+ }
+
+ /**
+ * @return Page rows.
+ */
+ public Collection<Value[]> rows() {
+ return rows;
+ }
+
+ /**
+ * Request next page.
+ */
+ public abstract void fetchNextPage();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java
new file mode 100644
index 0000000..e1eb905
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java
@@ -0,0 +1,59 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+
+import java.io.*;
+
+/**
+ * Request to fetch next page.
+ */
+public class GridNextPageRequest implements Serializable {
+ /** */
+ private long qryReqId;
+
+ /** */
+ private int qry;
+
+ /** */
+ private int pageSize;
+
+ /**
+ * @param qryReqId Query request ID.
+ * @param qry Query.
+ * @param pageSize Page size.
+ */
+ public GridNextPageRequest(long qryReqId, int qry, int pageSize) {
+ this.qryReqId = qryReqId;
+ this.qry = qry;
+ this.pageSize = pageSize;
+ }
+
+ /**
+ * @return Query request ID.
+ */
+ public long queryRequestId() {
+ return qryReqId;
+ }
+
+ /**
+ * @return Query.
+ */
+ public int query() {
+ return qry;
+ }
+
+ /**
+ * @return Page size.
+ */
+ public int pageSize() {
+ return pageSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
new file mode 100644
index 0000000..de38172
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
@@ -0,0 +1,149 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+import org.h2.store.*;
+import org.h2.value.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Next page response.
+ */
+public class GridNextPageResponse implements Externalizable {
+ /** */
+ private long qryReqId;
+
+ /** */
+ private int qry;
+
+ /** */
+ private int page;
+
+ /** */
+ private int allRows;
+
+ /** */
+ private Collection<Value[]> rows;
+
+ /**
+ * @param qryReqId Query request ID.
+ * @param qry Query.
+ * @param page Page.
+ * @param allRows All rows count.
+ * @param rows Rows.
+ */
+ public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, Collection<Value[]> rows) {
+ assert rows != null;
+
+ this.qryReqId = qryReqId;
+ this.qry = qry;
+ this.page = page;
+ this.allRows = allRows;
+ this.rows = rows;
+ }
+
+ /**
+ * @return Query request ID.
+ */
+ public long queryRequestId() {
+ return qryReqId;
+ }
+
+ /**
+ * @return Query.
+ */
+ public int query() {
+ return qry;
+ }
+
+ /**
+ * @return Page.
+ */
+ public int page() {
+ return page;
+ }
+
+ /**
+ * @return All rows.
+ */
+ public int allRows() {
+ return allRows;
+ }
+
+ /**
+ * @return Rows.
+ */
+ public Collection<Value[]> rows() {
+ return rows;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeLong(qryReqId);
+ out.writeInt(qry);
+ out.writeInt(page);
+ out.writeInt(allRows);
+
+ out.writeInt(rows.size());
+
+ if (rows.isEmpty())
+ return;
+
+ Data data = Data.create(null, 512);
+
+ boolean first = true;
+
+ for (Value[] row : rows) {
+ if (first) {
+ out.writeInt(row.length);
+
+ first = false;
+ }
+
+ for (Value val : row)
+ data.writeValue(val);
+ }
+
+ out.writeInt(data.length());
+ out.write(data.getBytes(), 0, data.length());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ qryReqId = in.readLong();
+ qry = in.readInt();
+ page = in.readInt();
+ allRows = in.readInt();
+
+ int rowCnt = in.readInt();
+
+ if (rowCnt == 0)
+ rows = Collections.emptyList();
+ else {
+ rows = new ArrayList<>(rowCnt);
+
+ int cols = in.readInt();
+ int dataSize = in.readInt();
+
+ Data data = Data.create(null, dataSize);
+
+ for (int r = 0; r < rowCnt; r++) {
+ Value[] row = new Value[cols];
+
+ for (int c = 0; c < cols; c++)
+ row[c] = data.readValue();
+
+ rows.add(row);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java
new file mode 100644
index 0000000..fe55114
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java
@@ -0,0 +1,34 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+import java.io.*;
+
+/**
+ * TODO write doc
+ */
+public class GridQueryAck implements Serializable {
+ /** */
+ private long reqId;
+
+ /**
+ * @param reqId Request ID.
+ */
+ public GridQueryAck(long reqId) {
+ this.reqId = reqId;
+ }
+
+ /**
+ * @return Request ID.
+ */
+ public long requestId() {
+ return reqId;
+ }
+}
[6/8] incubator-ignite git commit: ignite-gg9499 - fixes
Posted by se...@apache.org.
ignite-gg9499 - fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1226b240
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1226b240
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1226b240
Branch: refs/heads/ignite-gg9499
Commit: 1226b2401b58e0e75d80e01787c0cee39ce5aeee
Parents: 29823e6
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 19 15:09:04 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 19 15:09:04 2014 +0300
----------------------------------------------------------------------
.../query/h2/twostep/GridMapQueryExecutor.java | 11 +-
.../query/h2/twostep/GridMergeIndex.java | 116 ++++++++++++++-----
.../h2/twostep/GridMergeIndexUnsorted.java | 66 ++++++-----
.../query/h2/twostep/GridMergeTable.java | 4 +-
.../h2/twostep/GridReduceQueryExecutor.java | 34 +++---
.../query/h2/twostep/GridResultPage.java | 53 +++------
.../twostep/messages/GridNextPageResponse.java | 29 ++++-
.../cache/GridCacheCrossCacheQuerySelfTest.java | 34 +++++-
8 files changed, 240 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
index b86caf1..92d1875 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -211,19 +211,26 @@ public class GridMapQueryExecutor {
assert res != null;
+ boolean last = false;
+
synchronized (res) {
page = qr.pages[qry]++;
for (int i = 0 ; i < pageSize; i++) {
- if (!res.next())
+ if (!res.next()) {
+ last = true;
+
break;
+ }
rows.add(res.currentRow());
}
}
try {
- ctx.io().sendUserMessage(F.asList(node), new GridNextPageResponse(qr.qryReqId, qry, page, allRows, rows));
+ ctx.io().sendUserMessage(F.asList(node),
+ new GridNextPageResponse(qr.qryReqId, qry, page, allRows, last, rows),
+ GridTopic.TOPIC_QUERY, false, 0);
}
catch (IgniteCheckedException e) {
log.error("Failed to send message.", e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
index 16ba15d..5d51b32 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
@@ -10,11 +10,14 @@
package org.gridgain.grid.kernal.processors.query.h2.twostep;
import org.apache.ignite.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
import org.h2.engine.*;
import org.h2.index.*;
import org.h2.message.*;
import org.h2.result.*;
import org.h2.table.*;
+import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -25,16 +28,32 @@ import java.util.concurrent.atomic.*;
*/
public abstract class GridMergeIndex extends BaseIndex {
/** */
+ protected final GridResultPage<?> END = new GridResultPage<Object>(null, null);
+
+ /** */
private static final int MAX_FETCH_SIZE = 100000;
/** */
private final AtomicInteger cnt = new AtomicInteger(0);
+ /** Result sources. */
+ private final AtomicInteger srcs = new AtomicInteger(0);
+
/**
* Will be r/w from query execution thread only, does not need to be threadsafe.
*/
private ArrayList<Row> fetched = new ArrayList<>();
+ /**
+ * @param tbl Table.
+ * @param name Index name.
+ * @param type Type.
+ * @param cols Columns.
+ */
+ public GridMergeIndex(GridMergeTable tbl, String name, IndexType type, IndexColumn[] cols) {
+ initBaseIndex(tbl, 0, name, cols, type);
+ }
+
/** {@inheritDoc} */
@Override public long getRowCount(Session session) {
return cnt.get();
@@ -46,6 +65,13 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/**
+ * @param srcs Number of sources.
+ */
+ public void setNumberOfSources(int srcs) {
+ this.srcs.set(srcs);
+ }
+
+ /**
* @param cnt Count.
*/
public void addCount(int cnt) {
@@ -55,7 +81,26 @@ public abstract class GridMergeIndex extends BaseIndex {
/**
* @param page Page.
*/
- public abstract void addPage(GridResultPage<?> page);
+ public final void addPage(GridResultPage<?> page) {
+ if (!page.response().rows().isEmpty())
+ addPage0(page);
+ else
+ assert page.response().isLast();
+
+ if (page.response().isLast()) {
+ int srcs0 = srcs.decrementAndGet();
+
+ assert srcs0 >= 0;
+
+ if (srcs0 == 0)
+ addPage0(END); // We've fetched all.
+ }
+ }
+
+ /**
+ * @param page Page.
+ */
+ protected abstract void addPage0(GridResultPage<?> page);
/** {@inheritDoc} */
@Override public Cursor find(Session session, SearchRow first, SearchRow last) {
@@ -145,7 +190,7 @@ public abstract class GridMergeIndex extends BaseIndex {
*/
protected class IteratorCursor implements Cursor {
/** */
- private Iterator<Row> iter;
+ protected Iterator<Row> iter;
/** */
protected Row cur;
@@ -185,47 +230,66 @@ public abstract class GridMergeIndex extends BaseIndex {
/**
* Fetching cursor.
*/
- protected abstract class FetchingCursor extends IteratorCursor {
+ protected class FetchingCursor extends IteratorCursor {
/** */
- private boolean canFetch = true;
+ private Iterator<Row> stream;
/**
*/
- public FetchingCursor() {
- super(fetched == null ? Collections.<Row>emptyIterator() : fetched.iterator());
- }
+ public FetchingCursor(Iterator<Row> stream) {
+ super(new FetchedIterator());
- /**
- * @return Next row or {@code null} if none available.
- */
- @Nullable protected abstract Row fetchNext();
+ assert stream != null;
+
+ this.stream = stream;
+ }
/** {@inheritDoc} */
@Override public boolean next() {
- if (super.next())
+ if (super.next()) {
+ assert cur != null;
+
+ if (iter == stream && fetched != null) { // Cache fetched rows for reuse.
+ if (fetched.size() == MAX_FETCH_SIZE)
+ fetched = null; // Throw away fetched result if it is too large.
+ else
+ fetched.add(cur);
+ }
+
+ X.println("__ row: " + Arrays.toString(cur.getValueList()));
+
return true;
+ }
- if (!canFetch)
+ if (iter == stream) // We've fetched the stream.
return false;
- cur = fetchNext();
-
- if (cur == null) { // No more results to fetch.
- assert fetched == null || fetched.size() == cnt.get() : fetched.size() + " <> " + cnt.get();
+ iter = stream; // Switch from cached to stream.
- canFetch = false;
+ return next();
+ }
+ }
- return false;
- }
+ /**
+ * List iterator without {@link ConcurrentModificationException}.
+ */
+ private class FetchedIterator implements Iterator<Row> {
+ /** */
+ private int idx;
- if (fetched != null) { // Try to reuse fetched result.
- fetched.add(cur);
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return fetched != null && idx < fetched.size();
+ }
- if (fetched.size() == MAX_FETCH_SIZE)
- fetched = null; // Throw away fetched result if it is too large.
- }
+ /** {@inheritDoc} */
+ @Override public Row next() {
+ return fetched.get(idx++);
+ }
- return true;
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index c6aaea9..b93c6ec 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -10,8 +10,10 @@
package org.gridgain.grid.kernal.processors.query.h2.twostep;
import org.apache.ignite.*;
+import org.gridgain.grid.util.typedef.*;
import org.h2.index.*;
import org.h2.result.*;
+import org.h2.table.*;
import org.h2.value.*;
import org.jetbrains.annotations.*;
@@ -25,50 +27,60 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
/** */
private final BlockingQueue<GridResultPage<?>> queue = new LinkedBlockingQueue<>();
+ /**
+ * @param tbl Table.
+ * @param name Index name.
+ */
+ public GridMergeIndexUnsorted(GridMergeTable tbl, String name) {
+ super(tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns()));
+ }
+
/** {@inheritDoc} */
- @Override public void addPage(GridResultPage<?> page) {
+ @Override public void addPage0(GridResultPage<?> page) {
queue.add(page);
}
/** {@inheritDoc} */
@Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
- final GridResultPage<?> p = queue.poll();
-
- assert p != null; // First page must be already fetched.
+ return new FetchingCursor(new Iterator<Row>() {
+ /** */
+ Iterator<Value[]> iter = Collections.emptyIterator();
- if (p.isEmpty())
- return new IteratorCursor(Collections.<Row>emptyIterator());
+ @Override public boolean hasNext() {
+ if (iter.hasNext())
+ return true;
- p.fetchNextPage(); // We always request next page before reading this one.
+ GridResultPage<?> page;
- return new FetchingCursor() {
- /** */
- Iterator<Value[]> iter = p.rows().iterator();
+ try {
+ page = queue.take();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException("Query execution was interrupted.", e);
+ }
- @Nullable @Override protected Row fetchNext() {
- if (!iter.hasNext()) {
- GridResultPage<?> page;
+ if (page == END) {
+ assert queue.isEmpty() : "It must be the last page: " + queue;
- try {
- page = queue.take();
- }
- catch (InterruptedException e) {
- throw new IgniteException("Query execution was interrupted.", e);
- }
+ return false; // We are done.
+ }
- if (page.isEmpty()) {
- assert queue.isEmpty() : "It must be the last page.";
+ page.fetchNextPage();
- return null; // Empty page - we are done.
- }
+ iter = page.response().rows().iterator();
- page.fetchNextPage();
+ assert iter.hasNext();
- iter = page.rows().iterator();
- }
+ return true;
+ }
+ @Override public Row next() {
return new Row(iter.next(), 0);
}
- };
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
index a1f2213..683bb54 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
@@ -27,7 +27,7 @@ public class GridMergeTable extends TableBase {
private final ArrayList<Index> idxs = new ArrayList<>(1);
/** */
- private final GridMergeIndex idx = new GridMergeIndexUnsorted();
+ private final GridMergeIndex idx;
/**
* @param data Data.
@@ -35,6 +35,8 @@ public class GridMergeTable extends TableBase {
public GridMergeTable(CreateTableData data) {
super(data);
+ idx = new GridMergeIndexUnsorted(this, "merge_scan");
+
idxs.add(idx);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3e7e12c..a12b4f9 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -59,8 +59,8 @@ public class GridReduceQueryExecutor {
// TODO handle node failure.
- ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
assert msg != null;
ClusterNode node = ctx.discovery().node(nodeId);
@@ -69,6 +69,8 @@ public class GridReduceQueryExecutor {
onNextPage(node, (GridNextPageResponse)msg);
else if (msg instanceof GridQueryFailResponse)
onFail(node, (GridQueryFailResponse)msg);
+
+ return true;
}
});
}
@@ -86,7 +88,13 @@ public class GridReduceQueryExecutor {
GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null);
- idx.addPage(new GridResultPage<Object>(node.id(), msg.query(), msg.rows()) {
+ if (msg.allRows() != -1) { // Only the first page contains row count.
+ idx.addCount(msg.allRows());
+
+ r.latch.countDown();
+ }
+
+ idx.addPage(new GridResultPage<UUID>(node.id(), msg) {
@Override public void fetchNextPage() {
try {
ctx.io().sendUserMessage(F.asList(node), new GridNextPageRequest(qryReqId, qry, pageSize));
@@ -96,12 +104,6 @@ public class GridReduceQueryExecutor {
}
}
});
-
- if (msg.allRows() != -1) { // Only the first page contains row count.
- idx.addCount(msg.allRows());
-
- r.latch.countDown();
- }
}
public IgniteFuture<GridCacheSqlResult> query(GridCacheTwoStepQuery qry) {
@@ -118,17 +120,23 @@ public class GridReduceQueryExecutor {
throw new IgniteException(e);
}
- for (GridCacheSqlQuery mapQry : qry.mapQueries())
- r.tbls.add(createTable(r.conn, mapQry));
-
Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO filter nodes somehow?
+ for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+ GridMergeTable tbl = createTable(r.conn, mapQry);
+
+ tbl.getScanIndex(null).setNumberOfSources(nodes.size());
+
+ r.tbls.add(tbl);
+ }
+
r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
this.runs.put(qryReqId, r);
try {
- ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries())); // TODO conf page size
+ ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries()), // TODO conf page size
+ GridTopic.TOPIC_QUERY, false, 0);
r.latch.await();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
index 6c6ab6a..ef52805 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
@@ -9,68 +9,51 @@
package org.gridgain.grid.kernal.processors.query.h2.twostep;
-import org.h2.result.*;
-import org.h2.value.*;
-
-import java.util.*;
+import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*;
+import org.gridgain.grid.util.typedef.internal.*;
/**
* Page result.
*/
-public abstract class GridResultPage<S> {
- /** */
- private final S src;
-
+public class GridResultPage<Z> {
/** */
- private final Collection<Value[]> rows;
+ private final Z src;
/** */
- private final int page;
+ private final GridNextPageResponse res;
/**
* @param src Source.
- * @param page Page.
- * @param rows Page rows.
+ * @param res Response.
*/
- protected GridResultPage(S src, int page, Collection<Value[]> rows) {
- assert src != null;
- assert rows != null;
-
+ protected GridResultPage(Z src, GridNextPageResponse res) {
this.src = src;
- this.page = page;
- this.rows = rows;
+ this.res = res;
}
/**
* @return Result source.
*/
- public S source() {
+ public Z source() {
return src;
}
/**
- * @return Page.
+ * @return Response.
*/
- public int page() {
- return page;
+ public GridNextPageResponse response() {
+ return res;
}
/**
- * @return {@code true} If result is empty.
+ * Request next page.
*/
- public boolean isEmpty() {
- return rows.isEmpty();
+ public void fetchNextPage() {
+ throw new UnsupportedOperationException();
}
- /**
- * @return Page rows.
- */
- public Collection<Value[]> rows() {
- return rows;
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridResultPage.class, this);
}
-
- /**
- * Request next page.
- */
- public abstract void fetchNextPage();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
index de38172..0834f4c 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
@@ -9,6 +9,7 @@
package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+import org.gridgain.grid.util.typedef.internal.*;
import org.h2.store.*;
import org.h2.value.*;
@@ -34,20 +35,32 @@ public class GridNextPageResponse implements Externalizable {
/** */
private Collection<Value[]> rows;
+ /** */
+ private boolean last;
+
+ /**
+ * For {@link Externalizable}.
+ */
+ public GridNextPageResponse() {
+ // No-op.
+ }
+
/**
* @param qryReqId Query request ID.
* @param qry Query.
* @param page Page.
* @param allRows All rows count.
+ * @param last Last row.
* @param rows Rows.
*/
- public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, Collection<Value[]> rows) {
+ public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, boolean last, Collection<Value[]> rows) {
assert rows != null;
this.qryReqId = qryReqId;
this.qry = qry;
this.page = page;
this.allRows = allRows;
+ this.last = last;
this.rows = rows;
}
@@ -80,6 +93,13 @@ public class GridNextPageResponse implements Externalizable {
}
/**
+ * @return {@code true} If this is the last page.
+ */
+ public boolean isLast() {
+ return last;
+ }
+
+ /**
* @return Rows.
*/
public Collection<Value[]> rows() {
@@ -91,6 +111,7 @@ public class GridNextPageResponse implements Externalizable {
out.writeLong(qryReqId);
out.writeInt(qry);
out.writeInt(page);
+ out.writeBoolean(last);
out.writeInt(allRows);
out.writeInt(rows.size());
@@ -122,6 +143,7 @@ public class GridNextPageResponse implements Externalizable {
qryReqId = in.readLong();
qry = in.readInt();
page = in.readInt();
+ last = in.readBoolean();
allRows = in.readInt();
int rowCnt = in.readInt();
@@ -146,4 +168,9 @@ public class GridNextPageResponse implements Externalizable {
}
}
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNextPageResponse.class, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index bc9dcf8..90a2d4c 100644
--- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -18,6 +18,8 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.query.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
+import org.gridgain.grid.util.typedef.*;
import org.gridgain.testframework.junits.common.*;
import java.util.*;
@@ -88,11 +90,37 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
return cc;
}
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTwoStep() throws Exception {
+ fillCaches();
+
+ GridCacheTwoStepQuery q = new GridCacheTwoStepQuery("select * from _cnts_");
+
+ q.addMapQuery("_cnts_", "select count(*) cnt from \"partitioned\".FactPurchase");
+
+ GridCacheQueriesEx<Integer, FactPurchase> qx =
+ (GridCacheQueriesEx<Integer, FactPurchase>)ignite.<Integer, FactPurchase>cache("partitioned").queries();
+
+ for (List<?> row : qx.execute(q).get())
+ X.println("__ " + row);
+
+
+
+// Object cnt = .next().get(0);
+//
+// assertEquals(10L, cnt);
+ }
+
/** @throws Exception If failed. */
public void testOnProjection() throws Exception {
+ fillCaches();
+
GridCacheProjection<Integer, FactPurchase> prj = ignite.<Integer, FactPurchase>cache("partitioned").projection(
new IgnitePredicate<GridCacheEntry<Integer, FactPurchase>>() {
- @Override public boolean apply(GridCacheEntry<Integer, FactPurchase> e) {
+ @Override
+ public boolean apply(GridCacheEntry<Integer, FactPurchase> e) {
return e.getKey() > 12;
}
});
@@ -105,7 +133,9 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
/**
* @throws IgniteCheckedException If failed.
*/
- private void fillCaches() throws IgniteCheckedException {
+ private void fillCaches() throws IgniteCheckedException, InterruptedException {
+ awaitPartitionMapExchange();
+
int idGen = 0;
GridCache<Integer, Object> dimCache = ignite.cache("replicated");
[2/8] incubator-ignite git commit: ignite-gg9499 -
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
new file mode 100644
index 0000000..ba5855e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
@@ -0,0 +1,46 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+import java.io.*;
+
+/**
+ * Error message.
+ */
+public class GridQueryFailResponse implements Serializable {
+ /** */
+ private long qryReqId;
+
+ /** */
+ private Throwable err;
+
+ /**
+ * @param qryReqId Query request ID.
+ * @param err Error.
+ */
+ public GridQueryFailResponse(long qryReqId, Throwable err) {
+ this.qryReqId = qryReqId;
+ this.err = err;
+ }
+
+ /**
+ * @return Query request ID.
+ */
+ public long queryRequestId() {
+ return qryReqId;
+ }
+
+ /**
+ * @return Error.
+ */
+ public Throwable error() {
+ return err;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryRequest.java
new file mode 100644
index 0000000..7e2e9ad
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -0,0 +1,61 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+import org.gridgain.grid.kernal.processors.cache.query.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Query request.
+ */
+public class GridQueryRequest implements Serializable {
+ /** */
+ private long reqId;
+
+ /** */
+ private int pageSize;
+
+ /** */
+ private Collection<GridCacheSqlQuery> qrys;
+
+ /**
+ * @param reqId Request ID.
+ * @param pageSize Page size.
+ * @param qrys Queries.
+ */
+ public GridQueryRequest(long reqId, int pageSize, Collection<GridCacheSqlQuery> qrys) {
+ this.reqId = reqId;
+ this.pageSize = pageSize;
+ this.qrys = qrys;
+ }
+
+ /**
+ * @return Request ID.
+ */
+ public long requestId() {
+ return reqId;
+ }
+
+ /**
+ * @return Page size.
+ */
+ public int pageSize() {
+ return pageSize;
+ }
+
+ /**
+ * @return Queries.
+ */
+ public Collection<GridCacheSqlQuery> queries() {
+ return qrys;
+ }
+}