You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/03/04 09:50:53 UTC

[ignite] branch sql-calcite updated: IGNITE-14229 Calcite. Fix exceptions handling

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new 5b3e8c03 IGNITE-14229 Calcite. Fix exceptions handling
5b3e8c03 is described below

commit 5b3e8c03767321659aa84ff99ec31a4411630047
Author: tledkov <tl...@gridgain.com>
AuthorDate: Thu Mar 4 12:50:37 2021 +0300

    IGNITE-14229 Calcite. Fix exceptions handling
---
 .../query/calcite/exec/ExchangeServiceImpl.java    | 13 ++--
 .../query/calcite/exec/ExecutionContext.java       |  7 +-
 .../query/calcite/exec/ExecutionServiceImpl.java   | 79 +++++++++++++---------
 .../query/calcite/exec/QueryTaskExecutorImpl.java  | 34 +++++-----
 .../query/calcite/externalize/RelJson.java         |  8 ++-
 .../query/calcite/message/MessageServiceImpl.java  |  6 +-
 .../CalciteErrorHandlilngIntegrationTest.java      | 40 +++++++++++
 .../query/calcite/planner/PlannerTest.java         | 39 +++++++++++
 8 files changed, 170 insertions(+), 56 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 2da1b9d..c795694 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -23,6 +23,7 @@ import java.util.Objects;
 import java.util.UUID;
 import com.google.common.collect.ImmutableMap;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
@@ -197,8 +198,10 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
             try {
                 outbox.onAcknowledge(nodeId, msg.batchId());
             }
-            catch (Throwable t) {
-                outbox.onError(t);
+            catch (Throwable e) {
+                outbox.onError(e);
+
+                throw new IgniteException("Unexpected exception", e);
             }
         }
         else if (log.isDebugEnabled()) {
@@ -228,8 +231,10 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
             try {
                 inbox.onBatchReceived(nodeId, msg.batchId(), msg.last(), Commons.cast(msg.rows()));
             }
-            catch (Throwable t) {
-                inbox.onError(t);
+            catch (Throwable e) {
+                inbox.onError(e);
+
+                throw new IgniteException("Unexpected exception", e);
             }
         }
         else if (log.isDebugEnabled()) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 23309f0..6d65beb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -27,6 +27,7 @@ import java.util.function.Consumer;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.QueryProvider;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
@@ -233,8 +234,10 @@ public class ExecutionContext<Row> implements DataContext {
             try {
                 task.run();
             }
-            catch (Throwable t) {
-                onError.accept(t);
+            catch (Throwable e) {
+                onError.accept(e);
+
+                throw new IgniteException("Unexpected exception", e);
             }
         });
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index ef5dfb3..354b926 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -50,6 +50,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.events.EventType;
@@ -113,6 +114,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryC
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -761,43 +763,21 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
         long frId = fragmentDesc.fragmentId();
         UUID origNodeId = pctx.originatingNodeId();
 
-        Outbox<Row> node;
-        try {
-            node = new LogicalRelImplementor<>(
+        Outbox<Row> node = new LogicalRelImplementor<>(
                 ectx,
                 partitionService(),
                 mailboxRegistry(),
                 exchangeService(),
                 failureProcessor())
                 .go(plan.root());
-        }
-        catch (Throwable ex) {
-            U.error(log, "Failed to build execution tree. ", ex);
-
-            mailboxRegistry.outboxes(qryId, frId, -1)
-                .forEach(Outbox::close);
-            mailboxRegistry.inboxes(qryId, frId, -1)
-                .forEach(Inbox::close);
-
-            try {
-                messageService().send(origNodeId, new QueryStartResponse(qryId, frId, ex));
-            }
-            catch (IgniteCheckedException e) {
-                U.warn(log, "Failed to send reply. [nodeId=" + origNodeId + ']', e);
-            }
-
-            return;
-        }
 
         try {
             messageService().send(origNodeId, new QueryStartResponse(qryId, frId));
         }
         catch (IgniteCheckedException e) {
-            U.warn(log, "Failed to send reply. [nodeId=" + origNodeId + ']', e);
+            IgniteException wrpEx = new IgniteException("Failed to send reply. [nodeId=" + origNodeId + ']', e);
 
-            node.onNodeLeft(origNodeId);
-
-            return;
+            throw wrpEx;
         }
 
         node.init();
@@ -842,15 +822,44 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     private void onMessage(UUID nodeId, QueryStartRequest msg) {
         assert nodeId != null && msg != null;
 
-        PlanningContext pctx = createContext(Contexts.empty(), msg.topologyVersion(), nodeId, msg.schema(), msg.root(), msg.parameters());
+        try {
+            PlanningContext pctx = createContext(Contexts.empty(), msg.topologyVersion(), nodeId, msg.schema(), msg.root(), msg.parameters());
 
-        List<QueryPlan> qryPlans = queryPlanCache().queryPlan(pctx, new CacheKey(pctx.schemaName(), pctx.query()), this::prepareFragment);
+            List<QueryPlan> qryPlans = queryPlanCache().queryPlan(
+                pctx,
+                new CacheKey(pctx.schemaName(), pctx.query()),
+                this::prepareFragment
+            );
 
-        assert qryPlans.size() == 1 && qryPlans.get(0).type() == QueryPlan.Type.FRAGMENT;
+            assert qryPlans.size() == 1 && qryPlans.get(0).type() == QueryPlan.Type.FRAGMENT;
 
-        FragmentPlan plan = (FragmentPlan)qryPlans.get(0);
+            FragmentPlan plan = (FragmentPlan)qryPlans.get(0);
 
-        executeFragment(msg.queryId(), plan, pctx, msg.fragmentDescription());
+            executeFragment(msg.queryId(), plan, pctx, msg.fragmentDescription());
+        }
+        catch (Throwable ex) {
+            U.error(log, "Failed to start query fragment ", ex);
+
+            mailboxRegistry.outboxes(msg.queryId(), msg.fragmentId(), -1)
+                .forEach(Outbox::close);
+            mailboxRegistry.inboxes(msg.queryId(), msg.fragmentId(), -1)
+                .forEach(Inbox::close);
+
+            try {
+                messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentId(), ex));
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Error occurred during send error message: " + X.getFullStackTrace(e));
+
+                IgniteException wrpEx = new IgniteException("Error occurred during send error message", e);
+
+                e.addSuppressed(ex);
+
+                throw wrpEx;
+            }
+
+            throw ex;
+        }
     }
 
     /** */
@@ -1007,14 +1016,22 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
                 running.remove(ctx.queryId());
 
                 // 4) close remote fragments
+                IgniteException wrpEx = null;
+
                 for (UUID nodeId : remotes) {
                     try {
                         exchangeService().closeOutbox(nodeId, ctx.queryId(), -1, -1);
                     }
                     catch (IgniteCheckedException e) {
-                        U.warn(log, "Failed to send cancel message. [nodeId=" + nodeId + ']', e);
+                        if (wrpEx == null)
+                            wrpEx = new IgniteException("Failed to send cancel message. [nodeId=" + nodeId + ']', e);
+                        else
+                            wrpEx.addSuppressed(e);
                     }
                 }
+
+                if (wrpEx != null)
+                    throw wrpEx;
             }
         }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
index ff8263d..e9bcab2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
@@ -41,9 +41,6 @@ public class QueryTaskExecutorImpl extends AbstractService implements QueryTaskE
     private IgniteStripedThreadPoolExecutor stripedThreadPoolExecutor;
 
     /** */
-    private FailureProcessor failureProcessor;
-
-    /** */
     private Thread.UncaughtExceptionHandler eHnd;
 
     /** */
@@ -65,16 +62,26 @@ public class QueryTaskExecutorImpl extends AbstractService implements QueryTaskE
         this.eHnd = eHnd;
     }
 
-    /**
-     * @param failureProcessor Failure processor.
-     */
-    public void failureProcessor(FailureProcessor failureProcessor) {
-        this.failureProcessor = failureProcessor;
-    }
-
     /** {@inheritDoc} */
     @Override public void execute(UUID qryId, long fragmentId, Runnable qryTask) {
-        stripedThreadPoolExecutor.execute(qryTask, hash(qryId, fragmentId));
+        stripedThreadPoolExecutor.execute(
+            () -> {
+                try {
+                    qryTask.run();
+                }
+                catch (Throwable e) {
+                    U.warn(log, "Uncaught exception", e);
+
+                    /*
+                     * Exceptions are not rethrown here so that the current thread is not destroyed,
+                     * because other queries may be pinned to the current thread id.
+                     * However, unrecoverable errors must be processed by FailureHandler.
+                     */
+                    uncaughtException(Thread.currentThread(), e);
+                }
+            },
+            hash(qryId, fragmentId)
+        );
     }
 
     /** {@inheritDoc} */
@@ -83,8 +90,6 @@ public class QueryTaskExecutorImpl extends AbstractService implements QueryTaskE
 
         CalciteQueryProcessor proc = Objects.requireNonNull(Commons.lookupComponent(ctx, CalciteQueryProcessor.class));
 
-        failureProcessor(proc.failureProcessor());
-
         stripedThreadPoolExecutor(new IgniteStripedThreadPoolExecutor(
             ctx.config().getQueryThreadPoolSize(),
             ctx.igniteInstanceName(),
@@ -102,9 +107,6 @@ public class QueryTaskExecutorImpl extends AbstractService implements QueryTaskE
 
     /** {@inheritDoc} */
     @Override public void uncaughtException(Thread t, Throwable e) {
-        if (failureProcessor != null)
-            failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, e));
-
         if (eHnd != null)
             eHnd.uncaughtException(t, e);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
index 1751154..cf01709 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
@@ -96,6 +96,7 @@ import org.apache.calcite.sql.validate.SqlNameMatchers;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
@@ -520,12 +521,15 @@ class RelJson {
         SqlKind sqlKind = toEnum(map.get("kind"));
         SqlSyntax sqlSyntax = toEnum(map.get("syntax"));
         List<SqlOperator> operators = new ArrayList<>();
-        SqlStdOperatorTable.instance().lookupOperatorOverloads(
+
+        CalciteQueryProcessor.FRAMEWORK_CONFIG.getOperatorTable().lookupOperatorOverloads(
             new SqlIdentifier(name, new SqlParserPos(0, 0)),
             null,
             sqlSyntax,
             operators,
-            SqlNameMatchers.liberal());
+            SqlNameMatchers.liberal()
+        );
+
         for (SqlOperator operator : operators)
             if (operator.kind == sqlKind)
                 return operator;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
index c3b8cc2..2631bf9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
@@ -256,7 +256,11 @@ public class MessageServiceImpl extends AbstractService implements MessageServic
             taskExecutor().execute(msg0.queryId(), msg0.fragmentId(), () -> onMessageInternal(nodeId, msg));
         }
         else if (async)
-            taskExecutor().execute(IgniteUuid.VM_ID, ThreadLocalRandom.current().nextLong(1024), () -> onMessageInternal(nodeId, msg));
+            taskExecutor().execute(
+                IgniteUuid.VM_ID,
+                ThreadLocalRandom.current().nextLong(1024),
+                () -> onMessageInternal(nodeId, msg)
+            );
         else
             onMessageInternal(nodeId, msg);
     }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java
index 537d1f2..43733f8 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java
@@ -102,6 +102,45 @@ public class CalciteErrorHandlilngIntegrationTest extends GridCommonAbstractTest
     }
 
     /**
+     * Test verifies that an exception during the fragment deserialization phase doesn't lead to execution freezing.
+     * <ol>
+     *     <li>Start several nodes.</li>
+     *     <li>Replace CommunicationSpi with one that modifies messages (making the fragment JSON invalid).</li>
+     *     <li>Execute query.</li>
+     *     <li>Verify that query failed with proper exception.</li>
+     * </ol>
+     */
+    @Test
+    public void assertionOnDeserializationInvalidFragment() throws Exception {
+        Supplier<TcpCommunicationSpi> spiLsnrSupp = () -> new TcpCommunicationSpi() {
+            /** {@inheritDoc} */
+            @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
+                if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof QueryStartRequest) {
+                    QueryStartRequest req = (QueryStartRequest)((GridIoMessage)msg).message();
+
+                    String root = GridTestUtils.getFieldValue(req, "root");
+
+                    GridTestUtils.setFieldValue(req, "root",
+                        root.replace("\"table\"", "\"invalidTag\""));
+                }
+
+                super.sendMessage(node, msg, ackC);
+            }
+        };
+
+        startGrid(createConfiguration(1, false).setCommunicationSpi(spiLsnrSupp.get()));
+        startGrid(createConfiguration(2, false).setCommunicationSpi(spiLsnrSupp.get()));
+
+        IgniteEx client = startGrid(createConfiguration(0, true).setCommunicationSpi(spiLsnrSupp.get()));
+
+        sql(client, "create table test (id int primary key, val varchar)");
+
+        String sql = "select id from test";
+
+        GridTestUtils.assertThrowsWithCause(() -> sql(client, sql), NullPointerException.class);
+    }
+
+    /**
      * Test verifies that a Exception during index look up doesn't lead to execution freezing.
      * <ol>
      *     <li>Start several nodes.</li>
@@ -162,6 +201,7 @@ public class CalciteErrorHandlilngIntegrationTest extends GridCommonAbstractTest
 
             sql(client, "create table test (id integer primary key, val varchar)");
             sql(client, "create index test_id_idx on test (id)");
+            sql(client, "insert into test values (0, 'val_0');");
 
             awaitPartitionMapExchange(true, true, null);
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index f20deb5..a14060a 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -41,6 +41,9 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.RelVisitor;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.util.ImmutableBitSet;
@@ -71,6 +74,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate
 import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
@@ -2784,4 +2788,39 @@ public class PlannerTest extends AbstractPlannerTest {
             checkSplitAndSerialization(phys, publicSchema);
         }
     }
+
+    /** */
+    @Test
+    public void testNotStandardFunctions() throws Exception {
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        publicSchema.addTable(
+            "TEST",
+            new TestTable(
+                new RelDataTypeFactory.Builder(f)
+                    .add("ID", f.createJavaType(Integer.class))
+                    .add("VAL", f.createJavaType(String.class))
+                    .build()) {
+
+                @Override public IgniteDistribution distribution() {
+                    return IgniteDistributions.affinity(0, "TEST", "hash");
+                }
+            }
+        );
+
+        String queries[] = {
+            "select REVERSE(val) from TEST", // MYSQL
+            "select TO_DATE(val, 'yyyymmdd') from TEST" // ORACLE
+        };
+
+        for (String sql : queries) {
+            IgniteRel phys = physicalPlan(
+                sql,
+                publicSchema
+            );
+
+            checkSplitAndSerialization(phys, publicSchema);
+        }
+    }
 }