You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/03/04 09:50:53 UTC
[ignite] branch sql-calcite updated: IGNITE-14229 Calcite. Fix exceptions handling
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new 5b3e8c03 IGNITE-14229 Calcite. Fix exceptions handling
5b3e8c03 is described below
commit 5b3e8c03767321659aa84ff99ec31a4411630047
Author: tledkov <tl...@gridgain.com>
AuthorDate: Thu Mar 4 12:50:37 2021 +0300
IGNITE-14229 Calcite. Fix exceptions handling
---
.../query/calcite/exec/ExchangeServiceImpl.java | 13 ++--
.../query/calcite/exec/ExecutionContext.java | 7 +-
.../query/calcite/exec/ExecutionServiceImpl.java | 79 +++++++++++++---------
.../query/calcite/exec/QueryTaskExecutorImpl.java | 34 +++++-----
.../query/calcite/externalize/RelJson.java | 8 ++-
.../query/calcite/message/MessageServiceImpl.java | 6 +-
.../CalciteErrorHandlilngIntegrationTest.java | 40 +++++++++++
.../query/calcite/planner/PlannerTest.java | 39 +++++++++++
8 files changed, 170 insertions(+), 56 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 2da1b9d..c795694 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -23,6 +23,7 @@ import java.util.Objects;
import java.util.UUID;
import com.google.common.collect.ImmutableMap;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
@@ -197,8 +198,10 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
try {
outbox.onAcknowledge(nodeId, msg.batchId());
}
- catch (Throwable t) {
- outbox.onError(t);
+ catch (Throwable e) {
+ outbox.onError(e);
+
+ throw new IgniteException("Unexpected exception", e);
}
}
else if (log.isDebugEnabled()) {
@@ -228,8 +231,10 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
try {
inbox.onBatchReceived(nodeId, msg.batchId(), msg.last(), Commons.cast(msg.rows()));
}
- catch (Throwable t) {
- inbox.onError(t);
+ catch (Throwable e) {
+ inbox.onError(e);
+
+ throw new IgniteException("Unexpected exception", e);
}
}
else if (log.isDebugEnabled()) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 23309f0..6d65beb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -27,6 +27,7 @@ import java.util.function.Consumer;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
@@ -233,8 +234,10 @@ public class ExecutionContext<Row> implements DataContext {
try {
task.run();
}
- catch (Throwable t) {
- onError.accept(t);
+ catch (Throwable e) {
+ onError.accept(e);
+
+ throw new IgniteException("Unexpected exception", e);
}
});
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index ef5dfb3..354b926 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -50,6 +50,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.ValidationException;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.events.EventType;
@@ -113,6 +114,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryC
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -761,43 +763,21 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
long frId = fragmentDesc.fragmentId();
UUID origNodeId = pctx.originatingNodeId();
- Outbox<Row> node;
- try {
- node = new LogicalRelImplementor<>(
+ Outbox<Row> node = new LogicalRelImplementor<>(
ectx,
partitionService(),
mailboxRegistry(),
exchangeService(),
failureProcessor())
.go(plan.root());
- }
- catch (Throwable ex) {
- U.error(log, "Failed to build execution tree. ", ex);
-
- mailboxRegistry.outboxes(qryId, frId, -1)
- .forEach(Outbox::close);
- mailboxRegistry.inboxes(qryId, frId, -1)
- .forEach(Inbox::close);
-
- try {
- messageService().send(origNodeId, new QueryStartResponse(qryId, frId, ex));
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send reply. [nodeId=" + origNodeId + ']', e);
- }
-
- return;
- }
try {
messageService().send(origNodeId, new QueryStartResponse(qryId, frId));
}
catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send reply. [nodeId=" + origNodeId + ']', e);
+ IgniteException wrpEx = new IgniteException("Failed to send reply. [nodeId=" + origNodeId + ']', e);
- node.onNodeLeft(origNodeId);
-
- return;
+ throw wrpEx;
}
node.init();
@@ -842,15 +822,44 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
private void onMessage(UUID nodeId, QueryStartRequest msg) {
assert nodeId != null && msg != null;
- PlanningContext pctx = createContext(Contexts.empty(), msg.topologyVersion(), nodeId, msg.schema(), msg.root(), msg.parameters());
+ try {
+ PlanningContext pctx = createContext(Contexts.empty(), msg.topologyVersion(), nodeId, msg.schema(), msg.root(), msg.parameters());
- List<QueryPlan> qryPlans = queryPlanCache().queryPlan(pctx, new CacheKey(pctx.schemaName(), pctx.query()), this::prepareFragment);
+ List<QueryPlan> qryPlans = queryPlanCache().queryPlan(
+ pctx,
+ new CacheKey(pctx.schemaName(), pctx.query()),
+ this::prepareFragment
+ );
- assert qryPlans.size() == 1 && qryPlans.get(0).type() == QueryPlan.Type.FRAGMENT;
+ assert qryPlans.size() == 1 && qryPlans.get(0).type() == QueryPlan.Type.FRAGMENT;
- FragmentPlan plan = (FragmentPlan)qryPlans.get(0);
+ FragmentPlan plan = (FragmentPlan)qryPlans.get(0);
- executeFragment(msg.queryId(), plan, pctx, msg.fragmentDescription());
+ executeFragment(msg.queryId(), plan, pctx, msg.fragmentDescription());
+ }
+ catch (Throwable ex) {
+ U.error(log, "Failed to start query fragment ", ex);
+
+ mailboxRegistry.outboxes(msg.queryId(), msg.fragmentId(), -1)
+ .forEach(Outbox::close);
+ mailboxRegistry.inboxes(msg.queryId(), msg.fragmentId(), -1)
+ .forEach(Inbox::close);
+
+ try {
+ messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentId(), ex));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Error occurred during send error message: " + X.getFullStackTrace(e));
+
+ IgniteException wrpEx = new IgniteException("Error occurred during send error message", e);
+
+ e.addSuppressed(ex);
+
+ throw wrpEx;
+ }
+
+ throw ex;
+ }
}
/** */
@@ -1007,14 +1016,22 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
running.remove(ctx.queryId());
// 4) close remote fragments
+ IgniteException wrpEx = null;
+
for (UUID nodeId : remotes) {
try {
exchangeService().closeOutbox(nodeId, ctx.queryId(), -1, -1);
}
catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send cancel message. [nodeId=" + nodeId + ']', e);
+ if (wrpEx == null)
+ wrpEx = new IgniteException("Failed to send cancel message. [nodeId=" + nodeId + ']', e);
+ else
+ wrpEx.addSuppressed(e);
}
}
+
+ if (wrpEx != null)
+ throw wrpEx;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
index ff8263d..e9bcab2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
@@ -41,9 +41,6 @@ public class QueryTaskExecutorImpl extends AbstractService implements QueryTaskE
private IgniteStripedThreadPoolExecutor stripedThreadPoolExecutor;
/** */
- private FailureProcessor failureProcessor;
-
- /** */
private Thread.UncaughtExceptionHandler eHnd;
/** */
@@ -65,16 +62,26 @@ public class QueryTaskExecutorImpl extends AbstractService implements QueryTaskE
this.eHnd = eHnd;
}
- /**
- * @param failureProcessor Failure processor.
- */
- public void failureProcessor(FailureProcessor failureProcessor) {
- this.failureProcessor = failureProcessor;
- }
-
/** {@inheritDoc} */
@Override public void execute(UUID qryId, long fragmentId, Runnable qryTask) {
- stripedThreadPoolExecutor.execute(qryTask, hash(qryId, fragmentId));
+ stripedThreadPoolExecutor.execute(
+ () -> {
+ try {
+ qryTask.run();
+ }
+ catch (Throwable e) {
+ U.warn(log, "Uncaught exception", e);
+
+ /*
+ * No exceptions are rethrown here to preserve the current thread from being destroyed,
+ * because other queries may be pinned to the current thread id.
+ * However, unrecoverable errors must be processed by FailureHandler.
+ */
+ uncaughtException(Thread.currentThread(), e);
+ }
+ },
+ hash(qryId, fragmentId)
+ );
}
/** {@inheritDoc} */
@@ -83,8 +90,6 @@ public class QueryTaskExecutorImpl extends AbstractService implements QueryTaskE
CalciteQueryProcessor proc = Objects.requireNonNull(Commons.lookupComponent(ctx, CalciteQueryProcessor.class));
- failureProcessor(proc.failureProcessor());
-
stripedThreadPoolExecutor(new IgniteStripedThreadPoolExecutor(
ctx.config().getQueryThreadPoolSize(),
ctx.igniteInstanceName(),
@@ -102,9 +107,6 @@ public class QueryTaskExecutorImpl extends AbstractService implements QueryTaskE
/** {@inheritDoc} */
@Override public void uncaughtException(Thread t, Throwable e) {
- if (failureProcessor != null)
- failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, e));
-
if (eHnd != null)
eHnd.uncaughtException(t, e);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
index 1751154..cf01709 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
@@ -96,6 +96,7 @@ import org.apache.calcite.sql.validate.SqlNameMatchers;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
@@ -520,12 +521,15 @@ class RelJson {
SqlKind sqlKind = toEnum(map.get("kind"));
SqlSyntax sqlSyntax = toEnum(map.get("syntax"));
List<SqlOperator> operators = new ArrayList<>();
- SqlStdOperatorTable.instance().lookupOperatorOverloads(
+
+ CalciteQueryProcessor.FRAMEWORK_CONFIG.getOperatorTable().lookupOperatorOverloads(
new SqlIdentifier(name, new SqlParserPos(0, 0)),
null,
sqlSyntax,
operators,
- SqlNameMatchers.liberal());
+ SqlNameMatchers.liberal()
+ );
+
for (SqlOperator operator : operators)
if (operator.kind == sqlKind)
return operator;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
index c3b8cc2..2631bf9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
@@ -256,7 +256,11 @@ public class MessageServiceImpl extends AbstractService implements MessageServic
taskExecutor().execute(msg0.queryId(), msg0.fragmentId(), () -> onMessageInternal(nodeId, msg));
}
else if (async)
- taskExecutor().execute(IgniteUuid.VM_ID, ThreadLocalRandom.current().nextLong(1024), () -> onMessageInternal(nodeId, msg));
+ taskExecutor().execute(
+ IgniteUuid.VM_ID,
+ ThreadLocalRandom.current().nextLong(1024),
+ () -> onMessageInternal(nodeId, msg)
+ );
else
onMessageInternal(nodeId, msg);
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java
index 537d1f2..43733f8 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java
@@ -102,6 +102,45 @@ public class CalciteErrorHandlilngIntegrationTest extends GridCommonAbstractTest
}
/**
+ * Test verifies that exception on fragment deserialization phase doesn't lead to execution freezing.
+ * <ol>
+ * <li>Start several nodes.</li>
+ * <li>Replace CommunicationSpi to one that modifies messages (invalid fragment JSON).</li>
+ * <li>Execute query.</li>
+ * <li>Verify that query failed with proper exception.</li>
+ * </ol>
+ */
+ @Test
+ public void assertionOnDeserializationInvalidFragment() throws Exception {
+ Supplier<TcpCommunicationSpi> spiLsnrSupp = () -> new TcpCommunicationSpi() {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
+ if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof QueryStartRequest) {
+ QueryStartRequest req = (QueryStartRequest)((GridIoMessage)msg).message();
+
+ String root = GridTestUtils.getFieldValue(req, "root");
+
+ GridTestUtils.setFieldValue(req, "root",
+ root.replace("\"table\"", "\"invalidTag\""));
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ };
+
+ startGrid(createConfiguration(1, false).setCommunicationSpi(spiLsnrSupp.get()));
+ startGrid(createConfiguration(2, false).setCommunicationSpi(spiLsnrSupp.get()));
+
+ IgniteEx client = startGrid(createConfiguration(0, true).setCommunicationSpi(spiLsnrSupp.get()));
+
+ sql(client, "create table test (id int primary key, val varchar)");
+
+ String sql = "select id from test";
+
+ GridTestUtils.assertThrowsWithCause(() -> sql(client, sql), NullPointerException.class);
+ }
+
+ /**
* Test verifies that a Exception during index look up doesn't lead to execution freezing.
* <ol>
* <li>Start several nodes.</li>
@@ -162,6 +201,7 @@ public class CalciteErrorHandlilngIntegrationTest extends GridCommonAbstractTest
sql(client, "create table test (id integer primary key, val varchar)");
sql(client, "create index test_id_idx on test (id)");
+ sql(client, "insert into test values (0, 'val_0');");
awaitPartitionMapExchange(true, true, null);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index f20deb5..a14060a 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -41,6 +41,9 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelVisitor;
import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.util.ImmutableBitSet;
@@ -71,6 +74,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate
import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
@@ -2784,4 +2788,39 @@ public class PlannerTest extends AbstractPlannerTest {
checkSplitAndSerialization(phys, publicSchema);
}
}
+
+ /** */
+ @Test
+ public void testNotStandardFunctions() throws Exception {
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ publicSchema.addTable(
+ "TEST",
+ new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "TEST", "hash");
+ }
+ }
+ );
+
+ String queries[] = {
+ "select REVERSE(val) from TEST", // MYSQL
+ "select TO_DATE(val, 'yyyymmdd') from TEST" // ORACLE
+ };
+
+ for (String sql : queries) {
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema
+ );
+
+ checkSplitAndSerialization(phys, publicSchema);
+ }
+ }
}