You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/08/16 06:42:07 UTC
ignite git commit: IGNITE-6019: SQL: do not pull the whole result set
immediately to the client when there is no merge table. This closes #2430.
Repository: ignite
Updated Branches:
refs/heads/master f3d3d1bd7 -> 59facfca3
IGNITE-6019: SQL: do not pull the whole result set immediately to the client when there is no merge table. This closes #2430.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59facfca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59facfca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59facfca
Branch: refs/heads/master
Commit: 59facfca3b60f22169f773f0e8b6f2e0b9a5c8e9
Parents: f3d3d1b
Author: Alexander Paschenko <al...@gmail.com>
Authored: Wed Aug 16 09:41:54 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Aug 16 09:41:54 2017 +0300
----------------------------------------------------------------------
.../h2/twostep/GridMergeIndexIterator.java | 165 +++++++++++++++++++
.../h2/twostep/GridReduceQueryExecutor.java | 61 +++----
2 files changed, 188 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/59facfca/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
new file mode 100644
index 0000000..1c0efb3
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.h2.index.Cursor;
+import org.h2.result.Row;
+
+/**
+ * Iterator that transparently and sequentially traverses a bunch of {@link GridMergeIndex} objects.
+ * <p>
+ * Rows are fetched lazily with a read-ahead of one row. Release of remote resources is attempted at most once:
+ * when the last index is exhausted, when fetching a row fails, or when {@link #close()} is called.
+ */
+class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
+    /** Reduce query executor. */
+    private final GridReduceQueryExecutor rdcExec;
+
+    /** Participating nodes. */
+    private final Collection<ClusterNode> nodes;
+
+    /** Query run. */
+    private final ReduceQueryRun run;
+
+    /** Query request ID. */
+    private final long qryReqId;
+
+    /** Distributed joins. */
+    private final boolean distributedJoins;
+
+    /** Iterator over indexes. */
+    private final Iterator<GridMergeIndex> idxIter;
+
+    /** Current cursor, {@code null} until the first index is opened. */
+    private Cursor cursor;
+
+    /** Next row to return, {@code null} when there are no more rows. */
+    private List<Object> next;
+
+    /** Whether release of remote resources was already attempted. */
+    private boolean released;
+
+    /**
+     * Constructor. Eagerly reads the first row so that {@link #hasNext()} can answer immediately.
+     *
+     * @param rdcExec Reduce query executor.
+     * @param nodes Participating nodes.
+     * @param run Query run.
+     * @param qryReqId Query request ID.
+     * @param distributedJoins Distributed joins.
+     * @throws IgniteCheckedException if failed.
+     */
+    GridMergeIndexIterator(GridReduceQueryExecutor rdcExec, Collection<ClusterNode> nodes, ReduceQueryRun run,
+        long qryReqId, boolean distributedJoins)
+        throws IgniteCheckedException {
+        this.rdcExec = rdcExec;
+        this.nodes = nodes;
+        this.run = run;
+        this.qryReqId = qryReqId;
+        this.distributedJoins = distributedJoins;
+
+        this.idxIter = run.indexes().iterator();
+
+        advance();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() {
+        return next != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<?> next() {
+        List<Object> res = next;
+
+        if (res == null)
+            throw new NoSuchElementException();
+
+        // Read ahead the following row before handing out the current one.
+        advance();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove() {
+        throw new UnsupportedOperationException("Remove is not supported");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        releaseIfNeeded();
+    }
+
+    /**
+     * Advance iterator: reads the next row from the current cursor, switching to the cursor of the next
+     * index when the current one is exhausted. Leaves {@link #next} {@code null} when all indexes are drained.
+     * <p>
+     * On any failure (including {@link Error}s) release of remote resources is attempted before the failure
+     * is propagated; a failure of the release itself is attached to the original one as suppressed.
+     */
+    private void advance() {
+        next = null;
+
+        try {
+            boolean hasNext = false;
+
+            // Switch to the next index while there is no cursor yet or the current cursor has no more rows.
+            while (cursor == null || !(hasNext = cursor.next())) {
+                if (idxIter.hasNext())
+                    cursor = idxIter.next().findInStream(null, null);
+                else {
+                    // All indexes are drained.
+                    releaseIfNeeded();
+
+                    break;
+                }
+            }
+
+            if (hasNext) {
+                Row row = cursor.get();
+
+                int cols = row.getColumnCount();
+
+                List<Object> res = new ArrayList<>(cols);
+
+                for (int c = 0; c < cols; c++)
+                    res.add(row.getValue(c).getObject());
+
+                next = res;
+            }
+        }
+        catch (Throwable e) {
+            // Catch Throwable rather than Exception so that Errors do not skip the cleanup.
+            try {
+                releaseIfNeeded();
+            }
+            catch (Throwable releaseErr) {
+                // Keep the original failure as the primary one.
+                e.addSuppressed(releaseErr);
+            }
+
+            throw e;
+        }
+    }
+
+    /**
+     * Close routine. Delegates to {@link GridReduceQueryExecutor#releaseRemoteResources} on the first call only;
+     * the {@link #released} flag is set even if that call fails, so the release is never retried.
+     */
+    private void releaseIfNeeded() {
+        if (!released) {
+            try {
+                rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins);
+            }
+            finally {
+                released = true;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59facfca/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 85a7e0b..0e9d1a2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -87,10 +87,8 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.h2.command.ddl.CreateTableData;
import org.h2.engine.Session;
-import org.h2.index.Cursor;
import org.h2.index.Index;
import org.h2.jdbc.JdbcConnection;
-import org.h2.result.Row;
import org.h2.table.Column;
import org.h2.util.IntArray;
import org.h2.value.Value;
@@ -169,6 +167,7 @@ public class GridReduceQueryExecutor {
log = ctx.log(GridReduceQueryExecutor.class);
ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
+ @SuppressWarnings("deprecation")
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (!busyLock.enterBusy())
return;
@@ -511,7 +510,7 @@ public class GridReduceQueryExecutor {
*/
public Iterator<List<?>> query(
String schemaName,
- GridCacheTwoStepQuery qry,
+ final GridCacheTwoStepQuery qry,
boolean keepBinary,
boolean enforceJoinOrder,
int timeoutMillis,
@@ -622,6 +621,8 @@ public class GridReduceQueryExecutor {
int replicatedQrysCnt = 0;
+ final Collection<ClusterNode> finalNodes = nodes;
+
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
GridMergeIndex idx;
@@ -665,6 +666,8 @@ public class GridReduceQueryExecutor {
runs.put(qryReqId, r);
+ boolean release = true;
+
try {
cancel.checkCancelled();
@@ -686,8 +689,6 @@ public class GridReduceQueryExecutor {
final boolean distributedJoins = qry.distributedJoins();
- final Collection<ClusterNode> finalNodes = nodes;
-
cancel.set(new Runnable() {
@Override public void run() {
send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false);
@@ -757,27 +758,9 @@ public class GridReduceQueryExecutor {
if (!retry) {
if (skipMergeTbl) {
- List<List<?>> res = new ArrayList<>();
-
- // Simple UNION ALL can have multiple indexes.
- for (GridMergeIndex idx : r.indexes()) {
- Cursor cur = idx.findInStream(null, null);
-
- while (cur.next()) {
- Row row = cur.get();
-
- int cols = row.getColumnCount();
-
- List<Object> resRow = new ArrayList<>(cols);
-
- for (int c = 0; c < cols; c++)
- resRow.add(row.getValue(c).getObject());
+ resIter = new GridMergeIndexIterator(this, finalNodes, r, qryReqId, qry.distributedJoins());
- res.add(resRow);
- }
- }
-
- resIter = res.iterator();
+ release = false;
}
else {
cancel.checkCancelled();
@@ -820,6 +803,8 @@ public class GridReduceQueryExecutor {
return new GridQueryCacheObjectsIterator(resIter, h2.objectContext(), keepBinary);
}
catch (IgniteCheckedException | RuntimeException e) {
+ release = true;
+
U.closeQuiet(r.connection());
if (e instanceof CacheException) {
@@ -842,15 +827,13 @@ public class GridReduceQueryExecutor {
throw new CacheException("Failed to run reduce query locally.", cause);
}
finally {
- // Make sure any activity related to current attempt is cancelled.
- cancelRemoteQueriesIfNeeded(nodes, r, qryReqId, qry.distributedJoins());
-
- if (!runs.remove(qryReqId, r))
- U.warn(log, "Query run was already removed: " + qryReqId);
+ if (release) {
+ releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins());
- if (!skipMergeTbl) {
- for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
- fakeTable(null, i).innerTable(null); // Drop all merge tables.
+ if (!skipMergeTbl) {
+ for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
+ fakeTable(null, i).innerTable(null); // Drop all merge tables.
+ }
}
}
}
@@ -885,16 +868,15 @@ public class GridReduceQueryExecutor {
}
/**
+ * Release remote resources if needed.
+ *
* @param nodes Query nodes.
* @param r Query run.
* @param qryReqId Query id.
* @param distributedJoins Distributed join flag.
*/
- private void cancelRemoteQueriesIfNeeded(Collection<ClusterNode> nodes,
- ReduceQueryRun r,
- long qryReqId,
- boolean distributedJoins)
- {
+ public void releaseRemoteResources(Collection<ClusterNode> nodes, ReduceQueryRun r, long qryReqId,
+ boolean distributedJoins) {
// For distributedJoins need always send cancel request to cleanup resources.
if (distributedJoins)
send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
@@ -907,6 +889,9 @@ public class GridReduceQueryExecutor {
}
}
}
+
+ if (!runs.remove(qryReqId, r))
+ U.warn(log, "Query run was already removed: " + qryReqId);
}
/**