You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/24 14:22:21 UTC
ignite git commit: Skip merge table query.
Repository: ignite
Updated Branches:
refs/heads/ignite-sql-cache-stmt a0a10fcae -> eb2b5f38a
Skip merge table query.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eb2b5f38
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eb2b5f38
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eb2b5f38
Branch: refs/heads/ignite-sql-cache-stmt
Commit: eb2b5f38ae639293b682c35603505933ec350612
Parents: a0a10fc
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 24 16:22:03 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 24 16:22:03 2015 +0300
----------------------------------------------------------------------
.../cache/query/GridCacheSqlQuery.java | 20 ++++
.../query/h2/sql/GridSqlQuerySplitter.java | 38 ++++++-
.../query/h2/twostep/GridMergeIndex.java | 7 ++
.../h2/twostep/GridMergeIndexUnsorted.java | 14 +++
.../h2/twostep/GridReduceQueryExecutor.java | 102 +++++++++++++------
5 files changed, 149 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb2b5f38/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index e56e445..49a926a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -70,6 +70,10 @@ public class GridCacheSqlQuery implements Message {
/** Field kept for backward compatibility. */
private String alias;
+ /** */
+ @GridDirectTransient
+ private boolean skipMergeTbl;
+
/**
* For {@link Message}.
*/
@@ -242,6 +246,21 @@ public class GridCacheSqlQuery implements Message {
}
/**
+ * @return {@code True} if reduce query can skip merge table creation and get data directly from merge index.
+ */
+ public boolean skipMergeTable() {
+ return skipMergeTbl;
+ }
+
+ /**
+ * @param skipMergeTbl {@code True} if reduce query can skip merge table creation and get
+ * data directly from merge index.
+ */
+ public void skipMergeTable(boolean skipMergeTbl) {
+ this.skipMergeTbl = skipMergeTbl;
+ }
+
+ /**
* @param args Arguments.
* @return Copy.
*/
@@ -252,6 +271,7 @@ public class GridCacheSqlQuery implements Message {
cp.cols = cols;
cp.paramIdxs = paramIdxs;
cp.paramsSize = paramsSize;
+ cp.skipMergeTbl = skipMergeTbl;
if (F.isEmpty(args))
cp.params = EMPTY_PARAMS;
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb2b5f38/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index ac47b50..741a998 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -173,22 +173,46 @@ public class GridSqlQuerySplitter {
for (GridSqlElement exp : mapExps) // Add all map expressions as visible.
mapQry.addColumn(exp, true);
- for (GridSqlElement rdcExp : rdcExps) // Add corresponding visible reduce columns.
+ boolean rdcSimpleQry = true;
+
+ for (GridSqlElement rdcExp : rdcExps) { // Add corresponding visible reduce columns.
rdcQry.addColumn(rdcExp, true);
+ if (rdcSimpleQry) {
+ if (rdcExp instanceof GridSqlColumn || rdcExp instanceof GridSqlConst)
+ continue;
+
+ if (rdcExp instanceof GridSqlAlias) {
+ if (rdcExp.size() == 1) {
+ GridSqlElement child = rdcExp.child();
+
+ if (child instanceof GridSqlColumn || child instanceof GridSqlConst)
+ continue;
+ }
+ }
+
+ rdcSimpleQry = false;
+ }
+ }
+
for (int i = rdcExps.length; i < mapExps.size(); i++) // Add all extra map columns as invisible reduce columns.
rdcQry.addColumn(column(((GridSqlAlias)mapExps.get(i)).alias()), false);
// -- GROUP BY
- if (mapQry.groupColumns() != null && !collocated)
+ if (mapQry.groupColumns() != null && !collocated) {
rdcQry.groupColumns(mapQry.groupColumns());
+ rdcSimpleQry = false;
+ }
+
// -- HAVING
if (mapQry.havingColumn() >= 0 && !collocated) {
// TODO IGNITE-1140 - Find aggregate functions in HAVING clause or rewrite query to put all aggregates to SELECT clause.
rdcQry.whereAnd(column(columnName(mapQry.havingColumn())));
mapQry.havingColumn(-1);
+
+ rdcSimpleQry = false;
}
// -- ORDER BY
@@ -199,6 +223,8 @@ public class GridSqlQuerySplitter {
if (aggregateFound) // Ordering over aggregates does not make sense.
mapQry.clearSort(); // Otherwise map sort will be used by offset-limit.
// TODO IGNITE-1141 - Check if sorting is done over aggregated expression, otherwise we can sort and use offset-limit.
+
+ rdcSimpleQry = false;
}
// -- LIMIT
@@ -207,6 +233,8 @@ public class GridSqlQuerySplitter {
if (aggregateFound)
mapQry.limit(null);
+
+ rdcSimpleQry = false;
}
// -- OFFSET
@@ -217,12 +245,16 @@ public class GridSqlQuerySplitter {
mapQry.limit(op(GridSqlOperationType.PLUS, mapQry.offset(), mapQry.limit()));
mapQry.offset(null);
+
+ rdcSimpleQry = false;
}
// -- DISTINCT
if (mapQry.distinct()) {
mapQry.distinct(!aggregateFound && mapQry.groupColumns() == null && mapQry.havingColumn() < 0);
rdcQry.distinct(true);
+
+ rdcSimpleQry = false;
}
IntArray paramIdxs = new IntArray(params.length);
@@ -230,6 +262,8 @@ public class GridSqlQuerySplitter {
GridCacheSqlQuery rdc = new GridCacheSqlQuery(rdcQry.getSQL(),
findParams(rdcQry, params, new ArrayList<>(), paramIdxs).toArray());
+ rdc.skipMergeTable(rdcSimpleQry);
+
rdc.parameterIndexes(toIntArray(paramIdxs));
paramIdxs = new IntArray(params.length);
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb2b5f38/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 7f8caed..12c2240 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -80,6 +80,13 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/**
+ *
+ */
+ protected GridMergeIndex() {
+ // No-op.
+ }
+
+ /**
* @return Return source nodes for this merge index.
*/
public Set<UUID> sources() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb2b5f38/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index 5f5eb25..501480a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -47,6 +47,20 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
super(tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns()));
}
+ /**
+ * @return Dummy index instance.
+ */
+ public static GridMergeIndexUnsorted createDummy() {
+ return new GridMergeIndexUnsorted();
+ }
+
+ /**
+ *
+ */
+ private GridMergeIndexUnsorted() {
+ // No-op.
+ }
+
/** {@inheritDoc} */
@Override protected void addPage0(GridResultPage page) {
assert page.rowsInPage() > 0 || page.isLast() || page.isFail();
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb2b5f38/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 78c1f77..a57366d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -79,10 +79,12 @@ import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.h2.command.ddl.CreateTableData;
import org.h2.engine.Session;
+import org.h2.index.Cursor;
import org.h2.jdbc.JdbcConnection;
import org.h2.jdbc.JdbcResultSet;
import org.h2.jdbc.JdbcStatement;
import org.h2.result.ResultInterface;
+import org.h2.result.Row;
import org.h2.table.Column;
import org.h2.util.IntArray;
import org.h2.value.Value;
@@ -184,8 +186,8 @@ public class GridReduceQueryExecutor {
UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
for (QueryRun r : runs.values()) {
- for (GridMergeTable tbl : r.tbls) {
- if (tbl.getScanIndex(null).hasSource(nodeId)) {
+ for (GridMergeIndex idx : r.idxs) {
+ if (idx.hasSource(nodeId)) {
handleNodeLeft(r, nodeId);
break;
@@ -270,7 +272,7 @@ public class GridReduceQueryExecutor {
final int pageSize = r.pageSize;
- GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null);
+ GridMergeIndex idx = r.idxs.get(msg.query());
GridResultPage page;
@@ -468,7 +470,7 @@ public class GridReduceQueryExecutor {
r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
- r.tbls = new ArrayList<>(qry.mapQueries().size());
+ r.idxs = new ArrayList<>(qry.mapQueries().size());
String space = cctx.name();
@@ -510,27 +512,35 @@ public class GridReduceQueryExecutor {
int tblIdx = 0;
+ final boolean skipMergeTbl = !qry.explain() && qry.reduceQuery().skipMergeTable();
+
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
- GridMergeTable tbl;
+ GridMergeIndex idx;
- try {
- tbl = createMergeTable(r.conn, mapQry, qry.explain());
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ if (!skipMergeTbl) {
+ GridMergeTable tbl;
- GridMergeIndex idx = tbl.getScanIndex(null);
+ try {
+ tbl = createMergeTable(r.conn, mapQry, qry.explain());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ idx = tbl.getScanIndex(null);
+
+ fakeTable(r.conn, tblIdx++).setInnerTable(tbl);
+ }
+ else
+ idx = GridMergeIndexUnsorted.createDummy();
for (ClusterNode node : nodes)
idx.addSource(node.id());
- r.tbls.add(tbl);
-
- fakeTable(r.conn, tblIdx++).setInnerTable(tbl);
+ r.idxs.add(idx);
}
- r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
+ r.latch = new CountDownLatch(r.idxs.size() * nodes.size());
runs.put(qryReqId, r);
@@ -586,20 +596,50 @@ public class GridReduceQueryExecutor {
else // Send failed.
retry = true;
- ResultSet res = null;
+ Iterator<List<?>> resIter = null;
if (!retry) {
if (qry.explain())
return explainPlan(r.conn, space, qry);
- GridCacheSqlQuery rdc = qry.reduceQuery();
+ if (skipMergeTbl) {
+ List<List<?>> res = new ArrayList<>();
- // Statement caching is prohibited here because we can't guarantee correct merge index reuse.
- res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()), false);
+ for (GridMergeIndex idx : r.idxs) {
+ Cursor cur = idx.findInStream(null, null);
+
+ while (cur.next()) {
+ Row row = cur.get();
+
+ int cols = row.getColumnCount();
+
+ List<Object> resRow = new ArrayList<>(cols);
+
+ for (int c = 0; c < cols; c++)
+ resRow .add(row.getValue(c).getObject());
+
+ res.add(resRow);
+ }
+ }
+
+ resIter = res.iterator();
+ }
+ else {
+ GridCacheSqlQuery rdc = qry.reduceQuery();
+
+ // Statement caching is prohibited here because we can't guarantee correct merge index reuse.
+ ResultSet res = h2.executeSqlQueryWithTimer(space,
+ r.conn,
+ rdc.query(),
+ F.asList(rdc.parameters()),
+ false);
+
+ resIter = new Iter(res);
+ }
}
- for (GridMergeTable tbl : r.tbls) {
- if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
+ for (GridMergeIndex idx : r.idxs) {
+ if (!idx.fetchedAll()) // We have to explicitly cancel queries on remote nodes.
send(nodes, new GridQueryCancelRequest(qryReqId), null);
}
@@ -610,7 +650,7 @@ public class GridReduceQueryExecutor {
continue;
}
- return new GridQueryCacheObjectsIterator(new Iter(res), cctx, keepPortable);
+ return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable);
}
catch (IgniteCheckedException | RuntimeException e) {
U.closeQuiet(r.conn);
@@ -634,8 +674,10 @@ public class GridReduceQueryExecutor {
if (!runs.remove(qryReqId, r))
U.warn(log, "Query run was already removed: " + qryReqId);
- for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
- fakeTable(null, i).setInnerTable(null); // Drop all merge tables.
+ if (!skipMergeTbl) {
+ for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
+ fakeTable(null, i).setInnerTable(null); // Drop all merge tables.
+ }
}
}
}
@@ -1125,7 +1167,7 @@ public class GridReduceQueryExecutor {
*/
private static class QueryRun {
/** */
- private List<GridMergeTable> tbls;
+ private List<GridMergeIndex> idxs;
/** */
private CountDownLatch latch;
@@ -1153,8 +1195,8 @@ public class GridReduceQueryExecutor {
while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
latch.countDown();
- for (GridMergeTable tbl : tbls) // Fail all merge indexes.
- tbl.getScanIndex(null).fail(nodeId);
+ for (GridMergeIndex idx : idxs) // Fail all merge indexes.
+ idx.fail(nodeId);
}
/**
@@ -1167,8 +1209,8 @@ public class GridReduceQueryExecutor {
while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
latch.countDown();
- for (GridMergeTable tbl : tbls) // Fail all merge indexes.
- tbl.getScanIndex(null).fail(e);
+ for (GridMergeIndex idx : idxs) // Fail all merge indexes.
+ idx.fail(e);
}
}