You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2020/09/22 10:00:37 UTC
[ignite] branch ignite-12248 updated: IGNITE-13463: Calcite
improvements. Rework RootNode rows exchange. This closes #8261
This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new 74230a4 IGNITE-13463: Calcite improvements. Rework RootNode rows exchange. This closes #8261
74230a4 is described below
commit 74230a45e0c5213bf48bcda1be333389702af52d
Author: zstan <st...@gmail.com>
AuthorDate: Tue Sep 22 13:00:16 2020 +0300
IGNITE-13463: Calcite improvements. Rework RootNode rows exchange. This closes #8261
---
.../query/calcite/exec/rel/RootNode.java | 176 +++++++++------------
.../query/calcite/CalciteQueryProcessorTest.java | 50 ++++++
.../processors/query/calcite/CancelTest.java | 4 +-
3 files changed, 126 insertions(+), 104 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index f38c613..75a6962 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -25,7 +25,6 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -33,39 +32,43 @@ import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import static org.apache.ignite.cache.query.QueryCancelledException.ERR_MSG;
+
/**
* Client iterator.
*/
public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row>, Iterator<Row> {
/** */
- private final ReentrantLock lock;
+ private final ReentrantLock lock = new ReentrantLock();
/** */
- private final Condition cond;
+ private final Condition cond = lock.newCondition();
/** */
- private final Deque<Row> buff;
+ private final Runnable onClose;
/** */
- private final Runnable onClose;
+ private final AtomicReference<Throwable> ex = new AtomicReference<>();
/** */
- private volatile State state = State.RUNNING;
+ private int waiting;
/** */
- private final AtomicReference<Throwable> ex = new AtomicReference<>();
+ private Deque<Row> inBuff = new ArrayDeque<>(IN_BUFFER_SIZE);
/** */
- private Row row;
+ private Deque<Row> outBuff = new ArrayDeque<>(IN_BUFFER_SIZE);
/** */
- private int waiting;
+ private volatile boolean closed;
/**
* @param ctx Execution context.
*/
public RootNode(ExecutionContext<Row> ctx) {
- this(ctx, null);
+ super(ctx);
+
+ onClose = this::closeInternal;
}
/**
@@ -74,10 +77,6 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
public RootNode(ExecutionContext<Row> ctx, Runnable onClose) {
super(ctx);
- buff = new ArrayDeque<>(IN_BUFFER_SIZE);
- lock = new ReentrantLock();
- cond = lock.newCondition();
-
this.onClose = onClose;
}
@@ -88,14 +87,15 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
/** {@inheritDoc} */
@Override public void close() {
+ if (closed)
+ return;
+
lock.lock();
try {
- if (state == State.RUNNING)
- state = State.CANCELLED;
- else if (state == State.END)
- state = State.CLOSED;
- else
- return;
+ if (waiting != -1)
+ ex.compareAndSet(null, new IgniteSQLException(ERR_MSG, IgniteQueryErrorCode.QUERY_CANCELED));
+
+ closed = true; // an exception has to be set first to get right check order
cond.signalAll();
}
@@ -103,84 +103,60 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
lock.unlock();
}
- if (onClose == null)
- closeInternal();
- else
- onClose.run();
+ onClose.run();
}
/** {@inheritDoc} */
@Override protected boolean isClosed() {
- return state == State.CANCELLED || state == State.CLOSED;
+ return closed;
}
/** {@inheritDoc} */
@Override public void closeInternal() {
- context().execute(() -> {
- buff.clear();
-
- U.closeQuiet(super::close);
- });
+ context().execute(() -> sources().forEach(U::closeQuiet));
}
/** {@inheritDoc} */
@Override public void push(Row row) {
assert waiting > 0;
+ lock.lock();
try {
checkState();
- int req = 0;
-
- lock.lock();
- try {
- if (state != State.RUNNING)
- return;
-
- waiting--;
-
- buff.offer(row);
+ waiting--;
- if (waiting == 0)
- waiting = req = IN_BUFFER_SIZE - buff.size();
+ inBuff.offer(row);
+ if (inBuff.size() == IN_BUFFER_SIZE)
cond.signalAll();
- }
- finally {
- lock.unlock();
- }
-
- if (req > 0)
- source().request(req);
}
catch (Exception e) {
onError(e);
}
+ finally {
+ lock.unlock();
+ }
}
/** {@inheritDoc} */
@Override public void end() {
+ assert waiting > 0;
+
+ lock.lock();
try {
checkState();
- lock.lock();
- try {
- assert waiting > 0 : "waiting=" + waiting;
-
- waiting = -1;
+ waiting = -1;
- if (state != State.RUNNING)
- return;
-
- cond.signalAll();
- }
- finally {
- lock.unlock();
- }
+ cond.signalAll();
}
catch (Exception e) {
onError(e);
}
+ finally {
+ lock.unlock();
+ }
}
/** {@inheritDoc} */
@@ -193,12 +169,17 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
/** {@inheritDoc} */
@Override public boolean hasNext() {
- if (row != null)
+ checkException();
+
+ if (!outBuff.isEmpty())
return true;
- else if (state == State.END || state == State.CLOSED)
+
+ if (closed && ex.get() == null)
return false;
- else
- return (row = take()) != null;
+
+ exchangeBuffers();
+
+ return !outBuff.isEmpty();
}
/** {@inheritDoc} */
@@ -206,10 +187,7 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
if (!hasNext())
throw new NoSuchElementException();
- Row cur0 = row;
- row = null;
-
- return cur0;
+ return outBuff.remove();
}
/** {@inheritDoc} */
@@ -236,61 +214,53 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
}
/** */
- private Row take() {
+ private void exchangeBuffers() {
assert !F.isEmpty(sources()) && sources().size() == 1;
lock.lock();
try {
- while (true) {
- checkCancelled();
- assert state == State.RUNNING;
+ while (ex.get() == null) {
+ assert outBuff.isEmpty();
- if (!buff.isEmpty())
- return buff.poll();
- else if (waiting == -1)
- break;
- else if (waiting == 0) {
+ if (inBuff.size() == IN_BUFFER_SIZE || waiting == -1) {
+ Deque<Row> tmp = inBuff;
+ inBuff = outBuff;
+ outBuff = tmp;
+ }
+
+ if (waiting == -1)
+ close();
+ else if (inBuff.isEmpty() && waiting == 0) {
int req = waiting = IN_BUFFER_SIZE;
context().execute(() -> source().request(req));
}
+ if (!outBuff.isEmpty() || waiting == -1)
+ break;
+
cond.await();
}
-
- state = State.END;
}
catch (InterruptedException e) {
- throw new IgniteInterruptedException(e);
+ onError(new IgniteInterruptedException(e));
}
finally {
lock.unlock();
}
- assert state == State.END;
-
- close();
-
- return null;
+ checkException();
}
/** */
- private void checkCancelled() {
- if (state == State.CANCELLED) {
- ex.compareAndSet(null, new IgniteSQLException("The query was cancelled while executing.", IgniteQueryErrorCode.QUERY_CANCELED));
+ private void checkException() {
+ Throwable e = ex.get();
- throw sqlException(ex.get());
- }
- }
+ if (e == null)
+ return;
- /** */
- private IgniteSQLException sqlException(Throwable e) {
- return e instanceof IgniteSQLException
- ? (IgniteSQLException)e
- : new IgniteSQLException("An error occurred while query executing.", IgniteQueryErrorCode.UNKNOWN, e);
- }
-
- /** */
- private enum State {
- RUNNING, CANCELLED, END, CLOSED
+ if (e instanceof IgniteSQLException)
+ throw (IgniteSQLException)e;
+ else
+ throw new IgniteSQLException("An error occurred while query executing.", IgniteQueryErrorCode.UNKNOWN, e);
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 2886ad9..12448a9 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
@@ -407,6 +408,55 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
assertNull(row);
}
+
+ public void testThroughput() {
+ IgniteCache<Integer, Developer> developer = ignite.getOrCreateCache(new CacheConfiguration<Integer, Developer>()
+ .setCacheMode(CacheMode.REPLICATED)
+ .setName("developer")
+ .setSqlSchema("PUBLIC")
+ .setIndexedTypes(Integer.class, Developer.class)
+ .setBackups(2)
+ );
+
+ int numIterations = 1000;
+
+ int prId = -1;
+
+ for (int i = 0; i < 5000; i++) {
+ if (i % 1000 == 0)
+ prId++;
+
+ developer.put(i, new Developer("Name" + i, prId));
+ }
+
+ QueryEngine engine = Commons.lookupComponent(ignite.context(), QueryEngine.class);
+
+ // warmup
+ for (int i = 0; i < numIterations; i++) {
+ List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
+ query.get(0).getAll();
+ }
+
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < numIterations; i++) {
+ List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
+ query.get(0).getAll();
+ }
+ System.out.println("Calcite duration = " + (System.currentTimeMillis() - start));
+
+ // warmup
+ for (int i = 0; i < numIterations; i++) {
+ List<FieldsQueryCursor<List<?>>> query = ignite.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false);
+ query.get(0).getAll();
+ }
+
+ start = System.currentTimeMillis();
+ for (int i = 0; i < numIterations; i++) {
+ List<FieldsQueryCursor<List<?>>> query = ignite.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false);
+ query.get(0).getAll();
+ }
+ System.out.println("H2 duration = " + (System.currentTimeMillis() - start));
+ }
/** */
public static class Key {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
index 0ddac9e..48af61c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.internal.processors.query.calcite;
import java.util.Iterator;
@@ -43,6 +44,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static java.util.Collections.singletonList;
+import static org.apache.ignite.cache.query.QueryCancelledException.ERR_MSG;
/**
* Cancel query test.
@@ -110,7 +112,7 @@ public class CancelTest extends GridCommonAbstractTest {
return null;
},
- IgniteSQLException.class, "The query was cancelled while executing"
+ IgniteSQLException.class, ERR_MSG
);
awaitReservationsRelease("TEST");