You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/10/27 14:38:50 UTC

[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9476: IGNITE-12991 Calcite integration. Introduce running query registry & cancellation refactoring

alex-plekhanov commented on a change in pull request #9476:
URL: https://github.com/apache/ignite/pull/9476#discussion_r737200735



##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ *
+ */
+public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest {
+    /** */
+    private static IgniteEx srv;
+
+    /** Timeout in ms for async operations. */
+    private static final long TIMEOUT_IN_MS = 10_000;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the parent set-up, stores the node returned by {@code grid(0)} in the static {@code srv} field.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        srv = grid(0); // Used by testCancelByRemoteFragment to obtain the query engine of this node.
+    }
+
+    /**
+     * Checks that a query can be cancelled while it is being planned.
+     * <p>
+     * Creates nine tables, starts a cross join over all of them asynchronously, waits until the query shows up
+     * in the running-queries collection of the engine obtained for {@code client} and reaches
+     * {@link QueryState#PLANNING}, cancels it, and then expects the running-queries collection to become empty
+     * and the future to fail with an {@link IgniteSQLException}.
+     * <p>
+     * NOTE(review): nothing forces the query to still be in the planning phase when its state is polled, so the
+     * wait for {@code PLANNING} may time out if planning finishes first - TODO confirm/stabilize.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testCancelAtPlanningPhase() throws IgniteCheckedException {
+        QueryEngine engine = queryProcessor(client);
+        int cnt = 9;
+
+        for (int i = 0; i < cnt; i++)
+            sql("CREATE TABLE test_tbl" + i + " (id int, val varchar)");
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "test_tbl" + i + " p" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> !engine.runningQueries().isEmpty() || fut.isDone(), TIMEOUT_IN_MS)); // Stop waiting early if the query already finished.
+
+        Collection<? extends RunningQuery> running = engine.runningQueries();
+
+        assertEquals("Running: " + running, 1, running.size());
+
+        RunningQuery qry = F.first(running);
+
+        // Wait until the registered query reports the PLANNING state.
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> qry.state() == QueryState.PLANNING, TIMEOUT_IN_MS));
+
+        qry.cancel();
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> engine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(0), IgniteSQLException.class, "The query was cancelled while planning");
+    }
+
+    /**
+     * Checks that a query can be cancelled while it is being executed.
+     * <p>
+     * Fills the {@code person} table with 1000 rows, starts a six-way cross join of the table with itself
+     * asynchronously, waits until the first running query of the engine obtained for {@code client} reports
+     * {@link QueryState#EXECUTING}, cancels it, and then expects the running-queries collection to become empty
+     * and the future to fail with an {@link IgniteSQLException}.
+     * <p>
+     * NOTE(review): nothing guarantees that the polling observes the query while it is in the execution
+     * phase - TODO confirm/stabilize.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testCancelAtExecutionPhase() throws IgniteCheckedException {
+        QueryEngine engine = queryProcessor(client);
+        int cnt = 6;
+
+        sql("CREATE TABLE person (id int, val varchar)");
+
+        String data = IntStream.range(0, 1000).mapToObj((i) -> "(" + i + "," + i + ")").collect(joining(", "));
+        String insertSql = "INSERT INTO person (id, val) VALUES " + data;
+
+        sql(insertSql);
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "person p" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> {
+                Collection<? extends RunningQuery> queries = engine.runningQueries();
+
+                return !queries.isEmpty() && F.first(queries).state() == QueryState.EXECUTING;
+            },
+            TIMEOUT_IN_MS));
+
+        Collection<? extends RunningQuery> running = engine.runningQueries();
+
+        assertEquals(1, running.size());
+
+        RunningQuery qry = F.first(running);
+        qry.cancel();
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> engine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100), IgniteSQLException.class, "The query was cancelled while executing.");
+    }
+
+    /** */
+    @Test
+    public void testCancelByRemoteFragment() throws IgniteCheckedException {
+        QueryEngine clientEngine = queryProcessor(client);
+        QueryEngine serverEngine = queryProcessor(srv);
+        int cnt = 6;
+
+        sql("CREATE TABLE t (id int, val varchar)");
+
+        String data = IntStream.range(0, 10000).mapToObj((i) -> "(" + i + ",'" + i + "')").collect(joining(", "));
+        String insertSql = "INSERT INTO t (id, val) VALUES " + data;
+
+        sql(insertSql);
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "t t" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> {
+                Collection<? extends RunningQuery> queries = clientEngine.runningQueries();
+
+                return !queries.isEmpty() && F.first(queries).state() == QueryState.EXECUTING;
+            },
+            TIMEOUT_IN_MS));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(() -> !serverEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        Collection<? extends RunningQuery> running = serverEngine.runningQueries();
+        RunningQuery qry = F.first(running);
+
+        qry.cancel();
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> {
+                log.info("+++ " + F.first(clientEngine.runningQueries()));

Review comment:
       looks redundant

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ *
+ */
+public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest {
+    /** */
+    private static IgniteEx srv;
+
+    /** Timeout in ms for async operations. */
+    private static final long TIMEOUT_IN_MS = 10_000;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the parent set-up, stores the node returned by {@code grid(0)} in the static {@code srv} field.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        srv = grid(0); // Kept in a static field so that individual tests can use this node.
+    }
+
+    /**
+     * Checks that a query can be cancelled while it is being planned.
+     * <p>
+     * Creates nine tables, starts a cross join over all of them asynchronously, waits until the query shows up
+     * in the running-queries collection of the engine obtained for {@code client} and reaches
+     * {@link QueryState#PLANNING}, cancels it, and then expects the running-queries collection to become empty
+     * and the future to fail with an {@link IgniteSQLException}.
+     * <p>
+     * NOTE(review): nothing forces the query to still be in the planning phase when its state is polled, so the
+     * wait for {@code PLANNING} may time out if planning finishes first - TODO confirm/stabilize.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testCancelAtPlanningPhase() throws IgniteCheckedException {
+        QueryEngine engine = queryProcessor(client);
+        int cnt = 9;
+
+        for (int i = 0; i < cnt; i++)
+            sql("CREATE TABLE test_tbl" + i + " (id int, val varchar)");
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "test_tbl" + i + " p" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> !engine.runningQueries().isEmpty() || fut.isDone(), TIMEOUT_IN_MS)); // Stop waiting early if the query already finished.
+
+        Collection<? extends RunningQuery> running = engine.runningQueries();
+
+        assertEquals("Running: " + running, 1, running.size());
+
+        RunningQuery qry = F.first(running);
+
+        // Wait until the registered query reports the PLANNING state.
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> qry.state() == QueryState.PLANNING, TIMEOUT_IN_MS));
+
+        qry.cancel();
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> engine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(0), IgniteSQLException.class, "The query was cancelled while planning");
+    }
+
+    /** */
+    @Test
+    public void testCancelAtExecutionPhase() throws IgniteCheckedException {
+        QueryEngine engine = queryProcessor(client);
+        int cnt = 6;
+
+        sql("CREATE TABLE person (id int, val varchar)");
+
+        String data = IntStream.range(0, 1000).mapToObj((i) -> "(" + i + "," + i + ")").collect(joining(", "));
+        String insertSql = "INSERT INTO person (id, val) VALUES " + data;
+
+        sql(insertSql);
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "person p" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> {
+                Collection<? extends RunningQuery> queries = engine.runningQueries();
+
+                return !queries.isEmpty() && F.first(queries).state() == QueryState.EXECUTING;

Review comment:
       Same as for `testCancelAtPlanningPhase`: there is no guarantee that the test catches the query in the execution phase.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.util.CancelFlag;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+/**
+ * The RootQuery is created on the query initiator (originator) node as the first step of a query run;
+ * It contains the information about query state, contexts, remote fragments;
+ * It provides 'cancel' functionality for running query like a base query class.
+ */
+public class RootQuery<RowT> extends Query<RowT> {
+    /** SQL query. */
+    private final String sql;
+
+    /** Parameters. */
+    private final Object[] params;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** remote nodes */
+    private final Set<UUID> remotes;
+
+    /** node to fragment */
+    private final Set<RemoteFragmentKey> waiting;
+
+    /** */
+    private volatile RootNode<RowT> root;
+
+    /** */
+    private volatile PlanningContext pctx;
+
+    /** */
+    private final BaseQueryContext ctx;
+
+    /** */
+    private final ExchangeService exch;
+
+    /**
+     * Creates a root query with a freshly generated random query id and builds its base query context.
+     *
+     * @param sql SQL query text.
+     * @param schema Schema used as the default schema of the framework config of the query context.
+     * @param params Query parameters.
+     * @param qryCtx External query context. When it is not {@code null}, a {@link GridQueryCancel} is unwrapped
+     *      from it and passed to the base class; otherwise {@code null} is passed.
+     * @param exch Exchange service, used to send close-query messages to remote nodes.
+     * @param unregister Callback passed to the base class (presumably invoked when the query is closed -
+     *      TODO confirm against the base class).
+     * @param log Logger.
+     */
+    public RootQuery(
+        String sql,
+        SchemaPlus schema,
+        Object[] params,
+        QueryContext qryCtx,
+        ExchangeService exch,
+        Consumer<Query<RowT>> unregister,
+        IgniteLogger log
+    ) {
+        super(UUID.randomUUID(), qryCtx != null ? qryCtx.unwrap(GridQueryCancel.class) : null, unregister);
+
+        this.sql = sql;
+        this.params = params;
+        this.exch = exch;
+        this.log = log;
+
+        remotes = new HashSet<>();
+        waiting = new HashSet<>();
+
+        Context parent = Commons.convert(qryCtx);
+
+        ctx = BaseQueryContext.builder()
+            .parentContext(parent)
+            .frameworkConfig(
+                Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+                    .defaultSchema(schema)
+                    .build()
+            )
+            .logger(log)
+            .build();
+    }
+
+    /**
+     * Creates a new root query that inherits the SQL text, parameters, exchange service, unregister callback
+     * and logger from {@code this} query, but uses the given schema.
+     * It is used to execute a DML query immediately after (inside) a DDL one, e.g.:
+     *      CREATE TABLE MY_TABLE AS SELECT ... FROM ...;
+     * <p>
+     * The child is constructed with a query context wrapping this query's cancel object and, like any
+     * {@code RootQuery}, gets its own random id.
+     *
+     * @param schema New schema.
+     * @return New root query.
+     */
+    public RootQuery<RowT> childQuery(SchemaPlus schema) {
+        return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), exch, unregister, log);
+    }
+
+    /**
+     * @return Base query context built in the constructor.
+     */
+    public BaseQueryContext context() {
+        return ctx;
+    }
+
+    /**
+     * @return SQL query text.
+     */
+    public String sql() {
+        return sql;
+    }
+
+    /**
+     * @return Query parameters (the array passed to the constructor, not a copy).
+     */
+    public Object[] parameters() {
+        return params;
+    }
+
+    /**
+     * Starts the mapping phase for the query: switches the state to {@link QueryState#MAPPING} under {@code mux}.
+     *
+     * @throws IgniteSQLException With the {@link IgniteQueryErrorCode#QUERY_CANCELED} code if the query is
+     *      already {@link QueryState#CLOSED}.
+     */
+    public void mapping() {
+        synchronized (mux) {
+            if (state == QueryState.CLOSED) {
+                throw new IgniteSQLException(
+                    "The query was cancelled while executing.",
+                    IgniteQueryErrorCode.QUERY_CANCELED
+                );
+            }
+
+            state = QueryState.MAPPING;
+        }
+    }
+
+    /**
+     * Starts the execution phase for the query and sets up the remote fragments.
+     * <p>
+     * Under {@code mux}: wraps the given node into a new {@link RootNode}, adds the first fragment of the plan
+     * as a running fragment, remembers the node ids of all remaining fragments in {@code remotes}, adds one
+     * {@link RemoteFragmentKey} per (node, fragment) pair to {@code waiting} and finally switches the state to
+     * {@link QueryState#EXECUTING}.
+     *
+     * @param ctx Execution context (note: shadows the {@code ctx} field inside this method).
+     * @param plan Multi-step plan providing the fragments, their mappings and the fields metadata.
+     * @param root Node registered as the source of the created root node.
+     * @throws IgniteSQLException With the {@link IgniteQueryErrorCode#QUERY_CANCELED} code if the query is
+     *      already {@link QueryState#CLOSED}.
+     */
+    public void run(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT> root) {
+        synchronized (mux) {
+            if (state == QueryState.CLOSED) {
+                throw new IgniteSQLException(
+                    "The query was cancelled while executing.",
+                    IgniteQueryErrorCode.QUERY_CANCELED
+                );
+            }
+
+            RootNode<RowT> rootNode = new RootNode<>(ctx, plan.fieldsMetadata().rowType(), this::tryClose);
+            rootNode.register(root);
+
+            addFragment(new RunningFragment<>(F.first(plan.fragments()).root(), rootNode, ctx));
+
+            this.root = rootNode;
+
+            for (int i = 1; i < plan.fragments().size(); i++) { // Index 0 is the fragment added above.
+                Fragment fragment = plan.fragments().get(i);
+                List<UUID> nodes = plan.mapping(fragment).nodeIds();
+
+                remotes.addAll(nodes);
+
+                for (UUID node : nodes)
+                    waiting.add(new RemoteFragmentKey(node, fragment.fragmentId()));
+            }
+
+            state = QueryState.EXECUTING;
+        }
+    }
+
+    /**
+     * Moves the query towards {@link QueryState#CLOSED}. Can be called multiple times, e.g. after each error
+     * received at {@link #onResponse(RemoteFragmentKey, Throwable)}.
+     * <p>
+     * Under {@code mux}: does nothing if the query is already {@code CLOSED}; switches directly to
+     * {@code CLOSED} from {@code INITED}/{@code PLANNING} and returns without notifying remote nodes or calling
+     * {@code super.tryClose()}; otherwise switches {@code EXECUTING} to {@code CLOSING}, closes the root node
+     * and becomes {@code CLOSED} only once {@code waiting} is empty.
+     * <p>
+     * On the transition to {@code CLOSED} a close-query message is sent to every node in {@code remotes} except
+     * the local one. Send failures do not stop the loop: the first one is wrapped into an
+     * {@link IgniteException}, the following ones are added to it as suppressed, and the exception is thrown
+     * after all nodes were tried. {@code super.tryClose()} is called in any case.
+     * <p>
+     * NOTE(review): in the code visible here {@code root} is assigned only in {@code run()}, so a call while
+     * the state is {@code MAPPING} would reach {@code root.closeInternal()} with a {@code null} root - TODO
+     * confirm.
+     */
+    @Override protected void tryClose() {
+        QueryState state0 = null;
+
+        synchronized (mux) {
+            if (state == QueryState.CLOSED)
+                return;
+
+            if (state == QueryState.INITED || state == QueryState.PLANNING) {
+                state = QueryState.CLOSED;
+
+                return;
+            }
+
+            if (state == QueryState.EXECUTING)
+                state0 = state = QueryState.CLOSING;
+
+            root.closeInternal();
+
+            if (state == QueryState.CLOSING && waiting.isEmpty())
+                state0 = state = QueryState.CLOSED;
+        }
+
+        if (state0 == QueryState.CLOSED) {
+            try {
+                IgniteException wrpEx = null;
+
+                for (UUID nodeId : remotes) {
+                    try {
+                        if (!nodeId.equals(root.context().localNodeId()))
+                            exch.closeQuery(nodeId, id());
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (wrpEx == null)
+                            wrpEx = new IgniteException("Failed to send cancel message. [nodeId=" + nodeId + ']', e);
+                        else
+                            wrpEx.addSuppressed(e);
+                    }
+                }
+
+                if (wrpEx != null)
+                    throw wrpEx;
+            }
+            finally {
+                super.tryClose();
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Triggers the query's cancel object first and then calls {@link #tryClose()}.
+     */
+    @Override public void cancel() {
+        cancel.cancel();
+
+        tryClose();
+    }
+
+    /**
+     * Returns the planning context of the query, creating it on the first call.
+     * <p>
+     * On creation (under {@code mux}) the state is switched to {@link QueryState#PLANNING}, the context is built
+     * from the base query context, the SQL text and the parameters, and an action is added to the query's
+     * cancel object that requests cancellation on the {@link CancelFlag} unwrapped from the planning context.
+     * <p>
+     * NOTE(review): the message used when the query is already closed says "while executing" although this
+     * method serves the planning phase - TODO confirm this is intended.
+     *
+     * @return Planning context.
+     * @throws IgniteSQLException With the {@link IgniteQueryErrorCode#QUERY_CANCELED} code if the query is
+     *      already {@link QueryState#CLOSED} or if adding the cancel action fails with
+     *      {@link QueryCancelledException}.
+     */
+    public PlanningContext planningContext() {
+        synchronized (mux) {
+            if (state == QueryState.CLOSED) {
+                throw new IgniteSQLException(
+                    "The query was cancelled while executing.",
+                    IgniteQueryErrorCode.QUERY_CANCELED
+                );
+            }
+
+            if (pctx == null) {
+                state = QueryState.PLANNING;
+
+                pctx = PlanningContext.builder()
+                    .parentContext(ctx)
+                    .query(sql)
+                    .parameters(params)
+                    .build();
+
+                try {
+                    cancel.add(() -> pctx.unwrap(CancelFlag.class).requestCancel());
+                }
+                catch (QueryCancelledException e) {
+                    throw new IgniteSQLException(e.getMessage(), IgniteQueryErrorCode.QUERY_CANCELED, e);
+                }
+            }
+
+            return pctx;
+        }
+    }
+
+    /**
+     * @return Root node of the query as a row iterator. In the code visible here the field is assigned only in
+     *      {@code run()}, so the result is presumably {@code null} before that call - TODO confirm.
+     */
+    public Iterator<RowT> iterator() {
+        return root;
+    }
+
+    /**
+     * Handles a node leaving: every fragment of that node that is still in {@code waiting} is completed with a
+     * {@link ClusterTopologyCheckedException} through {@link #onResponse(RemoteFragmentKey, Throwable)}.
+     * The matching fragments are collected under {@code mux}; the responses are processed outside of it.
+     *
+     * @param nodeId Id of the node that left.
+     */
+    public void onNodeLeft(UUID nodeId) {
+        List<RemoteFragmentKey> fragments = null;
+
+        synchronized (mux) {
+            fragments = waiting.stream().filter(f -> f.nodeId().equals(nodeId)).collect(Collectors.toList());
+        }
+
+        if (!F.isEmpty(fragments)) {
+            ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException(
+                "Failed to start query, node left. nodeId=" + nodeId);
+
+            for (RemoteFragmentKey fragment : fragments)
+                onResponse(fragment, ex);
+        }
+    }
+
+    /**
+     * Handles a response for a remote fragment identified by node id and fragment id.
+     *
+     * @param nodeId Id of the responding node.
+     * @param fragmentId Fragment id.
+     * @param error Error reported for the fragment, or {@code null} if there is none.
+     */
+    public void onResponse(UUID nodeId, long fragmentId, Throwable error) {
+        onResponse(new RemoteFragmentKey(nodeId, fragmentId), error);
+    }
+
+    /**
+     * Removes the fragment from {@code waiting} and reads the current state under {@code mux}. Then, outside the
+     * lock, either reports the error via {@link #onError(Throwable)} or, if there is no error and the query was
+     * in the {@link QueryState#CLOSING} state, retries {@link #tryClose()}.
+     *
+     * @param fragment Key of the remote fragment the response belongs to.
+     * @param error Error reported for the fragment, or {@code null} if there is none.
+     */
+    private void onResponse(RemoteFragmentKey fragment, Throwable error) {
+        QueryState state;
+        synchronized (mux) {
+            waiting.remove(fragment);
+
+            state = this.state; // Snapshot taken under the lock; shadows the field for the checks below.
+        }
+
+        if (error != null)
+            onError(error);
+        else if (state == QueryState.CLOSING)
+            tryClose();
+    }
+
+    /**
+     * Passes the error to the root node and then tries to close the query.
+     * <p>
+     * NOTE(review): in the code visible here {@code root} is assigned only in {@code run()}; an error arriving
+     * earlier would dereference a {@code null} root - TODO confirm this cannot happen.
+     *
+     * @param error Error to report.
+     */
+    public void onError(Throwable error) {
+        root.onError(error);
+
+        tryClose();
+    }
+
+    /** */
+    public static PlanningContext createPlanningContext(BaseQueryContext ctx, String qry, Object[] params) {

Review comment:
       There are no usages of this method

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
##########
@@ -271,9 +300,85 @@ public FailureProcessor failureProcessor() {
 
     /** {@inheritDoc} */
     @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext qryCtx, @Nullable String schemaName,
-        String qry, Object... params) throws IgniteSQLException {
+        String sql, Object... params) throws IgniteSQLException {
+        SchemaPlus schema = schemaHolder.schema(schemaName);
+
+        QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql));
+
+        if (plan != null) {
+            RootQuery<Object[]> qry = new RootQuery<>(
+                sql,
+                schema,
+                params,
+                qryCtx,
+                exchangeSvc,
+                (q) -> qryReg.unregister(q.id()),
+                log
+            );
+
+            qryReg.register(qry);
+
+            try {
+                return Collections.singletonList(executionSvc.executePlan(

Review comment:
       Let's add `<Object[]>` to the `executionSvc` declaration to avoid warnings here.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+/** */
+public class Query<RowT> implements RunningQuery {
+    /** Completable futures empty array. */
+    private static final CompletableFuture<?>[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture<?>[0];
+
+    /** */
+    private final UUID id;
+
+    /** */
+    protected final Object mux = new Object();
+
+    /** */
+    protected final Set<RunningFragment<RowT>> fragments;
+
+    /** */
+    protected final GridQueryCancel cancel;
+
+    /** */
+    protected final Consumer<Query<RowT>> unregister;
+
+    /** */
+    protected volatile QueryState state = QueryState.INITED;
+
+    /**
+     * Creates a query in the initial {@link QueryState#INITED} state with an empty concurrent set of fragments.
+     *
+     * @param id Query id.
+     * @param cancel Query cancel object; if {@code null}, a new {@link GridQueryCancel} is created.
+     * @param unregister Callback stored in the {@code unregister} field (presumably invoked when the query is
+     *      closed - TODO confirm against its usages).
+     */
+    public Query(UUID id, GridQueryCancel cancel, Consumer<Query<RowT>> unregister) {
+        this.id = id;
+        this.unregister = unregister;
+
+        this.cancel = cancel != null ? cancel : new GridQueryCancel();
+
+        fragments = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    }
+
+    /**
+     * @return Query id passed to the constructor.
+     */
+    @Override public UUID id() {
+        return id;
+    }
+
+    /**
+     * @return Current query state (read from the volatile field without taking {@code mux}).
+     */
+    @Override public QueryState state() {
+        return state;
+    }
+
+    /** */
+    public static BaseQueryContext createQueryContext(QueryContext ctx, SchemaPlus schema, IgniteLogger log) {

Review comment:
       Not used

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -828,16 +622,13 @@ private FieldsMetadata queryFieldsMetadata(PlanningContext ctx, RelDataType sqlT
         return new FieldsMetadataImpl(resultType, origins);
     }
 
-    /** */
-    private boolean single(SqlNode sqlNode) {
-        return !(sqlNode instanceof SqlNodeList);
-    }
-
     /** */
     private void onMessage(UUID nodeId, final QueryStartRequest msg) {
         assert nodeId != null && msg != null;
 
         try {
+            Query<Row> qry = qryReg.register(new Query<>(msg.queryId(), null, (q) -> qryReg.unregister(q.id())));

Review comment:
       A small optimization is possible here: currently an additional `Query` instance is created on every call. Perhaps it's better to check `qryReg.query(msg.queryId())` first and, only if no instance exists, create a new one and register it. In that case `register` does not need to return anything, and the `computeIfAbsent` in the `register` method (which creates a lambda on each call) can be replaced with `putIfAbsent` or `put`.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
##########
@@ -45,10 +45,10 @@
     QUERY_INBOX_CANCEL_MESSAGE(305, InboxCloseMessage::new),
 
     /** */
-    QUERY_OUTBOX_CANCEL_MESSAGE(306, OutboxCloseMessage::new),
+    GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new),
 
     /** */
-    GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new),
+    QUERY_CLOSE_MESSAGE(308, QueryCloseMessage::new),

Review comment:
       Why not reuse 306 code?

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistry.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.Collection;
+import java.util.UUID;
+
+import org.apache.ignite.internal.processors.query.calcite.util.Service;
+
+/**
+ * Registry of the running queries.
+ */
+public interface QueryRegistry<RowT> extends Service {

Review comment:
       Perhaps it's better for this interface to work with the `RunningQuery` interface instead of the `Query<RowT>` class. In that case the `<RowT>` generic is not required in the `QueryRegistry` interface. None of the current code requires an additional cast of `RunningQuery` to `Query`, except for one call to the `register` method. WDYT?

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.util.CancelFlag;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+/**
+ * The RootQuery is created on the qury initiator (originator) node as the first step of a query run;
+ * It contains the information about query state, contexts, remote fragments;
+ * It provides 'cancel' functionality for running query like a base query class.
+ */
+public class RootQuery<RowT> extends Query<RowT> {
+    /** SQL query. */
+    private final String sql;
+
+    /** Parameters. */
+    private final Object[] params;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** remote nodes */
+    private final Set<UUID> remotes;
+
+    /** node to fragment */
+    private final Set<RemoteFragmentKey> waiting;
+
+    /** */
+    private volatile RootNode<RowT> root;
+
+    /** */
+    private volatile PlanningContext pctx;
+
+    /** */
+    private final BaseQueryContext ctx;
+
+    /** */
+    private final ExchangeService exch;
+
+    /** */
+    public RootQuery(
+        String sql,
+        SchemaPlus schema,
+        Object[] params,
+        QueryContext qryCtx,
+        ExchangeService exch,
+        Consumer<Query<RowT>> unregister,
+        IgniteLogger log
+    ) {
+        super(UUID.randomUUID(), qryCtx != null ? qryCtx.unwrap(GridQueryCancel.class) : null, unregister);
+
+        this.sql = sql;
+        this.params = params;
+        this.exch = exch;
+        this.log = log;
+
+        remotes = new HashSet<>();
+        waiting = new HashSet<>();
+
+        Context parent = Commons.convert(qryCtx);
+
+        ctx = BaseQueryContext.builder()
+            .parentContext(parent)
+            .frameworkConfig(
+                Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+                    .defaultSchema(schema)
+                    .build()
+            )
+            .logger(log)
+            .build();
+    }
+
+    /**
+     * Creates the new root that inherits the query parameters from {@code this} query.
+     * Is used to execute DML query immediately after (inside) DDL.
+     * e.g.:
+     *      CREATE TABLE MY_TABLE AS SELECT ... FROM ...;
+     *
+     * @param schema new schema.
+     */
+    public RootQuery<RowT> childQuery(SchemaPlus schema) {
+        return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), exch, unregister, log);
+    }
+
+    /** */
+    public BaseQueryContext context() {
+        return ctx;
+    }
+
+    /** */
+    public String sql() {
+        return sql;
+    }
+
+    /** */
+    public Object[] parameters() {
+        return params;
+    }
+
+    /**
+     * Starts maping phase for the query.
+     */
+    public void mapping() {
+        synchronized (mux) {
+            if (state == QueryState.CLOSED) {
+                throw new IgniteSQLException(
+                    "The query was cancelled while executing.",
+                    IgniteQueryErrorCode.QUERY_CANCELED
+                );
+            }
+
+            state = QueryState.MAPPING;
+        }
+    }
+
+    /**
+     * Starts execution phase for the query and setup remote fragments.
+     */
+    public void run(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT> root) {
+        synchronized (mux) {
+            if (state == QueryState.CLOSED) {
+                throw new IgniteSQLException(
+                    "The query was cancelled while executing.",
+                    IgniteQueryErrorCode.QUERY_CANCELED
+                );
+            }
+
+            RootNode<RowT> rootNode = new RootNode<>(ctx, plan.fieldsMetadata().rowType(), this::tryClose);
+            rootNode.register(root);
+
+            addFragment(new RunningFragment<>(F.first(plan.fragments()).root(), rootNode, ctx));
+
+            this.root = rootNode;
+
+            for (int i = 1; i < plan.fragments().size(); i++) {
+                Fragment fragment = plan.fragments().get(i);
+                List<UUID> nodes = plan.mapping(fragment).nodeIds();
+
+                remotes.addAll(nodes);
+
+                for (UUID node : nodes)
+                    waiting.add(new RemoteFragmentKey(node, fragment.fragmentId()));
+            }
+
+            state = QueryState.EXECUTING;
+        }
+    }
+
+    /**
+     * Can be called multiple times after receive each error
+     * at {@link #onResponse(RemoteFragmentKey, Throwable)}.
+     */
+    @Override protected void tryClose() {
+        QueryState state0 = null;
+
+        synchronized (mux) {
+            if (state == QueryState.CLOSED)
+                return;
+
+            if (state == QueryState.INITED || state == QueryState.PLANNING) {
+                state = QueryState.CLOSED;
+
+                return;
+            }
+
+            if (state == QueryState.EXECUTING)
+                state0 = state = QueryState.CLOSING;
+
+            root.closeInternal();

Review comment:
       The `tryClose` method can be executed several times, so why are we closing the root node on each node response (it submits a task to the executor each time)? Isn't it enough to close the root node once?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/sql/SqlCommandProcessor.java
##########
@@ -90,15 +90,13 @@ public SqlCommandProcessor(GridKernalContext ctx, GridQuerySchemaManager schemaM
      *
      * @param sql SQL.
      * @param cmdNative Native command.
-     * @param cliCtx Client context.
      * @return Result.
      */
-    @Nullable public FieldsQueryCursor<List<?>> runCommand(String sql, SqlCommand cmdNative,
-        @Nullable SqlClientContext cliCtx) {

Review comment:
       Import should be removed too

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/sql/SqlCommandProcessor.java
##########
@@ -90,15 +90,13 @@ public SqlCommandProcessor(GridKernalContext ctx, GridQuerySchemaManager schemaM
      *
      * @param sql SQL.

Review comment:
       Parameter was removed

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ *
+ */
+public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest {
+    /** */
+    private static IgniteEx srv;
+
+    /** Timeout in ms for async operations. */
+    private static final long TIMEOUT_IN_MS = 10_000;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        srv = grid(0);
+    }
+
+    /** */
+    @Test
+    public void testCancelAtPlanningPhase() throws IgniteCheckedException {
+        QueryEngine engine = queryProcessor(client);
+        int cnt = 9;
+
+        for (int i = 0; i < cnt; i++)
+            sql("CREATE TABLE test_tbl" + i + " (id int, val varchar)");
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "test_tbl" + i + " p" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> !engine.runningQueries().isEmpty() || fut.isDone(), TIMEOUT_IN_MS));
+
+        Collection<? extends RunningQuery> running = engine.runningQueries();
+
+        assertEquals("Running: " + running, 1, running.size());
+
+        RunningQuery qry = F.first(running);
+
+        // Waits for planning.
+        Assert.assertTrue(GridTestUtils.waitForCondition(

Review comment:
       There are no barriers between planning and execution, so there is no guarantee that the planning phase will be caught. The test can become flaky.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -634,61 +453,57 @@ private FieldsMetadata explainFieldsMetadata(PlanningContext ctx) {
 
             case QUERY:
                 return mapAndExecutePlan(
-                    qryId,
-                    (MultiStepPlan)plan,
-                    pctx.unwrap(BaseQueryContext.class),
-                    pctx.parameters()
+                    qry,
+                    (MultiStepPlan)plan
                 );
 
             case EXPLAIN:
-                return executeExplain((ExplainPlan)plan, pctx);
+                return executeExplain((ExplainPlan)plan);
 
             case DDL:
-                return executeDdl(qryId, (DdlPlan)plan, pctx);
+                return executeDdl(qry, (DdlPlan)plan);
 
             default:
                 throw new AssertionError("Unexpected plan type: " + plan);
         }
     }
 
     /** */
-    private FieldsQueryCursor<List<?>> executeDdl(UUID qryId, DdlPlan plan, PlanningContext pctx) {
+    private FieldsQueryCursor<List<?>> executeDdl(RootQuery qry, DdlPlan plan) {

Review comment:
       `RootQuery` -> `RootQuery<Row>`

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
##########
@@ -271,9 +300,85 @@ public FailureProcessor failureProcessor() {
 
     /** {@inheritDoc} */
     @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext qryCtx, @Nullable String schemaName,
-        String qry, Object... params) throws IgniteSQLException {
+        String sql, Object... params) throws IgniteSQLException {
+        SchemaPlus schema = schemaHolder.schema(schemaName);
+
+        QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql));
+
+        if (plan != null) {
+            RootQuery<Object[]> qry = new RootQuery<>(
+                sql,
+                schema,
+                params,
+                qryCtx,
+                exchangeSvc,
+                (q) -> qryReg.unregister(q.id()),
+                log
+            );
+
+            qryReg.register(qry);
+
+            try {
+                return Collections.singletonList(executionSvc.executePlan(
+                    qry,
+                    plan
+                ));
+            }
+                catch (Exception e) {

Review comment:
       Wrong indent

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
##########
@@ -271,9 +300,85 @@ public FailureProcessor failureProcessor() {
 
     /** {@inheritDoc} */
     @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext qryCtx, @Nullable String schemaName,
-        String qry, Object... params) throws IgniteSQLException {
+        String sql, Object... params) throws IgniteSQLException {
+        SchemaPlus schema = schemaHolder.schema(schemaName);
+
+        QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql));
+
+        if (plan != null) {
+            RootQuery<Object[]> qry = new RootQuery<>(
+                sql,
+                schema,
+                params,
+                qryCtx,
+                exchangeSvc,
+                (q) -> qryReg.unregister(q.id()),
+                log
+            );
+
+            qryReg.register(qry);
+
+            try {
+                return Collections.singletonList(executionSvc.executePlan(
+                    qry,
+                    plan
+                ));
+            }
+                catch (Exception e) {
+                qryReg.unregister(qry.id());
+
+                if (qry.isCancelled())
+                    throw new IgniteSQLException("The query was cancelled while planning", IgniteQueryErrorCode.QUERY_CANCELED, e);
+                else
+                    throw e;
+            }
+        }
+
+        SqlNodeList qryList = Commons.parse(sql, FRAMEWORK_CONFIG.getParserConfig());
+        List<FieldsQueryCursor<List<?>>> cursors = new ArrayList<>(qryList.size());
+
+        List<RootQuery<Object[]>> qrys = new ArrayList<>(qryList.size());
 
-        return executionSvc.executeQuery(qryCtx, schemaName, qry, params);
+        for (final SqlNode sqlNode: qryList) {
+            RootQuery<Object[]> qry = new RootQuery<>(
+                sqlNode.toString(),
+                schemaHolder.schema(schemaName), // Update schema for each query in multiple statements.
+                params,
+                qryCtx,
+                exchangeSvc,
+                (q) -> qryReg.unregister(q.id()),
+                log
+            );
+
+            qrys.add(qry);
+
+            qryReg.register(qry);
+            try {
+                if (qryList.size() == 1) {
+                    plan = queryPlanCache().queryPlan(
+                        new CacheKey(schemaName, qry.sql()),
+                        () -> prepareSvc.prepareSingle(sqlNode, qry.planningContext()));
+                }
+                else
+                    plan = prepareSvc.prepareSingle(sqlNode, qry.planningContext());
+
+                cursors.add(executionSvc.executePlan(qry, plan));
+            }
+            catch (Exception e) {
+                boolean isCanceled = qry.isCancelled();
+
+                qrys.forEach(RootQuery::cancel);

Review comment:
       `RootQuery.cancel` can throw an exception; in that case the query will not be unregistered and will leak in `qryReg`. Also, I think a test should be added for multi-statement queries.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RunningFragment.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.Objects;
+
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** */
+public class RunningFragment<Row> {
+    /** */
+    private final IgniteRel rootRel;
+
+    /** */
+    private final AbstractNode<Row> root;
+
+    /** */
+    private final ExecutionContext<Row> ectx;
+
+    /** */
+    public RunningFragment(
+        IgniteRel rootRel,
+        AbstractNode<Row> root,
+        ExecutionContext<Row> ectx) {
+        this.rootRel = rootRel;
+        this.root = root;
+        this.ectx = ectx;
+    }
+
+    /** */
+    public ExecutionContext<Row> context() {
+        return ectx;
+    }
+
+    /** */
+    public IgniteRel rootRel() {
+        return rootRel;
+    }
+
+    /** */
+    public AbstractNode<Row> root() {
+        return root;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        RunningFragment fragment = (RunningFragment)o;

Review comment:
       `RunningFragment<Row> fragment = (RunningFragment<Row>)o;` to avoid warnings

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.util.CancelFlag;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+/**
+ * The RootQuery is created on the qury initiator (originator) node as the first step of a query run;
+ * It contains the information about query state, contexts, remote fragments;
+ * It provides 'cancel' functionality for running query like a base query class.
+ */
+public class RootQuery<RowT> extends Query<RowT> {
+    /** SQL query. */
+    private final String sql;
+
+    /** Parameters. */
+    private final Object[] params;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** remote nodes */
+    private final Set<UUID> remotes;
+
+    /** node to fragment */
+    private final Set<RemoteFragmentKey> waiting;
+
+    /** */
+    private volatile RootNode<RowT> root;
+
+    /** */
+    private volatile PlanningContext pctx;
+
+    /** */
+    private final BaseQueryContext ctx;
+
+    /** */
+    private final ExchangeService exch;
+
+    /** */
+    public RootQuery(
+        String sql,
+        SchemaPlus schema,
+        Object[] params,
+        QueryContext qryCtx,
+        ExchangeService exch,
+        Consumer<Query<RowT>> unregister,
+        IgniteLogger log
+    ) {
+        super(UUID.randomUUID(), qryCtx != null ? qryCtx.unwrap(GridQueryCancel.class) : null, unregister);
+
+        this.sql = sql;
+        this.params = params;
+        this.exch = exch;
+        this.log = log;
+
+        remotes = new HashSet<>();
+        waiting = new HashSet<>();
+
+        Context parent = Commons.convert(qryCtx);
+
+        ctx = BaseQueryContext.builder()
+            .parentContext(parent)
+            .frameworkConfig(
+                Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+                    .defaultSchema(schema)
+                    .build()
+            )
+            .logger(log)
+            .build();
+    }
+
+    /**
+     * Creates the new root that inherits the query parameters from {@code this} query.
+     * Is used to execute DML query immediately after (inside) DDL.
+     * e.g.:
+     *      CREATE TABLE MY_TABLE AS SELECT ... FROM ...;
+     *
+     * @param schema new schema.
+     */
+    public RootQuery<RowT> childQuery(SchemaPlus schema) {
+        return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), exch, unregister, log);
+    }
+
+    /** */
+    public BaseQueryContext context() {
+        return ctx;
+    }
+
+    /** */
+    public String sql() {
+        return sql;
+    }
+
+    /** */
+    public Object[] parameters() {
+        return params;
+    }
+
+    /**
+     * Starts maping phase for the query.
+     */
+    public void mapping() {
+        synchronized (mux) {
+            if (state == QueryState.CLOSED) {
+                throw new IgniteSQLException(
+                    "The query was cancelled while executing.",
+                    IgniteQueryErrorCode.QUERY_CANCELED
+                );
+            }
+
+            state = QueryState.MAPPING;
+        }
+    }
+
+    /**
+     * Starts execution phase for the query and setup remote fragments.
+     */
+    public void run(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT> root) {
+        synchronized (mux) {
+            if (state == QueryState.CLOSED) {
+                throw new IgniteSQLException(
+                    "The query was cancelled while executing.",
+                    IgniteQueryErrorCode.QUERY_CANCELED
+                );
+            }
+
+            RootNode<RowT> rootNode = new RootNode<>(ctx, plan.fieldsMetadata().rowType(), this::tryClose);
+            rootNode.register(root);
+
+            addFragment(new RunningFragment<>(F.first(plan.fragments()).root(), rootNode, ctx));
+
+            this.root = rootNode;
+
+            for (int i = 1; i < plan.fragments().size(); i++) {
+                Fragment fragment = plan.fragments().get(i);
+                List<UUID> nodes = plan.mapping(fragment).nodeIds();
+
+                remotes.addAll(nodes);
+
+                for (UUID node : nodes)
+                    waiting.add(new RemoteFragmentKey(node, fragment.fragmentId()));
+            }
+
+            state = QueryState.EXECUTING;
+        }
+    }
+
+    /**
+     * Can be called multiple times after receive each error
+     * at {@link #onResponse(RemoteFragmentKey, Throwable)}.
+     */
+    @Override protected void tryClose() {
+        QueryState state0 = null;
+
+        synchronized (mux) {
+            if (state == QueryState.CLOSED)
+                return;
+
+            if (state == QueryState.INITED || state == QueryState.PLANNING) {

Review comment:
       What about MAPPING state?

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
##########
@@ -354,4 +347,21 @@ public boolean cancel() {
     public boolean isCancelled() {
         return cancelFlag.get();
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ExecutionContext<?> context = (ExecutionContext<?>)o;
+
+        return qryId.equals(context.qryId) && fragmentDesc.fragmentId() == context.fragmentDesc.fragmentId();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {

Review comment:
       Why do we need it? I see no maps or sets with an `ExecutionContext` key (but perhaps I'm missing something).

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractQueryContext.java
##########
@@ -38,4 +38,9 @@ public AbstractQueryContext(Context parentCtx) {
 
         return parentCtx.unwrap(aCls);
     }
+
+    /** */
+    public Context parent() {

Review comment:
       Not used

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.util.CancelFlag;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+/**
+ * The RootQuery is created on the qury initiator (originator) node as the first step of a query run;

Review comment:
       qury -> query

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -375,46 +351,22 @@ public ClosableIteratorsHolder iteratorsHolder() {
         return iteratorsHolder;
     }
 
-    /** {@inheritDoc} */
-    @Override public List<FieldsQueryCursor<List<?>>> executeQuery(
-        @Nullable QueryContext ctx,
-        String schema,
-        String qry,
-        Object[] params
-    ) {
-        QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(getDefaultSchema(schema).getName(), qry));
-        if (plan != null) {
-            PlanningContext pctx = createContext(ctx, schema, qry, params);
-
-            return Collections.singletonList(executePlan(UUID.randomUUID(), pctx, plan));
-        }
-
-        SqlNodeList qryList = Commons.parse(qry, FRAMEWORK_CONFIG.getParserConfig());
-        List<FieldsQueryCursor<List<?>>> cursors = new ArrayList<>(qryList.size());
-
-        for (final SqlNode qry0: qryList) {
-            final PlanningContext pctx = createContext(ctx, schema, qry0.toString(), params);
-
-            if (qryList.size() == 1) {
-                plan = queryPlanCache().queryPlan(
-                    new CacheKey(pctx.schemaName(), pctx.query()),
-                    () -> prepareSingle(qry0, pctx));
-            }
-            else
-                plan = prepareSingle(qry0, pctx);
-
-            cursors.add(executePlan(UUID.randomUUID(), pctx, plan));
-        }
+    /** */
+    public QueryRegistry<Row> queryRegistry() {

Review comment:
       This method is never used.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
##########
@@ -108,6 +113,16 @@ public MessageService messageService() {
         return msgSvc;
     }
 
+    /** */
+    public QueryRegistry queryRegistry() {

Review comment:
       This method is not used.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -634,61 +453,57 @@ private FieldsMetadata explainFieldsMetadata(PlanningContext ctx) {
 
             case QUERY:
                 return mapAndExecutePlan(
-                    qryId,
-                    (MultiStepPlan)plan,
-                    pctx.unwrap(BaseQueryContext.class),
-                    pctx.parameters()
+                    qry,
+                    (MultiStepPlan)plan
                 );
 
             case EXPLAIN:
-                return executeExplain((ExplainPlan)plan, pctx);
+                return executeExplain((ExplainPlan)plan);
 
             case DDL:
-                return executeDdl(qryId, (DdlPlan)plan, pctx);
+                return executeDdl(qry, (DdlPlan)plan);
 
             default:
                 throw new AssertionError("Unexpected plan type: " + plan);
         }
     }
 
     /** */
-    private FieldsQueryCursor<List<?>> executeDdl(UUID qryId, DdlPlan plan, PlanningContext pctx) {
+    private FieldsQueryCursor<List<?>> executeDdl(RootQuery qry, DdlPlan plan) {
         try {
-            ddlCmdHnd.handle(qryId, plan.command(), pctx);
+            ddlCmdHnd.handle(qry.id(), plan.command());
         }
         catch (IgniteCheckedException e) {
-            throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + pctx.query() +
+            throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.sql() +
                 ", err=" + e.getMessage() + ']', e);
         }
+        finally {
+            qryReg.unregister(qry.id());
+        }
 
-        if (plan.command() instanceof CreateTableCommand && ((CreateTableCommand)plan.command()).insertStatement() != null) {
-            SqlInsert insertStmt = ((CreateTableCommand)plan.command()).insertStatement();
+        if (plan.command() instanceof CreateTableCommand
+            && ((CreateTableCommand)plan.command()).insertStatement() != null) {
+            RootQuery<Row> insQry = qry.childQuery(schemaHolder.schema(qry.context().schemaName()));

Review comment:
       This query will not be registered in the query registry (at least during the planning phase), so we can't kill it. Is that OK?

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RunningFragment.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.Objects;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Downstream;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** */
+public class RunningFragment<Row> {
+    /** */
+    private final IgniteRel rootRel;
+
+    /** */
+    private final AbstractNode<Row> root;
+
+    /** */
+    private final ExecutionContext<Row> ectx;
+
+    /** */
+    public RunningFragment(
+        IgniteRel rootRel,

Review comment:
       Getter method is not required then




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org