You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/02/08 14:13:05 UTC
[ignite] branch sql-calcite updated: IGNITE-13973 Fix query hang if an assertion is thrown during the query execution - Fixes #8709.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new a823f50 IGNITE-13973 Fix query hang if an assertion is thrown during the query execution - Fixes #8709.
a823f50 is described below
commit a823f507d6e6893a53ac7a01e10943949cfa3384
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Mon Feb 8 17:10:59 2021 +0300
IGNITE-13973 Fix query hang if an assertion is thrown during the query execution - Fixes #8709.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../query/calcite/exec/ExchangeServiceImpl.java | 24 ++-
.../query/calcite/exec/ExecutionContext.java | 20 ++-
.../query/calcite/exec/ExecutionServiceImpl.java | 7 +-
.../query/calcite/exec/LogicalRelImplementor.java | 5 +-
.../query/calcite/exec/rel/AbstractNode.java | 4 +-
.../query/calcite/exec/rel/AggregateNode.java | 68 +++----
.../exec/rel/CorrelatedNestedLoopJoinNode.java | 132 ++++++--------
.../query/calcite/exec/rel/Downstream.java | 6 +-
.../query/calcite/exec/rel/FilterNode.java | 60 +++----
.../processors/query/calcite/exec/rel/Inbox.java | 66 +++----
.../query/calcite/exec/rel/IndexSpoolNode.java | 51 ++----
.../query/calcite/exec/rel/LimitNode.java | 37 ++--
.../query/calcite/exec/rel/MergeJoinNode.java | 106 ++++-------
.../query/calcite/exec/rel/ModifyNode.java | 67 +++----
.../query/calcite/exec/rel/NestedLoopJoinNode.java | 108 ++++--------
.../processors/query/calcite/exec/rel/Node.java | 3 +-
.../processors/query/calcite/exec/rel/Outbox.java | 50 ++----
.../query/calcite/exec/rel/ProjectNode.java | 33 +---
.../query/calcite/exec/rel/RootNode.java | 16 +-
.../query/calcite/exec/rel/ScanNode.java | 32 ++--
.../query/calcite/exec/rel/SortNode.java | 65 +++----
.../query/calcite/exec/rel/TableSpoolNode.java | 59 +++----
.../query/calcite/exec/rel/UnionAllNode.java | 43 ++---
.../query/calcite/message/ErrorMessage.java | 3 +-
.../CalciteErrorHandlilngIntegrationTest.java | 196 +++++++++++++++++++++
.../calcite/exec/rel/ContinuousExecutionTest.java | 2 +-
.../query/calcite/exec/rel/ExecutionTest.java | 85 +++++++++
.../query/calcite/planner/PlannerTest.java | 10 +-
.../ignite/testsuites/IgniteCalciteTestSuite.java | 2 +
.../processors/query/GridQueryProcessor.java | 2 +-
modules/core/src/test/config/log4j-test.xml | 4 +
31 files changed, 696 insertions(+), 670 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 6ed92b6..2da1b9d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -159,7 +159,7 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
if (!F.isEmpty(inboxes)) {
for (Inbox<?> inbox : inboxes) {
inbox.context().cancel();
- inbox.context().execute(inbox::close);
+ inbox.context().execute(inbox::close, inbox::onError);
}
}
else if (log.isDebugEnabled()) {
@@ -177,7 +177,7 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
if (!F.isEmpty(outboxes)) {
for (Outbox<?> outbox : outboxes) {
outbox.context().cancel();
- outbox.context().execute(outbox::close);
+ outbox.context().execute(outbox::close, outbox::onError);
}
}
else if (log.isDebugEnabled()) {
@@ -193,8 +193,14 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
protected void onMessage(UUID nodeId, QueryBatchAcknowledgeMessage msg) {
Outbox<?> outbox = mailboxRegistry().outbox(msg.queryId(), msg.exchangeId());
- if (outbox != null)
- outbox.onAcknowledge(nodeId, msg.batchId());
+ if (outbox != null) {
+ try {
+ outbox.onAcknowledge(nodeId, msg.batchId());
+ }
+ catch (Throwable t) {
+ outbox.onError(t);
+ }
+ }
else if (log.isDebugEnabled()) {
log.debug("Stale acknowledge message received: [" +
"nodeId=" + nodeId + ", " +
@@ -218,8 +224,14 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
inbox = mailboxRegistry().register(newInbox);
}
- if (inbox != null)
- inbox.onBatchReceived(nodeId, msg.batchId(), msg.last(), Commons.cast(msg.rows()));
+ if (inbox != null) {
+ try {
+ inbox.onBatchReceived(nodeId, msg.batchId(), msg.last(), Commons.cast(msg.rows()));
+ }
+ catch (Throwable t) {
+ inbox.onError(t);
+ }
+ }
else if (log.isDebugEnabled()) {
log.debug("Stale batch message received: [" +
"nodeId=" + nodeId + ", " +
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 936a782..23309f0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -22,6 +22,8 @@ import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
@@ -226,8 +228,22 @@ public class ExecutionContext<Row> implements DataContext {
*
* @param task Query task.
*/
- public void execute(Runnable task) {
- executor.execute(qryId, fragmentId(), task);
+ public void execute(RunnableX task, Consumer<Throwable> onError) {
+ executor.execute(qryId, fragmentId(), () -> {
+ try {
+ task.run();
+ }
+ catch (Throwable t) {
+ onError.accept(t);
+ }
+ });
+ }
+
+ /** */
+ @FunctionalInterface
+ public interface RunnableX {
+ /** */
+ void run() throws Exception;
}
/**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index f77273a..ef5dfb3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -27,6 +27,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
@@ -718,7 +719,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
plan.target(fragment),
plan.remotes(fragment));
- Exception ex = null;
+ Throwable ex = null;
for (UUID nodeId : fragmentDesc.nodeIds()) {
if (ex != null)
info.onResponse(nodeId, fragment.fragmentId(), ex);
@@ -734,7 +735,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
messageService().send(nodeId, req);
}
- catch (Exception e) {
+ catch (Throwable e) {
info.onResponse(nodeId, fragment.fragmentId(), ex = e);
}
}
@@ -770,7 +771,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
failureProcessor())
.go(plan.root());
}
- catch (Exception ex) {
+ catch (Throwable ex) {
U.error(log, "Failed to build execution tree. ", ex);
mailboxRegistry.outboxes(qryId, frId, -1)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 823c236..e9a1ec3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -96,6 +96,9 @@ import static org.apache.ignite.internal.processors.query.calcite.util.TypeUtils
@SuppressWarnings("TypeMayBeWeakened")
public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
/** */
+ public static final String CNLJ_SUPPORTS_ONLY_INNER_ASSERTION_MSG = "only INNER join supported by IgniteCorrelatedNestedLoop";
+
+ /** */
private final ExecutionContext<Row> ctx;
/** */
@@ -223,7 +226,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
RelDataType rowType = combinedRowType(ctx.getTypeFactory(), leftType, rightType);
Predicate<Row> cond = expressionFactory.predicate(rel.getCondition(), rowType);
- assert rel.getJoinType() == JoinRelType.INNER; // TODO LEFT, SEMI, ANTI
+ assert rel.getJoinType() == JoinRelType.INNER : CNLJ_SUPPORTS_ONLY_INNER_ASSERTION_MSG;
Node<Row> node = new CorrelatedNestedLoopJoinNode<>(ctx, outType, cond, rel.getVariablesSet());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
index 41d2181..409d633 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
+
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
@@ -178,7 +178,7 @@ public abstract class AbstractNode<Row> implements Node<Row> {
}
/** */
- protected void checkState() throws IgniteCheckedException {
+ protected void checkState() throws Exception {
if (context().isCancelled())
throw new ExecutionCancelledException();
if (Thread.interrupted())
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
index 7e242dd..70cc79d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
@@ -25,9 +25,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
+
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
@@ -98,62 +98,47 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode<
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0 && requested == 0;
assert waiting <= 0;
- try {
- checkState();
+ checkState();
- requested = rowsCnt;
+ requested = rowsCnt;
- if (waiting == 0)
- source().request(waiting = IN_BUFFER_SIZE);
- else if (!inLoop)
- context().execute(this::doFlush);
- }
- catch (Exception e) {
- onError(e);
- }
+ if (waiting == 0)
+ source().request(waiting = IN_BUFFER_SIZE);
+ else if (!inLoop)
+ context().execute(this::doFlush, this::onError);
}
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
assert downstream() != null;
assert waiting > 0;
- try {
- checkState();
+ checkState();
- waiting--;
+ waiting--;
- for (Grouping grouping : groupings)
- grouping.add(row);
+ for (Grouping grouping : groupings)
+ grouping.add(row);
- if (waiting == 0)
- source().request(waiting = IN_BUFFER_SIZE);
- }
- catch (Exception e) {
- onError(e);
- }
+ if (waiting == 0)
+ source().request(waiting = IN_BUFFER_SIZE);
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
assert downstream() != null;
assert waiting > 0;
- try {
- checkState();
+ checkState();
- waiting = -1;
+ waiting = -1;
- flush();
- }
- catch (Exception e) {
- onError(e);
- }
+ flush();
}
/** {@inheritDoc} */
@@ -172,19 +157,14 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode<
}
/** */
- private void doFlush() {
- try {
- checkState();
+ private void doFlush() throws Exception {
+ checkState();
- flush();
- }
- catch (Exception e) {
- onError(e);
- }
+ flush();
}
/** */
- private void flush() throws IgniteCheckedException {
+ private void flush() throws Exception {
assert waiting == -1;
int processed = 0;
@@ -208,7 +188,7 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode<
if (processed >= IN_BUFFER_SIZE && requested > 0) {
// allow others to do their job
- context().execute(this::doFlush);
+ context().execute(this::doFlush, this::onError);
return;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
index 741bcc4..72c8de5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
@@ -22,9 +22,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
+
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.util.typedef.F;
@@ -96,20 +96,15 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources()) && sources().size() == 2;
assert rowsCnt > 0 && requested == 0;
- try {
- checkState();
+ checkState();
- requested = rowsCnt;
+ requested = rowsCnt;
- onRequest();
- }
- catch (Exception e) {
- onError(e);
- }
+ onRequest();
}
/** {@inheritDoc} */
@@ -132,12 +127,12 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
if (idx == 0)
return new Downstream<Row>() {
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
pushLeft(row);
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
endLeft();
}
@@ -149,12 +144,12 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
else if (idx == 1)
return new Downstream<Row>() {
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
pushRight(row);
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
endRight();
}
@@ -168,91 +163,71 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** */
- private void pushLeft(Row row) {
+ private void pushLeft(Row row) throws Exception {
assert downstream() != null;
assert waitingLeft > 0;
- try {
- checkState();
+ checkState();
- waitingLeft--;
+ waitingLeft--;
- if (leftInBuf == null)
- leftInBuf = new ArrayList<>(leftInBufferSize);
+ if (leftInBuf == null)
+ leftInBuf = new ArrayList<>(leftInBufferSize);
- leftInBuf.add(row);
+ leftInBuf.add(row);
- onPushLeft();
- }
- catch (Exception e) {
- onError(e);
- }
+ onPushLeft();
}
/** */
- private void pushRight(Row row) {
+ private void pushRight(Row row) throws Exception {
assert downstream() != null;
assert waitingRight > 0;
- try {
- checkState();
+ checkState();
- waitingRight--;
+ waitingRight--;
- if (rightInBuf == null)
- rightInBuf = new ArrayList<>(rightInBufferSize);
+ if (rightInBuf == null)
+ rightInBuf = new ArrayList<>(rightInBufferSize);
- rightInBuf.add(row);
+ rightInBuf.add(row);
- onPushRight();
- }
- catch (Exception e) {
- onError(e);
- }
+ onPushRight();
}
/** */
- private void endLeft() {
+ private void endLeft() throws Exception {
assert downstream() != null;
assert waitingLeft > 0;
- try {
- checkState();
+ checkState();
- waitingLeft = -1;
+ waitingLeft = -1;
- if (leftInBuf == null)
- leftInBuf = Collections.emptyList();
+ if (leftInBuf == null)
+ leftInBuf = Collections.emptyList();
- onEndLeft();
- }
- catch (Exception e) {
- onError(e);
- }
+ onEndLeft();
}
/** */
- private void endRight() {
+ private void endRight() throws Exception {
assert downstream() != null;
assert waitingRight > 0;
- try {
- checkState();
+ checkState();
- waitingRight = -1;
+ waitingRight = -1;
- if (rightInBuf == null)
- rightInBuf = Collections.emptyList();
+ if (rightInBuf == null)
+ rightInBuf = Collections.emptyList();
- onEndRight();
- }
- catch (Exception e) {
- onError(e);
- }
+ onEndRight();
}
/** */
- private void onRequest() {
+ private void onRequest() throws Exception {
switch (state) {
case IN_LOOP:
case FILLING_RIGHT:
@@ -265,16 +240,11 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
assert F.isEmpty(rightInBuf);
context().execute(() -> {
- try {
- checkState();
+ checkState();
- state = State.FILLING_LEFT;
- leftSource().request(waitingLeft = leftInBufferSize);
- }
- catch (Exception e) {
- onError(e);
- }
- });
+ state = State.FILLING_LEFT;
+ leftSource().request(waitingLeft = leftInBufferSize);
+ }, this::onError);
break;
case IDLE:
@@ -284,15 +254,10 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
assert waitingLeft == -1 || waitingLeft == 0 && leftInBuf.size() == leftInBufferSize;
context().execute(() -> {
- try {
- checkState();
+ checkState();
- join();
- }
- catch (Exception e) {
- onError(e);
- }
- });
+ join();
+ }, this::onError);
break;
@@ -306,7 +271,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** */
- private void onPushLeft() {
+ private void onPushLeft() throws Exception {
assert state == State.FILLING_LEFT : "Unexpected state:" + state;
assert waitingRight == 0 || waitingRight == -1;
assert F.isEmpty(rightInBuf);
@@ -325,7 +290,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** */
- private void onPushRight() throws IgniteCheckedException {
+ private void onPushRight() throws Exception {
assert state == State.FILLING_RIGHT : "Unexpected state:" + state;
assert !F.isEmpty(leftInBuf);
assert waitingLeft == -1 || waitingLeft == 0 && leftInBuf.size() == leftInBufferSize;
@@ -340,7 +305,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** */
- private void onEndLeft() {
+ private void onEndLeft() throws Exception {
assert state == State.FILLING_LEFT : "Unexpected state:" + state;
assert waitingLeft == -1;
assert waitingRight == 0 || waitingRight == -1;
@@ -367,7 +332,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** */
- private void onEndRight() throws IgniteCheckedException {
+ private void onEndRight() throws Exception {
assert state == State.FILLING_RIGHT : "Unexpected state:" + state;
assert waitingRight == -1;
assert !F.isEmpty(leftInBuf);
@@ -379,7 +344,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** */
- private void join() throws IgniteCheckedException {
+ private void join() throws Exception {
assert state == State.IDLE;
state = State.IN_LOOP;
@@ -466,6 +431,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
return sources().get(1);
}
+ /** */
private void prepareCorrelations() {
for (int i = 0; i < correlationIds.size(); i++) {
Row row = i < leftInBuf.size() ? leftInBuf.get(i) : F.first(leftInBuf);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Downstream.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Downstream.java
index 50b7d66..bf274c1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Downstream.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Downstream.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel;
/**
* Represents an abstract data consumer.
*
- * <p/><b>Note</b>: except several cases (like consumer node and mailboxes), {@link Node#request(int)}, {@link Node#cancel()},
+ * <p/><b>Note</b>: except several cases (like consumer node and mailboxes), {@link Node#request(int)},
* {@link Downstream#push(Object)} and {@link Downstream#end()} methods should be used from one single thread.
*/
public interface Downstream<Row> {
@@ -28,12 +28,12 @@ public interface Downstream<Row> {
* Pushes a row to consumer.
* @param row Data row.
*/
- void push(Row row);
+ void push(Row row) throws Exception;
/**
* Signals that data is over.
*/
- void end();
+ void end() throws Exception;
/** */
void onError(Throwable e);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
index cbc0922..b0c0c6d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;
+
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.util.typedef.F;
@@ -55,58 +55,43 @@ public class FilterNode<Row> extends AbstractNode<Row> implements SingleNode<Row
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0 && requested == 0;
- try {
- checkState();
+ checkState();
- requested = rowsCnt;
+ requested = rowsCnt;
- if (!inLoop)
- context().execute(this::doFilter);
- }
- catch (Exception e) {
- onError(e);
- }
+ if (!inLoop)
+ context().execute(this::doFilter, this::onError);
}
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
assert downstream() != null;
assert waiting > 0;
- try {
- checkState();
+ checkState();
- waiting--;
+ waiting--;
- if (pred.test(row))
- inBuf.add(row);
+ if (pred.test(row))
+ inBuf.add(row);
- filter();
- }
- catch (Exception e) {
- onError(e);
- }
+ filter();
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
assert downstream() != null;
assert waiting > 0;
- try {
- checkState();
+ checkState();
- waiting = -1;
+ waiting = -1;
- filter();
- }
- catch (Exception e) {
- onError(e);
- }
+ filter();
}
/** {@inheritDoc} */
@@ -125,19 +110,14 @@ public class FilterNode<Row> extends AbstractNode<Row> implements SingleNode<Row
}
/** */
- private void doFilter() {
- try {
- checkState();
+ private void doFilter() throws Exception {
+ checkState();
- filter();
- }
- catch (Exception e) {
- onError(e);
- }
+ filter();
}
/** */
- private void filter() throws IgniteCheckedException {
+ private void filter() throws Exception {
inLoop = true;
try {
while (requested > 0 && !inBuf.isEmpty()) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
index 7bdc0ac..ba2a146 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
@@ -124,21 +124,16 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert srcNodeIds != null;
assert rowsCnt > 0 && requested == 0;
- try {
- checkState();
+ checkState();
- requested = rowsCnt;
+ requested = rowsCnt;
- if (!inLoop)
- context().execute(this::doPush);
- }
- catch (Exception e) {
- onError(e);
- }
+ if (!inLoop)
+ context().execute(this::doPush, this::onError);
}
/** {@inheritDoc} */
@@ -171,36 +166,26 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl
* @param last Last batch flag.
* @param rows Rows.
*/
- public void onBatchReceived(UUID src, int batchId, boolean last, List<Row> rows) {
- try {
- Buffer buf = getOrCreateBuffer(src);
+ public void onBatchReceived(UUID src, int batchId, boolean last, List<Row> rows) throws Exception {
+ Buffer buf = getOrCreateBuffer(src);
- boolean waitingBefore = buf.check() == State.WAITING;
+ boolean waitingBefore = buf.check() == State.WAITING;
- buf.offer(batchId, last, rows);
+ buf.offer(batchId, last, rows);
- if (requested > 0 && waitingBefore && buf.check() != State.WAITING)
- push();
- }
- catch (Exception e) {
- onError(e);
- }
+ if (requested > 0 && waitingBefore && buf.check() != State.WAITING)
+ push();
}
/** */
- private void doPush() {
- try {
- checkState();
+ private void doPush() throws Exception {
+ checkState();
- push();
- }
- catch (Exception e) {
- onError(e);
- }
+ push();
}
/** */
- private void push() throws IgniteCheckedException {
+ private void push() throws Exception {
if (buffers == null) {
for (UUID node : srcNodeIds)
checkNode(node);
@@ -219,7 +204,7 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl
}
/** */
- private void pushOrdered() throws IgniteCheckedException {
+ private void pushOrdered() throws Exception {
PriorityQueue<Pair<Row, Buffer>> heap =
new PriorityQueue<>(Math.max(buffers.size(), 1), Map.Entry.comparingByKey(comp));
@@ -281,7 +266,7 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl
}
/** */
- private void pushUnordered() throws IgniteCheckedException {
+ private void pushUnordered() throws Exception {
int idx = 0, noProgress = 0;
inLoop = true;
@@ -341,22 +326,17 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl
/** */
public void onNodeLeft(UUID nodeId) {
if (context().originatingNodeId().equals(nodeId) && srcNodeIds == null)
- context().execute(this::close);
+ context().execute(this::close, this::onError);
else if (srcNodeIds != null && srcNodeIds.contains(nodeId))
- context().execute(() -> onNodeLeft0(nodeId));
+ context().execute(() -> onNodeLeft0(nodeId), this::onError);
}
/** */
- private void onNodeLeft0(UUID nodeId) {
- try {
- checkState();
+ private void onNodeLeft0(UUID nodeId) throws Exception {
+ checkState();
- if (getOrCreateBuffer(nodeId).check() != State.END)
- onError(new ClusterTopologyCheckedException("Failed to execute query, node left [nodeId=" + nodeId + ']'));
- }
- catch (Exception e) {
- onError(e);
- }
+ if (getOrCreateBuffer(nodeId).check() != State.END)
+ throw new ClusterTopologyCheckedException("Failed to execute query, node left [nodeId=" + nodeId + ']');
}
/** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
index d8745f6..51607d3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
@@ -101,62 +101,47 @@ public class IndexSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0;
- try {
- checkState();
+ checkState();
- if (!indexReady()) {
- requested = rowsCnt;
+ if (!indexReady()) {
+ requested = rowsCnt;
- requestSource();
- }
- else
- scan.request(rowsCnt);
- }
- catch (Exception e) {
- onError(e);
+ requestSource();
}
+ else
+ scan.request(rowsCnt);
}
/** */
- private void requestSource() {
+ private void requestSource() throws Exception {
waiting = IN_BUFFER_SIZE;
source().request(IN_BUFFER_SIZE);
}
/** {@inheritDoc} */
- @Override public void push(Row row) {
- try {
- checkState();
+ @Override public void push(Row row) throws Exception {
+ checkState();
- idx.push(row);
+ idx.push(row);
- waiting--;
+ waiting--;
- if (waiting == 0)
- context().execute(this::requestSource);
- }
- catch (Exception e) {
- onError(e);
- }
+ if (waiting == 0)
+ context().execute(this::requestSource, this::onError);
}
/** {@inheritDoc} */
- @Override public void end() {
- try {
- checkState();
+ @Override public void end() throws Exception {
+ checkState();
- waiting = -1;
+ waiting = -1;
- scan.request(requested);
- }
- catch (Exception e) {
- scan.downstream().onError(e);
- }
+ scan.request(requested);
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java
index 842a44b..ce85c38 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java
@@ -60,7 +60,7 @@ public class LimitNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0;
@@ -73,18 +73,13 @@ public class LimitNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
if (offset > 0 && rowsProcessed == 0)
rowsCnt = offset + rowsCnt;
- try {
- checkState();
+ checkState();
- source().request(waiting = rowsCnt);
- }
- catch (Exception e) {
- onError(e);
- }
+ source().request(waiting = rowsCnt);
}
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
if (waiting == -1)
return;
@@ -92,12 +87,7 @@ public class LimitNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
--waiting;
- try {
- checkState();
- }
- catch (Throwable e) {
- onError(e);
- }
+ checkState();
if (rowsProcessed > offset) {
if (fetchNode == null || (fetchNode != null && rowsProcessed <= fetch + offset))
@@ -109,20 +99,15 @@ public class LimitNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
}
/** {@inheritDoc} */
- @Override public void end() {
- try {
- if (waiting == -1)
- return;
+ @Override public void end() throws Exception {
+ if (waiting == -1)
+ return;
- assert downstream() != null;
+ assert downstream() != null;
- waiting = -1;
+ waiting = -1;
- downstream().end();
- }
- catch (Exception e) {
- onError(e);
- }
+ downstream().end();
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
index feaa373..e97968b85 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
@@ -22,9 +22,9 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
+
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.util.typedef.F;
@@ -71,33 +71,23 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources()) && sources().size() == 2;
assert rowsCnt > 0 && requested == 0;
- try {
- checkState();
+ checkState();
- requested = rowsCnt;
+ requested = rowsCnt;
- if (!inLoop)
- context().execute(this::doJoin);
- }
- catch (Exception e) {
- onError(e);
- }
+ if (!inLoop)
+ context().execute(this::doJoin, this::onError);
}
/** */
- private void doJoin() {
- try {
- checkState();
+ private void doJoin() throws Exception {
+ checkState();
- join();
- }
- catch (Exception e) {
- onError(e);
- }
+ join();
}
/** {@inheritDoc} */
@@ -115,12 +105,12 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
if (idx == 0)
return new Downstream<Row>() {
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
pushLeft(row);
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
endLeft();
}
@@ -132,12 +122,12 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
else if (idx == 1)
return new Downstream<Row>() {
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
pushRight(row);
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
endRight();
}
@@ -151,75 +141,55 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
}
/** */
- private void pushLeft(Row row) {
+ private void pushLeft(Row row) throws Exception {
assert downstream() != null;
assert waitingLeft > 0;
- try {
- checkState();
+ checkState();
- waitingLeft--;
+ waitingLeft--;
- leftInBuf.add(row);
+ leftInBuf.add(row);
- join();
- }
- catch (Exception e) {
- onError(e);
- }
+ join();
}
/** */
- private void pushRight(Row row) {
+ private void pushRight(Row row) throws Exception {
assert downstream() != null;
assert waitingRight > 0;
- try {
- checkState();
+ checkState();
- waitingRight--;
+ waitingRight--;
- rightInBuf.add(row);
+ rightInBuf.add(row);
- join();
- }
- catch (Exception e) {
- onError(e);
- }
+ join();
}
/** */
- private void endLeft() {
+ private void endLeft() throws Exception {
assert downstream() != null;
assert waitingLeft > 0;
- try {
- checkState();
+ checkState();
- waitingLeft = NOT_WAITING;
+ waitingLeft = NOT_WAITING;
- join();
- }
- catch (Exception e) {
- onError(e);
- }
+ join();
}
/** */
- private void endRight() {
+ private void endRight() throws Exception {
assert downstream() != null;
assert waitingRight > 0;
- try {
- checkState();
+ checkState();
- waitingRight = NOT_WAITING;
+ waitingRight = NOT_WAITING;
- join();
- }
- catch (Exception e) {
- onError(e);
- }
+ join();
}
/** */
@@ -233,7 +203,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
}
/** */
- protected abstract void join() throws IgniteCheckedException;
+ protected abstract void join() throws Exception;
/** */
@NotNull public static <Row> MergeJoinNode<Row> create(ExecutionContext<Row> ctx, RelDataType outputRowType, RelDataType leftRowType,
@@ -310,7 +280,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void join() throws IgniteCheckedException {
+ @Override protected void join() throws Exception {
inLoop = true;
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty() || rightMaterialization != null)) {
@@ -466,7 +436,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void join() throws IgniteCheckedException {
+ @Override protected void join() throws Exception {
inLoop = true;
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty())
@@ -639,7 +609,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void join() throws IgniteCheckedException {
+ @Override protected void join() throws Exception {
inLoop = true;
try {
while (requested > 0 && !(left == null && leftInBuf.isEmpty() && waitingLeft != NOT_WAITING)
@@ -835,7 +805,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void join() throws IgniteCheckedException {
+ @Override protected void join() throws Exception {
inLoop = true;
try {
while (requested > 0 && !(left == null && leftInBuf.isEmpty() && waitingLeft != NOT_WAITING)
@@ -1033,7 +1003,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void join() throws IgniteCheckedException {
+ @Override protected void join() throws Exception {
inLoop = true;
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty())) {
@@ -1109,7 +1079,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void join() throws IgniteCheckedException {
+ @Override protected void join() throws Exception {
inLoop = true;
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && !(right == null && rightInBuf.isEmpty() && waitingRight != NOT_WAITING)) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index 5d253c3..d1aaa38 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -89,71 +89,56 @@ public class ModifyNode<Row> extends AbstractNode<Row> implements SingleNode<Row
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0 && requested == 0;
- try {
- checkState();
+ checkState();
- requested = rowsCnt;
+ requested = rowsCnt;
- if (!inLoop)
- tryEnd();
- }
- catch (Exception e) {
- onError(e);
- }
+ if (!inLoop)
+ tryEnd();
}
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
assert downstream() != null;
assert waiting > 0;
assert state == State.UPDATING;
- try {
- checkState();
+ checkState();
- waiting--;
+ waiting--;
- switch (op) {
- case DELETE:
- case UPDATE:
- case INSERT:
- tuples.add(desc.toTuple(context(), row, op, cols));
-
- flushTuples(false);
+ switch (op) {
+ case DELETE:
+ case UPDATE:
+ case INSERT:
+ tuples.add(desc.toTuple(context(), row, op, cols));
- break;
- default:
- throw new UnsupportedOperationException(op.name());
- }
+ flushTuples(false);
- if (waiting == 0)
- source().request(waiting = MODIFY_BATCH_SIZE);
- }
- catch (Exception e) {
- onError(e);
+ break;
+ default:
+ throw new UnsupportedOperationException(op.name());
}
+
+ if (waiting == 0)
+ source().request(waiting = MODIFY_BATCH_SIZE);
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
assert downstream() != null;
assert waiting > 0;
- try {
- checkState();
+ checkState();
- waiting = -1;
- state = State.UPDATED;
+ waiting = -1;
+ state = State.UPDATED;
- tryEnd();
- }
- catch (Exception e) {
- onError(e);
- }
+ tryEnd();
}
/** {@inheritDoc} */
@@ -170,7 +155,7 @@ public class ModifyNode<Row> extends AbstractNode<Row> implements SingleNode<Row
}
/** */
- private void tryEnd() throws IgniteCheckedException {
+ private void tryEnd() throws Exception {
assert downstream() != null;
if (state == State.UPDATING && waiting == 0)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
index b26649a..a27e9f2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
@@ -23,9 +23,9 @@ import java.util.BitSet;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;
+
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.util.typedef.F;
@@ -72,33 +72,23 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources()) && sources().size() == 2;
assert rowsCnt > 0 && requested == 0;
- try {
- checkState();
+ checkState();
- requested = rowsCnt;
+ requested = rowsCnt;
- if (!inLoop)
- context().execute(this::doJoin);
- }
- catch (Exception e) {
- onError(e);
- }
+ if (!inLoop)
+ context().execute(this::doJoin, this::onError);
}
/** */
- private void doJoin() {
- try {
- checkState();
+ private void doJoin() throws Exception {
+ checkState();
- join();
- }
- catch (Exception e) {
- onError(e);
- }
+ join();
}
/** {@inheritDoc} */
@@ -116,12 +106,12 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> {
if (idx == 0)
return new Downstream<Row>() {
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
pushLeft(row);
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
endLeft();
}
@@ -133,12 +123,12 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> {
else if (idx == 1)
return new Downstream<Row>() {
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
pushRight(row);
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
endRight();
}
@@ -152,76 +142,56 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** */
- private void pushLeft(Row row) {
+ private void pushLeft(Row row) throws Exception {
assert downstream() != null;
assert waitingLeft > 0;
- try {
- checkState();
+ checkState();
- waitingLeft--;
+ waitingLeft--;
- leftInBuf.add(row);
+ leftInBuf.add(row);
- join();
- }
- catch (Exception e) {
- onError(e);
- }
+ join();
}
/** */
- private void pushRight(Row row) {
+ private void pushRight(Row row) throws Exception {
assert downstream() != null;
assert waitingRight > 0;
- try {
- checkState();
+ checkState();
- waitingRight--;
+ waitingRight--;
- rightMaterialized.add(row);
+ rightMaterialized.add(row);
- if (waitingRight == 0)
- rightSource().request(waitingRight = IN_BUFFER_SIZE);
- }
- catch (Exception e) {
- onError(e);
- }
+ if (waitingRight == 0)
+ rightSource().request(waitingRight = IN_BUFFER_SIZE);
}
/** */
- private void endLeft() {
+ private void endLeft() throws Exception {
assert downstream() != null;
assert waitingLeft > 0;
- try {
- checkState();
+ checkState();
- waitingLeft = NOT_WAITING;
+ waitingLeft = NOT_WAITING;
- join();
- }
- catch (Exception e) {
- onError(e);
- }
+ join();
}
/** */
- private void endRight() {
+ private void endRight() throws Exception {
assert downstream() != null;
assert waitingRight > 0;
- try {
- checkState();
+ checkState();
- waitingRight = NOT_WAITING;
+ waitingRight = NOT_WAITING;
- join();
- }
- catch (Exception e) {
- onError(e);
- }
+ join();
}
/** */
@@ -235,7 +205,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** */
- protected abstract void join() throws IgniteCheckedException;
+ protected abstract void join() throws Exception;
/** */
@NotNull public static <Row> NestedLoopJoinNode<Row> create(ExecutionContext<Row> ctx, RelDataType outputRowType, RelDataType leftRowType,
@@ -299,7 +269,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** */
- @Override protected void join() throws IgniteCheckedException {
+ @Override protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
inLoop = true;
try {
@@ -377,7 +347,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void join() throws IgniteCheckedException {
+ @Override protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
inLoop = true;
try {
@@ -474,7 +444,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void join() throws IgniteCheckedException {
+ @Override protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
if (rightNotMatchedIndexes == null) {
rightNotMatchedIndexes = new BitSet(rightMaterialized.size());
@@ -603,7 +573,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void join() throws IgniteCheckedException {
+ @Override protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
if (rightNotMatchedIndexes == null) {
rightNotMatchedIndexes = new BitSet(rightMaterialized.size());
@@ -725,7 +695,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void join() throws IgniteCheckedException {
+ @Override protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
if (left == null)
@@ -793,7 +763,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void join() throws IgniteCheckedException {
+ @Override protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
inLoop = true;
try {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
index b9833af..59daa66 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.List;
+
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
@@ -71,7 +72,7 @@ public interface Node<Row> extends AutoCloseable {
/**
* Requests next bunch of rows.
*/
- void request(int rowsCnt);
+ void request(int rowsCnt) throws Exception;
/**
* Rewinds upstream.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
index 3c44646..7b8c3da 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
+
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
@@ -100,17 +101,12 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing
* @param nodeId Target ID.
* @param batchId Batch ID.
*/
- public void onAcknowledge(UUID nodeId, int batchId) {
+ public void onAcknowledge(UUID nodeId, int batchId) throws Exception {
assert nodeBuffers.containsKey(nodeId);
- try {
- checkState();
+ checkState();
- nodeBuffers.get(nodeId).acknowledge(batchId);
- }
- catch (Exception e) {
- onError(e);
- }
+ nodeBuffers.get(nodeId).acknowledge(batchId);
}
/** */
@@ -120,8 +116,8 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing
flush();
}
- catch (Exception e) {
- onError(e);
+ catch (Throwable t) {
+ onError(t);
}
}
@@ -131,37 +127,27 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing
}
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
assert waiting > 0;
- try {
- checkState();
+ checkState();
- waiting--;
+ waiting--;
- inBuf.add(row);
+ inBuf.add(row);
- flush();
- }
- catch (Exception e) {
- onError(e);
- }
+ flush();
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
assert waiting > 0;
- try {
- checkState();
+ checkState();
- waiting = -1;
+ waiting = -1;
- flush();
- }
- catch (Exception e) {
- onError(e);
- }
+ flush();
}
/** {@inheritDoc} */
@@ -241,7 +227,7 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing
}
/** */
- private void flush() throws IgniteCheckedException {
+ private void flush() throws Exception {
while (!inBuf.isEmpty()) {
checkState();
@@ -273,7 +259,7 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing
/** */
public void onNodeLeft(UUID nodeId) {
if (nodeId.equals(context().originatingNodeId()))
- context().execute(this::close);
+ context().execute(this::close, this::onError);
}
/** */
@@ -347,7 +333,7 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing
*
* @param id batch ID.
*/
- private void acknowledge(int id) throws IgniteCheckedException {
+ private void acknowledge(int id) throws Exception {
if (lwm > id)
return;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
index 4da1284..6b98efc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
@@ -45,46 +45,31 @@ public class ProjectNode<Row> extends AbstractNode<Row> implements SingleNode<Ro
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0;
- try {
- checkState();
+ checkState();
- source().request(rowsCnt);
- }
- catch (Exception e) {
- onError(e);
- }
+ source().request(rowsCnt);
}
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
assert downstream() != null;
- try {
- checkState();
+ checkState();
- downstream().push(prj.apply(row));
- }
- catch (Throwable e) {
- onError(e);
- }
+ downstream().push(prj.apply(row));
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
assert downstream() != null;
- try {
- checkState();
+ checkState();
- downstream().end();
- }
- catch (Exception e) {
- onError(e);
- }
+ downstream().end();
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index cc57d00..a99693b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -121,11 +121,11 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
/** {@inheritDoc} */
@Override public void closeInternal() {
- context().execute(() -> sources().forEach(U::closeQuiet));
+ context().execute(() -> sources().forEach(U::closeQuiet), this::onError);
}
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
assert waiting > 0;
lock.lock();
@@ -139,16 +139,13 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
if (inBuff.size() == IN_BUFFER_SIZE)
cond.signalAll();
}
- catch (Exception e) {
- onError(e);
- }
finally {
lock.unlock();
}
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
assert waiting > 0;
lock.lock();
@@ -159,9 +156,6 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
cond.signalAll();
}
- catch (Exception e) {
- onError(e);
- }
finally {
lock.unlock();
}
@@ -240,7 +234,7 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
close();
else if (inBuff.isEmpty() && waiting == 0) {
int req = waiting = IN_BUFFER_SIZE;
- context().execute(() -> source().request(req));
+ context().execute(() -> source().request(req), this::onError);
}
if (!outBuff.isEmpty() || waiting == -1)
@@ -250,7 +244,7 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
}
}
catch (InterruptedException e) {
- onError(new IgniteInterruptedException(e));
+ throw new IgniteInterruptedException(e);
}
finally {
lock.unlock();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index d83267b..c4805d6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.Iterator;
import java.util.List;
+
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -51,32 +51,22 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested;
- try {
- checkState();
+ checkState();
- requested = rowsCnt;
+ requested = rowsCnt;
- if (!inLoop)
- context().execute(this::doPush);
- }
- catch (Exception e) {
- onError(e);
- }
+ if (!inLoop)
+ context().execute(this::doPush, this::onError);
}
/** */
- private void doPush() {
- try {
- checkState();
+ private void doPush() throws Exception {
+ checkState();
- push();
- }
- catch (Exception e) {
- onError(e);
- }
+ push();
}
/** {@inheritDoc} */
@@ -105,7 +95,7 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
}
/** */
- private void push() throws IgniteCheckedException {
+ private void push() throws Exception {
inLoop = true;
try {
if (it == null)
@@ -120,7 +110,7 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
if (++processed == IN_BUFFER_SIZE && requested > 0) {
// allow others to do their job
- context().execute(this::doPush);
+ context().execute(this::doPush, this::onError);
return;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
index cb30293..9f7a31f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
@@ -18,8 +18,8 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.Comparator;
import java.util.PriorityQueue;
+
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.util.typedef.F;
@@ -65,75 +65,50 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0 && requested == 0;
assert waiting <= 0;
- try {
- checkState();
-
- requested = rowsCnt;
+ checkState();
- if (waiting == 0)
- source().request(waiting = IN_BUFFER_SIZE);
- else if (!inLoop)
- context().execute(this::doFlush);
- }
- catch (Exception e) {
- onError(e);
- }
- }
+ requested = rowsCnt;
- /** */
- private void doFlush() {
- try {
- flush();
- }
- catch (Exception e) {
- onError(e);
- }
+ if (waiting == 0)
+ source().request(waiting = IN_BUFFER_SIZE);
+ else if (!inLoop)
+ context().execute(this::flush, this::onError);
}
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
assert downstream() != null;
assert waiting > 0;
- try {
- checkState();
+ checkState();
- waiting--;
+ waiting--;
- rows.add(row);
+ rows.add(row);
- if (waiting == 0)
- source().request(waiting = IN_BUFFER_SIZE);
- }
- catch (Exception e) {
- onError(e);
- }
+ if (waiting == 0)
+ source().request(waiting = IN_BUFFER_SIZE);
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
assert downstream() != null;
assert waiting > 0;
- try {
- checkState();
+ checkState();
- waiting = -1;
+ waiting = -1;
- flush();
- }
- catch (Exception e) {
- downstream().onError(e);
- }
+ flush();
}
/** */
- private void flush() throws IgniteCheckedException {
+ private void flush() throws Exception {
assert waiting == -1;
int processed = 0;
@@ -149,7 +124,7 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
if (++processed >= IN_BUFFER_SIZE && requested > 0) {
// allow others to do their job
- context().execute(this::doFlush);
+ context().execute(this::flush, this::onError);
return;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
index d1ad00a..7da2e58 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
@@ -74,27 +74,22 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0;
- try {
- checkState();
+ checkState();
- requested += rowsCnt;
+ requested += rowsCnt;
- if ((waiting == -1 || rowIdx < rows.size()) && !inLoop)
- context().execute(this::doPush);
- else if (waiting == 0)
- source().request(waiting = IN_BUFFER_SIZE);
- }
- catch (Exception e) {
- onError(e);
- }
+ if ((waiting == -1 || rowIdx < rows.size()) && !inLoop)
+ context().execute(this::doPush, this::onError);
+ else if (waiting == 0)
+ source().request(waiting = IN_BUFFER_SIZE);
}
/** */
- private void doPush() {
+ private void doPush() throws Exception {
if (rowIdx >= rows.size() && waiting == -1 && requested > 0) {
downstream().end();
@@ -106,7 +101,7 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
}
/** */
- private void pushToDownstream() {
+ private void pushToDownstream() throws Exception {
inLoop = true;
downstream().push(rows.get(rowIdx));
@@ -121,43 +116,33 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
}
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
assert downstream() != null;
assert waiting > 0;
- try {
- checkState();
+ checkState();
- waiting--;
+ waiting--;
- rows.add(row);
+ rows.add(row);
- if (waiting == 0)
- source().request(waiting = IN_BUFFER_SIZE);
+ if (waiting == 0)
+ source().request(waiting = IN_BUFFER_SIZE);
- if (requested > 0 && rowIdx < rows.size())
- pushToDownstream();
- }
- catch (Exception e) {
- onError(e);
- }
+ if (requested > 0 && rowIdx < rows.size())
+ pushToDownstream();
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
assert downstream() != null;
assert waiting > 0;
- try {
- checkState();
+ checkState();
- waiting = -1;
+ waiting = -1;
- if (rowIdx >= rows.size() && requested > 0)
- downstream().end();
- }
- catch (Exception e) {
- downstream().onError(e);
- }
+ if (rowIdx >= rows.size() && requested > 0)
+ downstream().end();
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
index 385577b..c2669e1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
@@ -47,54 +47,39 @@ public class UnionAllNode<Row> extends AbstractNode<Row> implements Downstream<R
}
/** {@inheritDoc} */
- @Override public void request(int rowsCnt) {
+ @Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources());
assert rowsCnt > 0 && waiting == 0;
- try {
- checkState();
+ checkState();
- source().request(waiting = rowsCnt);
- }
- catch (Exception e) {
- onError(e);
- }
+ source().request(waiting = rowsCnt);
}
/** {@inheritDoc} */
- @Override public void push(Row row) {
+ @Override public void push(Row row) throws Exception {
assert downstream() != null;
assert waiting > 0;
- try {
- checkState();
+ checkState();
- waiting--;
+ waiting--;
- downstream().push(row);
- }
- catch (Exception e) {
- onError(e);
- }
+ downstream().push(row);
}
/** {@inheritDoc} */
- @Override public void end() {
+ @Override public void end() throws Exception {
assert downstream() != null;
assert waiting > 0;
- try {
- checkState();
+ checkState();
- if (++curSrc < sources().size())
- source().request(waiting);
- else {
- waiting = -1;
- downstream().end();
- }
- }
- catch (Exception e) {
- onError(e);
+ if (++curSrc < sources().size())
+ source().request(waiting);
+ else {
+ waiting = -1;
+ downstream().end();
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
index 6d09c09..45b4385 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
@@ -167,6 +167,7 @@ public class ErrorMessage implements MarshalableMessage {
/** {@inheritDoc} */
@Override public void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException {
- err = ctx.unmarshal(errBytes);
+ if (errBytes != null)
+ err = ctx.unmarshal(errBytes);
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java
new file mode 100644
index 0000000..537d1f2
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.CorruptedTreeException;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_EXPERIMENTAL_SQL_ENGINE;
+import static org.apache.ignite.internal.processors.query.calcite.exec.LogicalRelImplementor.CNLJ_SUPPORTS_ONLY_INNER_ASSERTION_MSG;
+
+/** */
+@WithSystemProperty(key = "calcite.debug", value = "false")
+@WithSystemProperty(key = IGNITE_EXPERIMENTAL_SQL_ENGINE, value = "true")
+public class CalciteErrorHandlilngIntegrationTest extends GridCommonAbstractTest {
+ /** */
+ @After
+ public void cleanUp() {
+ stopAllGrids();
+ }
+
+ /**
+ * Test verifies that an AssertionError during the fragment deserialization phase doesn't lead to execution freezing.
+ * <ol>
+ * <li>Start several nodes.</li>
+ * <li>Replace CommunicationSpi with one that modifies messages (replaces the join type inside a QueryStartRequest).</li>
+ * <li>Execute query that requires CNLJ.</li>
+ * <li>Verify that query failed with proper exception.</li>
+ * </ol>
+ */
+ @Test
+ public void assertionOnDeserialization() throws Exception {
+ Supplier<TcpCommunicationSpi> spiLsnrSupp = () -> new TcpCommunicationSpi() {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
+ if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof QueryStartRequest) {
+ QueryStartRequest req = (QueryStartRequest)((GridIoMessage)msg).message();
+
+ String root = GridTestUtils.getFieldValue(req, "root");
+
+ GridTestUtils.setFieldValue(req, "root",
+ root.replace("\"joinType\":\"inner\"", "\"joinType\":\"full\""));
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ };
+
+ startGrid(createConfiguration(1, false).setCommunicationSpi(spiLsnrSupp.get()));
+ startGrid(createConfiguration(2, false).setCommunicationSpi(spiLsnrSupp.get()));
+
+ IgniteEx client = startGrid(createConfiguration(0, true).setCommunicationSpi(spiLsnrSupp.get()));
+
+ sql(client, "create table test (id int primary key, val varchar)");
+
+ String sql = "select /*+ DISABLE_RULE('NestedLoopJoinConverter', 'MergeJoinConverter') */ t1.id from test t1, test t2 where t1.id = t2.id";
+
+ Throwable t = GridTestUtils.assertThrowsWithCause(() -> sql(client, sql), AssertionError.class);
+ assertEquals(CNLJ_SUPPORTS_ONLY_INNER_ASSERTION_MSG, t.getCause().getMessage());
+ }
+
+ /**
+ * Test verifies that an exception during an index lookup doesn't lead to execution freezing.
+ * <ol>
+ * <li>Start several nodes.</li>
+ * <li>Inject tree's action wrapper that throws exception on demand.</li>
+ * <li>Execute a query that does an index lookup.</li>
+ * <li>Verify that query failed with proper exception.</li>
+ * <li>Verify that FailureHandler was triggered.</li>
+ * </ol>
+ */
+ @Test
+ @SuppressWarnings("ThrowableNotThrown")
+ public void assertionOnTreeLookup() throws Exception {
+ AtomicBoolean shouldThrow = new AtomicBoolean();
+
+ BPlusTree.testHndWrapper = (tree, hnd) -> {
+ if (hnd instanceof BPlusTree.Search) {
+ PageHandler<Object, BPlusTree.Result> delegate = (PageHandler<Object, BPlusTree.Result>)hnd;
+
+ return new PageHandler<Object, BPlusTree.Result>() {
+ @Override public BPlusTree.Result run(
+ int cacheId,
+ long pageId,
+ long page,
+ long pageAddr,
+ PageIO io,
+ Boolean walPlc,
+ Object arg,
+ int intArg,
+ IoStatisticsHolder statHolder
+ ) throws IgniteCheckedException {
+ BPlusTree.Result res =
+ delegate.run(cacheId, pageId, page, pageAddr, io, walPlc, arg, intArg, statHolder);
+
+ if (shouldThrow.get() && tree instanceof H2Tree)
+ throw new RuntimeException("test exception");
+
+ return res;
+ }
+ };
+ }
+ else
+ return hnd;
+ };
+
+ try {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ FailureHandler failureHnd = (ignite, failureCtx) -> {
+ latch.countDown();
+
+ return false;
+ };
+
+ startGrid(createConfiguration(1, false).setFailureHandler(failureHnd));
+ startGrid(createConfiguration(2, false).setFailureHandler(failureHnd));
+
+ IgniteEx client = startGrid(createConfiguration(0, true));
+
+ sql(client, "create table test (id integer primary key, val varchar)");
+ sql(client, "create index test_id_idx on test (id)");
+
+ awaitPartitionMapExchange(true, true, null);
+
+ shouldThrow.set(true);
+
+ List<String> sqls = F.asList(
+ "select id from test where id > -10",
+ "select max(id) from test where id > -10"
+ );
+
+ for (String sql : sqls) {
+ GridTestUtils.assertThrowsWithCause(() -> sql(client, sql), CorruptedTreeException.class);
+
+ assertTrue("Failure handler was not invoked", latch.await(5, TimeUnit.SECONDS));
+ }
+ }
+ finally {
+ BPlusTree.testHndWrapper = null;
+ }
+ }
+
+ /** */
+ private List<List<?>> sql(IgniteEx ignite, String sql, Object... args) {
+ return ignite.context().query().querySqlFields(
+ new SqlFieldsQuery(sql).setSchema("PUBLIC").setArgs(args), false).getAll();
+ }
+
+ /** */
+ private IgniteConfiguration createConfiguration(int id, boolean client) throws Exception {
+ return getConfiguration(client ? "client" : "server-" + id).setClientMode(client);
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
index b9b7516..e93bd56 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
@@ -155,7 +155,7 @@ public class ContinuousExecutionTest extends AbstractExecutionTest {
outbox.register(filter);
registry.register(outbox);
- outbox.context().execute(outbox::init);
+ outbox.context().execute(outbox::init, outbox::onError);
}
UUID locNodeId = nodes.get(0);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
index 729b79b..4ba60cb 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
@@ -33,6 +33,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
@@ -1047,6 +1048,40 @@ public class ExecutionTest extends AbstractExecutionTest {
}
/**
+ * Test verifies that an AssertionError thrown from an execution node
+ * is properly handled by a task executor.
+ */
+ @Test
+ @SuppressWarnings("ThrowableNotThrown")
+ public void assertionHandlingTest() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class);
+
+ CorruptedNode<Object[]> node = new CorruptedNode<>();
+
+ RootNode<Object[]> root = new RootNode<>(ctx, rowType);
+ root.register(node);
+
+ Thread watchDog = new Thread(() -> {
+ try {
+ U.sleep(5_000);
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ }
+
+ if (!root.isClosed())
+ root.close();
+ }, "test-watchdog");
+
+ watchDog.start();
+
+ GridTestUtils.assertThrowsWithCause(root::hasNext, AssertionError.class);
+
+ watchDog.interrupt();
+ }
+
+ /**
*
*/
private Object[] row(Object... fields) {
@@ -1082,4 +1117,54 @@ public class ExecutionTest extends AbstractExecutionTest {
}
};
}
+
+ /**
+ * Node that always throws assertion error except for {@link #close()}
+ * and {@link #onRegister(Downstream)} methods.
+ */
+ static class CorruptedNode<T> implements Node<T> {
+ /** {@inheritDoc} */
+ @Override public ExecutionContext<T> context() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelDataType rowType() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Downstream<T> downstream() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void register(List<Node<T>> sources) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Node<T>> sources() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onRegister(Downstream<T> downstream) {
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void request(int rowsCnt) throws Exception {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void rewind() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ }
+ }
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index 8f8a74e..f20deb5 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -1366,7 +1366,9 @@ public class PlannerTest extends AbstractPlannerTest {
assert exec instanceof Outbox;
- exec.context().execute(((Outbox<Object[]>) exec)::init);
+ Outbox<Object[]> outbox = (Outbox<Object[]>) exec;
+
+ exec.context().execute(outbox::init, outbox::onError);
ArrayList<Object[]> res = new ArrayList<>();
@@ -1625,7 +1627,9 @@ public class PlannerTest extends AbstractPlannerTest {
assert exec instanceof Outbox;
- exec.context().execute(((Outbox<Object[]>) exec)::init);
+ Outbox<Object[]> outbox = (Outbox<Object[]>) exec;
+
+ exec.context().execute(outbox::init, outbox::onError);
ArrayList<Object[]> res = new ArrayList<>();
@@ -2650,7 +2654,7 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
assertEquals("" +
- "IgniteMergeJoin(condition=[AND(=($0, $4), =($3, $1))], joinType=[inner])\n" +
+ "IgniteMergeJoin(condition=[AND(=($0, $4), =($3, $1))], joinType=[inner], leftCollation=[[0, 1]], rightCollation=[[2, 1]])\n" +
" IgniteIndexScan(table=[[PUBLIC, DEPT]], index=[dep_idx])\n" +
" IgniteIndexScan(table=[[PUBLIC, EMP]], index=[emp_idx])\n",
RelOptUtil.toString(phys));
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index 32197f2..c5a9b63 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.testsuites;
import org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondaryIndexIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.CalciteErrorHandlilngIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
import org.apache.ignite.internal.processors.query.calcite.CancelTest;
import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
@@ -44,6 +45,7 @@ import org.junit.runners.Suite;
ClosableIteratorsHolderTest.class,
ContinuousExecutionTest.class,
CalciteQueryProcessorTest.class,
+ CalciteErrorHandlilngIntegrationTest.class,
JdbcQueryTest.class,
CalciteBasicSecondaryIndexIntegrationTest.class,
CancelTest.class,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 0e1f602..49ab287 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -247,7 +247,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/** h2 redirection stub. */
public static final Pattern H2_REDIRECTION_RULES =
- Pattern.compile("\\s*(create\\s*table|drop\\s*table|alter\\s*table)", CASE_INSENSITIVE);
+ Pattern.compile("\\s*((create|drop)\\s*(table|index)|alter\\s*table)", CASE_INSENSITIVE);
/** @see IgniteSystemProperties#IGNITE_EXPERIMENTAL_SQL_ENGINE */
public static final boolean DFLT_IGNITE_EXPERIMENTAL_SQL_ENGINE = false;
diff --git a/modules/core/src/test/config/log4j-test.xml b/modules/core/src/test/config/log4j-test.xml
index e82c103..91a8919 100755
--- a/modules/core/src/test/config/log4j-test.xml
+++ b/modules/core/src/test/config/log4j-test.xml
@@ -133,6 +133,10 @@
-->
<!-- Disable all open source debugging. -->
+<!-- <category name="org.apache.calcite">-->
+<!-- <level value="DEBUG"/>-->
+<!-- </category>-->
+
<category name="org">
<level value="INFO"/>
</category>