You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/25 06:53:20 UTC

[04/15] ignite git commit: Skip merge table query.

Skip merge table query.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eb2b5f38
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eb2b5f38
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eb2b5f38

Branch: refs/heads/ignite-sql-opt-2
Commit: eb2b5f38ae639293b682c35603505933ec350612
Parents: a0a10fc
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 24 16:22:03 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 24 16:22:03 2015 +0300

----------------------------------------------------------------------
 .../cache/query/GridCacheSqlQuery.java          |  20 ++++
 .../query/h2/sql/GridSqlQuerySplitter.java      |  38 ++++++-
 .../query/h2/twostep/GridMergeIndex.java        |   7 ++
 .../h2/twostep/GridMergeIndexUnsorted.java      |  14 +++
 .../h2/twostep/GridReduceQueryExecutor.java     | 102 +++++++++++++------
 5 files changed, 149 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/eb2b5f38/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index e56e445..49a926a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -70,6 +70,10 @@ public class GridCacheSqlQuery implements Message {
     /** Field kept for backward compatibility. */
     private String alias;
 
+    /** */
+    @GridDirectTransient
+    private boolean skipMergeTbl;
+
     /**
      * For {@link Message}.
      */
@@ -242,6 +246,21 @@ public class GridCacheSqlQuery implements Message {
     }
 
     /**
+     * @return {@code True} if reduce query can skip merge table creation and get data directly from merge index.
+     */
+    public boolean skipMergeTable() {
+        return skipMergeTbl;
+    }
+
+    /**
+     * @param skipMergeTbl {@code True} if reduce query can skip merge table creation and get
+     *      data directly from merge index.
+     */
+    public void skipMergeTable(boolean skipMergeTbl) {
+        this.skipMergeTbl = skipMergeTbl;
+    }
+
+    /**
      * @param args Arguments.
      * @return Copy.
      */
@@ -252,6 +271,7 @@ public class GridCacheSqlQuery implements Message {
         cp.cols = cols;
         cp.paramIdxs = paramIdxs;
         cp.paramsSize = paramsSize;
+        cp.skipMergeTbl = skipMergeTbl;
 
         if (F.isEmpty(args))
             cp.params = EMPTY_PARAMS;

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb2b5f38/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index ac47b50..741a998 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -173,22 +173,46 @@ public class GridSqlQuerySplitter {
         for (GridSqlElement exp : mapExps) // Add all map expressions as visible.
             mapQry.addColumn(exp, true);
 
-        for (GridSqlElement rdcExp : rdcExps) // Add corresponding visible reduce columns.
+        boolean rdcSimpleQry = true;
+
+        for (GridSqlElement rdcExp : rdcExps) { // Add corresponding visible reduce columns.
             rdcQry.addColumn(rdcExp, true);
 
+            if (rdcSimpleQry) {
+                if (rdcExp instanceof GridSqlColumn || rdcExp instanceof GridSqlConst)
+                    continue;
+
+                if (rdcExp instanceof GridSqlAlias) {
+                    if (rdcExp.size() == 1) {
+                        GridSqlElement child = rdcExp.child();
+
+                        if (child instanceof GridSqlColumn || child instanceof GridSqlConst)
+                            continue;
+                    }
+                }
+
+                rdcSimpleQry = false;
+            }
+        }
+
         for (int i = rdcExps.length; i < mapExps.size(); i++)  // Add all extra map columns as invisible reduce columns.
             rdcQry.addColumn(column(((GridSqlAlias)mapExps.get(i)).alias()), false);
 
         // -- GROUP BY
-        if (mapQry.groupColumns() != null && !collocated)
+        if (mapQry.groupColumns() != null && !collocated) {
             rdcQry.groupColumns(mapQry.groupColumns());
 
+            rdcSimpleQry = false;
+        }
+
         // -- HAVING
         if (mapQry.havingColumn() >= 0 && !collocated) {
             // TODO IGNITE-1140 - Find aggregate functions in HAVING clause or rewrite query to put all aggregates to SELECT clause.
             rdcQry.whereAnd(column(columnName(mapQry.havingColumn())));
 
             mapQry.havingColumn(-1);
+
+            rdcSimpleQry = false;
         }
 
         // -- ORDER BY
@@ -199,6 +223,8 @@ public class GridSqlQuerySplitter {
             if (aggregateFound) // Ordering over aggregates does not make sense.
                 mapQry.clearSort(); // Otherwise map sort will be used by offset-limit.
             // TODO IGNITE-1141 - Check if sorting is done over aggregated expression, otherwise we can sort and use offset-limit.
+
+            rdcSimpleQry = false;
         }
 
         // -- LIMIT
@@ -207,6 +233,8 @@ public class GridSqlQuerySplitter {
 
             if (aggregateFound)
                 mapQry.limit(null);
+
+            rdcSimpleQry = false;
         }
 
         // -- OFFSET
@@ -217,12 +245,16 @@ public class GridSqlQuerySplitter {
                 mapQry.limit(op(GridSqlOperationType.PLUS, mapQry.offset(), mapQry.limit()));
 
             mapQry.offset(null);
+
+            rdcSimpleQry = false;
         }
 
         // -- DISTINCT
         if (mapQry.distinct()) {
             mapQry.distinct(!aggregateFound && mapQry.groupColumns() == null && mapQry.havingColumn() < 0);
             rdcQry.distinct(true);
+
+            rdcSimpleQry = false;
         }
 
         IntArray paramIdxs = new IntArray(params.length);
@@ -230,6 +262,8 @@ public class GridSqlQuerySplitter {
         GridCacheSqlQuery rdc = new GridCacheSqlQuery(rdcQry.getSQL(),
             findParams(rdcQry, params, new ArrayList<>(), paramIdxs).toArray());
 
+        rdc.skipMergeTable(rdcSimpleQry);
+
         rdc.parameterIndexes(toIntArray(paramIdxs));
 
         paramIdxs = new IntArray(params.length);

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb2b5f38/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 7f8caed..12c2240 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -80,6 +80,13 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
+     *
+     */
+    protected GridMergeIndex() {
+        // No-op.
+    }
+
+    /**
      * @return Return source nodes for this merge index.
      */
     public Set<UUID> sources() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb2b5f38/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index 5f5eb25..501480a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -47,6 +47,20 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
         super(tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns()));
     }
 
+    /**
+     * @return Dummy index instance.
+     */
+    public static GridMergeIndexUnsorted createDummy() {
+        return new GridMergeIndexUnsorted();
+    }
+
+    /**
+     *
+     */
+    private GridMergeIndexUnsorted() {
+        // No-op.
+    }
+
     /** {@inheritDoc} */
     @Override protected void addPage0(GridResultPage page) {
         assert page.rowsInPage() > 0 || page.isLast() || page.isFail();

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb2b5f38/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 78c1f77..a57366d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -79,10 +79,12 @@ import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.command.ddl.CreateTableData;
 import org.h2.engine.Session;
+import org.h2.index.Cursor;
 import org.h2.jdbc.JdbcConnection;
 import org.h2.jdbc.JdbcResultSet;
 import org.h2.jdbc.JdbcStatement;
 import org.h2.result.ResultInterface;
+import org.h2.result.Row;
 import org.h2.table.Column;
 import org.h2.util.IntArray;
 import org.h2.value.Value;
@@ -184,8 +186,8 @@ public class GridReduceQueryExecutor {
                 UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
                 for (QueryRun r : runs.values()) {
-                    for (GridMergeTable tbl : r.tbls) {
-                        if (tbl.getScanIndex(null).hasSource(nodeId)) {
+                    for (GridMergeIndex idx : r.idxs) {
+                        if (idx.hasSource(nodeId)) {
                             handleNodeLeft(r, nodeId);
 
                             break;
@@ -270,7 +272,7 @@ public class GridReduceQueryExecutor {
 
         final int pageSize = r.pageSize;
 
-        GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null);
+        GridMergeIndex idx = r.idxs.get(msg.query());
 
         GridResultPage page;
 
@@ -468,7 +470,7 @@ public class GridReduceQueryExecutor {
 
             r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
 
-            r.tbls = new ArrayList<>(qry.mapQueries().size());
+            r.idxs = new ArrayList<>(qry.mapQueries().size());
 
             String space = cctx.name();
 
@@ -510,27 +512,35 @@ public class GridReduceQueryExecutor {
 
             int tblIdx = 0;
 
+            final boolean skipMergeTbl = !qry.explain() && qry.reduceQuery().skipMergeTable();
+
             for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
-                GridMergeTable tbl;
+                GridMergeIndex idx;
 
-                try {
-                    tbl = createMergeTable(r.conn, mapQry, qry.explain());
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
+                if (!skipMergeTbl) {
+                    GridMergeTable tbl;
 
-                GridMergeIndex idx = tbl.getScanIndex(null);
+                    try {
+                        tbl = createMergeTable(r.conn, mapQry, qry.explain());
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+
+                    idx = tbl.getScanIndex(null);
+
+                    fakeTable(r.conn, tblIdx++).setInnerTable(tbl);
+                }
+                else
+                    idx = GridMergeIndexUnsorted.createDummy();
 
                 for (ClusterNode node : nodes)
                     idx.addSource(node.id());
 
-                r.tbls.add(tbl);
-
-                fakeTable(r.conn, tblIdx++).setInnerTable(tbl);
+                r.idxs.add(idx);
             }
 
-            r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
+            r.latch = new CountDownLatch(r.idxs.size() * nodes.size());
 
             runs.put(qryReqId, r);
 
@@ -586,20 +596,50 @@ public class GridReduceQueryExecutor {
                 else // Send failed.
                     retry = true;
 
-                ResultSet res = null;
+                Iterator<List<?>> resIter = null;
 
                 if (!retry) {
                     if (qry.explain())
                         return explainPlan(r.conn, space, qry);
 
-                    GridCacheSqlQuery rdc = qry.reduceQuery();
+                    if (skipMergeTbl) {
+                        List<List<?>> res = new ArrayList<>();
 
-                    // Statement caching is prohibited here because we can't guarantee correct merge index reuse.
-                    res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()), false);
+                        for (GridMergeIndex idx : r.idxs) {
+                            Cursor cur = idx.findInStream(null, null);
+
+                            while (cur.next()) {
+                                Row row = cur.get();
+
+                                int cols = row.getColumnCount();
+
+                                List<Object> resRow  = new ArrayList<>(cols);
+
+                                for (int c = 0; c < cols; c++)
+                                    resRow .add(row.getValue(c).getObject());
+
+                                res.add(resRow);
+                            }
+                        }
+
+                        resIter = res.iterator();
+                    }
+                    else {
+                        GridCacheSqlQuery rdc = qry.reduceQuery();
+
+                        // Statement caching is prohibited here because we can't guarantee correct merge index reuse.
+                        ResultSet res = h2.executeSqlQueryWithTimer(space,
+                            r.conn,
+                            rdc.query(),
+                            F.asList(rdc.parameters()),
+                            false);
+
+                        resIter = new Iter(res);
+                    }
                 }
 
-                for (GridMergeTable tbl : r.tbls) {
-                    if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
+                for (GridMergeIndex idx : r.idxs) {
+                    if (!idx.fetchedAll()) // We have to explicitly cancel queries on remote nodes.
                         send(nodes, new GridQueryCancelRequest(qryReqId), null);
                 }
 
@@ -610,7 +650,7 @@ public class GridReduceQueryExecutor {
                     continue;
                 }
 
-                return new GridQueryCacheObjectsIterator(new Iter(res), cctx, keepPortable);
+                return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable);
             }
             catch (IgniteCheckedException | RuntimeException e) {
                 U.closeQuiet(r.conn);
@@ -634,8 +674,10 @@ public class GridReduceQueryExecutor {
                 if (!runs.remove(qryReqId, r))
                     U.warn(log, "Query run was already removed: " + qryReqId);
 
-                for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
-                    fakeTable(null, i).setInnerTable(null); // Drop all merge tables.
+                if (!skipMergeTbl) {
+                    for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
+                        fakeTable(null, i).setInnerTable(null); // Drop all merge tables.
+                }
             }
         }
     }
@@ -1125,7 +1167,7 @@ public class GridReduceQueryExecutor {
      */
     private static class QueryRun {
         /** */
-        private List<GridMergeTable> tbls;
+        private List<GridMergeIndex> idxs;
 
         /** */
         private CountDownLatch latch;
@@ -1153,8 +1195,8 @@ public class GridReduceQueryExecutor {
             while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
                 latch.countDown();
 
-            for (GridMergeTable tbl : tbls) // Fail all merge indexes.
-                tbl.getScanIndex(null).fail(nodeId);
+            for (GridMergeIndex idx : idxs) // Fail all merge indexes.
+                idx.fail(nodeId);
         }
 
         /**
@@ -1167,8 +1209,8 @@ public class GridReduceQueryExecutor {
             while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
                 latch.countDown();
 
-            for (GridMergeTable tbl : tbls) // Fail all merge indexes.
-                tbl.getScanIndex(null).fail(e);
+            for (GridMergeIndex idx : idxs) // Fail all merge indexes.
+                idx.fail(e);
         }
     }