You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/08/16 06:42:07 UTC

ignite git commit: IGNITE-6019: SQL: do not pull the whole result set immediately to the client when there is no merge table. This closes #2430.

Repository: ignite
Updated Branches:
  refs/heads/master f3d3d1bd7 -> 59facfca3


IGNITE-6019: SQL: do not pull the whole result set immediately to the client when there is no merge table. This closes #2430.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59facfca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59facfca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59facfca

Branch: refs/heads/master
Commit: 59facfca3b60f22169f773f0e8b6f2e0b9a5c8e9
Parents: f3d3d1b
Author: Alexander Paschenko <al...@gmail.com>
Authored: Wed Aug 16 09:41:54 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Aug 16 09:41:54 2017 +0300

----------------------------------------------------------------------
 .../h2/twostep/GridMergeIndexIterator.java      | 165 +++++++++++++++++++
 .../h2/twostep/GridReduceQueryExecutor.java     |  61 +++----
 2 files changed, 188 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/59facfca/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
new file mode 100644
index 0000000..1c0efb3
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.h2.index.Cursor;
+import org.h2.result.Row;
+
+/**
+ * Iterator that transparently and sequentially traverses a bunch of {@link GridMergeIndex} objects.
+ */
+class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
+    /** Reduce query executor. */
+    private final GridReduceQueryExecutor rdcExec;
+
+    /** Participating nodes. */
+    private final Collection<ClusterNode> nodes;
+
+    /** Query run. */
+    private final ReduceQueryRun run;
+
+    /** Query request ID. */
+    private final long qryReqId;
+
+    /** Distributed joins. */
+    private final boolean distributedJoins;
+
+    /** Iterator over indexes. */
+    private final Iterator<GridMergeIndex> idxIter;
+
+    /** Current cursor. */
+    private Cursor cursor;
+
+    /** Next row to return. */
+    private List<Object> next;
+
+    /** Whether remote resources were released. */
+    private boolean released;
+
+    /**
+     * Constructor.
+     *
+     * @param rdcExec Reduce query executor.
+     * @param nodes Participating nodes.
+     * @param run Query run.
+     * @param qryReqId Query request ID.
+     * @param distributedJoins Distributed joins.
+     * @throws IgniteCheckedException if failed.
+     */
+    GridMergeIndexIterator(GridReduceQueryExecutor rdcExec, Collection<ClusterNode> nodes, ReduceQueryRun run,
+        long qryReqId, boolean distributedJoins)
+        throws IgniteCheckedException {
+        this.rdcExec = rdcExec;
+        this.nodes = nodes;
+        this.run = run;
+        this.qryReqId = qryReqId;
+        this.distributedJoins = distributedJoins;
+
+        this.idxIter = run.indexes().iterator();
+
+        advance();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() {
+        return next != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<?> next() {
+        List<Object> res = next;
+
+        if (res == null)
+            throw new NoSuchElementException();
+
+        advance();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove() {
+        throw new UnsupportedOperationException("Remove is not supported");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        releaseIfNeeded();
+    }
+
+    /**
+     * Advance iterator.
+     */
+    private void advance() {
+        next = null;
+
+        try {
+            boolean hasNext = false;
+
+            while (cursor == null || !(hasNext = cursor.next())) {
+                if (idxIter.hasNext())
+                    cursor = idxIter.next().findInStream(null, null);
+                else {
+                    releaseIfNeeded();
+
+                    break;
+                }
+            }
+
+            if (hasNext) {
+                Row row = cursor.get();
+
+                int cols = row.getColumnCount();
+
+                List<Object> res = new ArrayList<>(cols);
+
+                for (int c = 0; c < cols; c++)
+                    res.add(row.getValue(c).getObject());
+
+                next = res;
+            }
+        }
+        catch (Exception e) {
+            releaseIfNeeded();
+
+            throw e;
+        }
+    }
+
+    /**
+     * Close routine.
+     */
+    private void releaseIfNeeded() {
+        if (!released) {
+            try {
+                rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins);
+            }
+            finally {
+                released = true;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/59facfca/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 85a7e0b..0e9d1a2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -87,10 +87,8 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.command.ddl.CreateTableData;
 import org.h2.engine.Session;
-import org.h2.index.Cursor;
 import org.h2.index.Index;
 import org.h2.jdbc.JdbcConnection;
-import org.h2.result.Row;
 import org.h2.table.Column;
 import org.h2.util.IntArray;
 import org.h2.value.Value;
@@ -169,6 +167,7 @@ public class GridReduceQueryExecutor {
         log = ctx.log(GridReduceQueryExecutor.class);
 
         ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
+            @SuppressWarnings("deprecation")
             @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (!busyLock.enterBusy())
                     return;
@@ -511,7 +510,7 @@ public class GridReduceQueryExecutor {
      */
     public Iterator<List<?>> query(
         String schemaName,
-        GridCacheTwoStepQuery qry,
+        final GridCacheTwoStepQuery qry,
         boolean keepBinary,
         boolean enforceJoinOrder,
         int timeoutMillis,
@@ -622,6 +621,8 @@ public class GridReduceQueryExecutor {
 
             int replicatedQrysCnt = 0;
 
+            final Collection<ClusterNode> finalNodes = nodes;
+
             for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
                 GridMergeIndex idx;
 
@@ -665,6 +666,8 @@ public class GridReduceQueryExecutor {
 
             runs.put(qryReqId, r);
 
+            boolean release = true;
+
             try {
                 cancel.checkCancelled();
 
@@ -686,8 +689,6 @@ public class GridReduceQueryExecutor {
 
                 final boolean distributedJoins = qry.distributedJoins();
 
-                final Collection<ClusterNode> finalNodes = nodes;
-
                 cancel.set(new Runnable() {
                     @Override public void run() {
                         send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false);
@@ -757,27 +758,9 @@ public class GridReduceQueryExecutor {
 
                 if (!retry) {
                     if (skipMergeTbl) {
-                        List<List<?>> res = new ArrayList<>();
-
-                        // Simple UNION ALL can have multiple indexes.
-                        for (GridMergeIndex idx : r.indexes()) {
-                            Cursor cur = idx.findInStream(null, null);
-
-                            while (cur.next()) {
-                                Row row = cur.get();
-
-                                int cols = row.getColumnCount();
-
-                                List<Object> resRow = new ArrayList<>(cols);
-
-                                for (int c = 0; c < cols; c++)
-                                    resRow.add(row.getValue(c).getObject());
+                        resIter = new GridMergeIndexIterator(this, finalNodes, r, qryReqId, qry.distributedJoins());
 
-                                res.add(resRow);
-                            }
-                        }
-
-                        resIter = res.iterator();
+                        release = false;
                     }
                     else {
                         cancel.checkCancelled();
@@ -820,6 +803,8 @@ public class GridReduceQueryExecutor {
                 return new GridQueryCacheObjectsIterator(resIter, h2.objectContext(), keepBinary);
             }
             catch (IgniteCheckedException | RuntimeException e) {
+                release = true;
+
                 U.closeQuiet(r.connection());
 
                 if (e instanceof CacheException) {
@@ -842,15 +827,13 @@ public class GridReduceQueryExecutor {
                 throw new CacheException("Failed to run reduce query locally.", cause);
             }
             finally {
-                // Make sure any activity related to current attempt is cancelled.
-                cancelRemoteQueriesIfNeeded(nodes, r, qryReqId, qry.distributedJoins());
-
-                if (!runs.remove(qryReqId, r))
-                    U.warn(log, "Query run was already removed: " + qryReqId);
+                if (release) {
+                    releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins());
 
-                if (!skipMergeTbl) {
-                    for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
-                        fakeTable(null, i).innerTable(null); // Drop all merge tables.
+                    if (!skipMergeTbl) {
+                        for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
+                            fakeTable(null, i).innerTable(null); // Drop all merge tables.
+                    }
                 }
             }
         }
@@ -885,16 +868,15 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * Release remote resources if needed.
+     *
      * @param nodes Query nodes.
      * @param r Query run.
      * @param qryReqId Query id.
      * @param distributedJoins Distributed join flag.
      */
-    private void cancelRemoteQueriesIfNeeded(Collection<ClusterNode> nodes,
-        ReduceQueryRun r,
-        long qryReqId,
-        boolean distributedJoins)
-    {
+    public void releaseRemoteResources(Collection<ClusterNode> nodes, ReduceQueryRun r, long qryReqId,
+        boolean distributedJoins) {
         // For distributedJoins need always send cancel request to cleanup resources.
         if (distributedJoins)
             send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
@@ -907,6 +889,9 @@ public class GridReduceQueryExecutor {
                 }
             }
         }
+
+        if (!runs.remove(qryReqId, r))
+            U.warn(log, "Query run was already removed: " + qryReqId);
     }
 
     /**