You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/10/28 05:44:47 UTC

[GitHub] [ignite] Berkof commented on a change in pull request #9476: IGNITE-12991 Calcite integration. Introduce running query registry & cancellation refactoring

Berkof commented on a change in pull request #9476:
URL: https://github.com/apache/ignite/pull/9476#discussion_r737999397



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
##########
@@ -271,9 +300,85 @@ public FailureProcessor failureProcessor() {
 
     /** {@inheritDoc} */
     @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext qryCtx, @Nullable String schemaName,
-        String qry, Object... params) throws IgniteSQLException {
+        String sql, Object... params) throws IgniteSQLException {
+        SchemaPlus schema = schemaHolder.schema(schemaName);
+
+        QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql));
+
+        if (plan != null) {
+            RootQuery<Object[]> qry = new RootQuery<>(
+                sql,
+                schema,
+                params,
+                qryCtx,
+                exchangeSvc,
+                (q) -> qryReg.unregister(q.id()),
+                log
+            );
+
+            qryReg.register(qry);
+
+            try {
+                return Collections.singletonList(executionSvc.executePlan(
+                    qry,
+                    plan
+                ));
+            }
+                catch (Exception e) {
+                qryReg.unregister(qry.id());
+
+                if (qry.isCancelled())
+                    throw new IgniteSQLException("The query was cancelled while planning", IgniteQueryErrorCode.QUERY_CANCELED, e);

Review comment:
       Can we get some other exception here? For example, one thrown during query cancellation.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
##########
@@ -271,9 +300,85 @@ public FailureProcessor failureProcessor() {
 
     /** {@inheritDoc} */
     @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext qryCtx, @Nullable String schemaName,
-        String qry, Object... params) throws IgniteSQLException {
+        String sql, Object... params) throws IgniteSQLException {
+        SchemaPlus schema = schemaHolder.schema(schemaName);
+
+        QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql));
+
+        if (plan != null) {
+            RootQuery<Object[]> qry = new RootQuery<>(
+                sql,
+                schema,
+                params,
+                qryCtx,
+                exchangeSvc,
+                (q) -> qryReg.unregister(q.id()),
+                log
+            );
+
+            qryReg.register(qry);
+
+            try {
+                return Collections.singletonList(executionSvc.executePlan(
+                    qry,
+                    plan
+                ));
+            }
+                catch (Exception e) {
+                qryReg.unregister(qry.id());
+
+                if (qry.isCancelled())
+                    throw new IgniteSQLException("The query was cancelled while planning", IgniteQueryErrorCode.QUERY_CANCELED, e);
+                else
+                    throw e;
+            }
+        }
+
+        SqlNodeList qryList = Commons.parse(sql, FRAMEWORK_CONFIG.getParserConfig());
+        List<FieldsQueryCursor<List<?>>> cursors = new ArrayList<>(qryList.size());
+
+        List<RootQuery<Object[]>> qrys = new ArrayList<>(qryList.size());
 
-        return executionSvc.executeQuery(qryCtx, schemaName, qry, params);
+        for (final SqlNode sqlNode: qryList) {
+            RootQuery<Object[]> qry = new RootQuery<>(
+                sqlNode.toString(),
+                schemaHolder.schema(schemaName), // Update schema for each query in multiple statements.
+                params,
+                qryCtx,
+                exchangeSvc,
+                (q) -> qryReg.unregister(q.id()),
+                log
+            );
+
+            qrys.add(qry);
+
+            qryReg.register(qry);
+            try {

Review comment:
       Empty line before try block.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RunningFragment.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.Objects;
+
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** */

Review comment:
       JavaDoc

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+/** */
+public class Query<RowT> implements RunningQuery {
+    /** Completable futures empty array. */
+    private static final CompletableFuture<?>[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture<?>[0];
+
+    /** */
+    private final UUID id;
+
+    /** */
+    protected final Object mux = new Object();
+
+    /** */
+    protected final Set<RunningFragment<RowT>> fragments;
+
+    /** */
+    protected final GridQueryCancel cancel;
+
+    /** */
+    protected final Consumer<Query<RowT>> unregister;
+
+    /** */
+    protected volatile QueryState state = QueryState.INITED;
+
+    /** */
+    public Query(UUID id, GridQueryCancel cancel, Consumer<Query<RowT>> unregister) {
+        this.id = id;
+        this.unregister = unregister;
+
+        this.cancel = cancel != null ? cancel : new GridQueryCancel();
+
+        fragments = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    }
+
+    /** */
+    @Override public UUID id() {
+        return id;
+    }
+
+    /** */
+    @Override public QueryState state() {
+        return state;
+    }
+
+    /** */
+    protected void tryClose() {
+        List<CompletableFuture<?>> futs = new ArrayList<>();
+        for (RunningFragment<RowT> frag : fragments) {
+            CompletableFuture<?> f = frag.context().submit(frag.root()::close, frag.root()::onError);
+
+            CompletableFuture<?> fCancel = f.thenApply((u) -> frag.context().submit(frag.context()::cancel, frag.root()::onError));
+            futs.add(fCancel);
+        }
+
+        CompletableFuture.allOf(futs.toArray(COMPLETABLE_FUTURES_EMPTY_ARRAY))
+            .thenAccept((u) -> unregister.accept(this));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        synchronized (mux) {
+            if (state == QueryState.CLOSED)
+                return;
+
+            if (state == QueryState.EXECUTING)
+                state = QueryState.CLOSED;
+        }
+
+        for (RunningFragment<RowT> frag : fragments)
+            frag.context().execute(() -> frag.root().onError(new ExecutionCancelledException()), frag.root()::onError);
+
+        tryClose();
+    }
+
+    /** */

Review comment:
       JavaDoc

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQuery.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.UUID;
+
+/**
+ *
+ */
+public interface RunningQuery {
+    /** */
+    UUID id();

Review comment:
       Should we use public modifiers here?

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
##########
@@ -291,4 +396,19 @@ private void onStop(Service... services) {
                 ((LifecycleAware) service).onStop();
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public RunningQuery runningQuery(UUID id) {

Review comment:
       No usage found.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
##########
@@ -271,9 +300,85 @@ public FailureProcessor failureProcessor() {
 
     /** {@inheritDoc} */
     @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext qryCtx, @Nullable String schemaName,

Review comment:
       Contains 2.5 copies of the execution code:
   1) if there is a cached plan
   2) if there is no cached plan for a single-statement qry (with caching)
   2.5) if there is no cached plan for... a multi-statement qry? (without caching)
   All with the same exception handling. Let's refactor it with something like "getOrCreateQryPlan(qry)" and "executeQry(RootQuery)"

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
##########
@@ -271,9 +300,85 @@ public FailureProcessor failureProcessor() {
 
     /** {@inheritDoc} */
     @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext qryCtx, @Nullable String schemaName,
-        String qry, Object... params) throws IgniteSQLException {
+        String sql, Object... params) throws IgniteSQLException {

Review comment:
       Parameters should be aligned one per line:
   @Override public List<FieldsQueryCursor<List<?>>> query(
       @Nullable QueryContext qryCtx, 
       @Nullable String schemaName,
       String sql,
       Object... params
   ) throws IgniteSQLException {

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+/** */

Review comment:
       JavaDoc

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQuery.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.UUID;
+
+/**
+ *

Review comment:
       JavaDoc

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+/** */
+public class Query<RowT> implements RunningQuery {
+    /** Completable futures empty array. */
+    private static final CompletableFuture<?>[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture<?>[0];
+
+    /** */
+    private final UUID id;
+
+    /** */
+    protected final Object mux = new Object();
+
+    /** */
+    protected final Set<RunningFragment<RowT>> fragments;
+
+    /** */
+    protected final GridQueryCancel cancel;
+
+    /** */
+    protected final Consumer<Query<RowT>> unregister;
+
+    /** */
+    protected volatile QueryState state = QueryState.INITED;
+
+    /** */
+    public Query(UUID id, GridQueryCancel cancel, Consumer<Query<RowT>> unregister) {
+        this.id = id;
+        this.unregister = unregister;
+
+        this.cancel = cancel != null ? cancel : new GridQueryCancel();
+
+        fragments = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    }
+
+    /** */
+    @Override public UUID id() {
+        return id;
+    }
+
+    /** */
+    @Override public QueryState state() {
+        return state;
+    }
+
+    /** */
+    protected void tryClose() {
+        List<CompletableFuture<?>> futs = new ArrayList<>();
+        for (RunningFragment<RowT> frag : fragments) {

Review comment:
       Empty line before for block.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -828,16 +623,13 @@ private FieldsMetadata queryFieldsMetadata(PlanningContext ctx, RelDataType sqlT
         return new FieldsMetadataImpl(resultType, origins);
     }
 
-    /** */
-    private boolean single(SqlNode sqlNode) {
-        return !(sqlNode instanceof SqlNodeList);
-    }
-
     /** */
     private void onMessage(UUID nodeId, final QueryStartRequest msg) {
         assert nodeId != null && msg != null;
 
         try {
+            Query<Row> qry = (Query<Row>)qryReg.register(new Query<>(msg.queryId(), null, (q) -> qryReg.unregister(q.id())));

Review comment:
       This line of code is too long.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
##########
@@ -36,4 +39,10 @@
      */
     List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String schemaName, String qry, Object... params)
         throws IgniteSQLException;
+
+    /** */
+    Collection<? extends RunningQuery> runningQueries();
+
+    /** */
+    RunningQuery runningQuery(UUID id);

Review comment:
       1) No usage found.
   2) Should it be public?
   

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Registry of the running queries.
+ */
+public class QueryRegistryImpl implements QueryRegistry {
+    /** */
+    private final ConcurrentMap<UUID, RunningQuery> runningQrys = new ConcurrentHashMap<>();
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    public QueryRegistryImpl(IgniteLogger log) {
+        this.log = log;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RunningQuery register(RunningQuery qry) {
+        return runningQrys.computeIfAbsent(qry.id(), k -> qry);

Review comment:
       Why not putIfAbsent?

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ *
+ */
+public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest {
+    /** */
+    private static IgniteEx srv;
+
+    /** Timeout in ms for async operations. */
+    private static final long TIMEOUT_IN_MS = 10_000;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        srv = grid(0);
+    }
+
+    /** */
+    @Test
+    public void testCancelAtPlanningPhase() throws IgniteCheckedException {
+        QueryEngine engine = queryProcessor(client);
+        int cnt = 9;
+
+        for (int i = 0; i < cnt; i++)
+            sql("CREATE TABLE test_tbl" + i + " (id int, val varchar)");
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "test_tbl" + i + " p" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> !engine.runningQueries().isEmpty() || fut.isDone(), TIMEOUT_IN_MS));
+
+        Collection<? extends RunningQuery> running = engine.runningQueries();
+
+        assertEquals("Running: " + running, 1, running.size());
+
+        RunningQuery qry = F.first(running);
+
+        // Waits for planning.
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> qry.state() == QueryState.PLANNING, TIMEOUT_IN_MS));
+
+        qry.cancel();
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> engine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(0), IgniteSQLException.class, "The query was cancelled while planning");
+    }
+
+    /** */
+    @Test
+    public void testCancelAtExecutionPhase() throws IgniteCheckedException {
+        QueryEngine engine = queryProcessor(client);
+        int cnt = 6;
+
+        sql("CREATE TABLE person (id int, val varchar)");
+
+        String data = IntStream.range(0, 1000).mapToObj((i) -> "(" + i + "," + i + ")").collect(joining(", "));
+        String insertSql = "INSERT INTO person (id, val) VALUES " + data;
+
+        sql(insertSql);
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "person p" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> {
+                Collection<? extends RunningQuery> queries = engine.runningQueries();
+
+                return !queries.isEmpty() && F.first(queries).state() == QueryState.EXECUTING;
+            },
+            TIMEOUT_IN_MS));
+
+        Collection<? extends RunningQuery> running = engine.runningQueries();
+
+        assertEquals(1, running.size());
+
+        RunningQuery qry = F.first(running);
+        qry.cancel();
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> engine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100), IgniteSQLException.class, "The query was cancelled while executing.");
+    }
+
+    /** */
+    @Test
+    public void testCancelByRemoteFragment() throws IgniteCheckedException {

Review comment:
       Test fails with an error:
   java.lang.AssertionError
   	at org.junit.Assert.fail(Assert.java:86)
   	at org.junit.Assert.assertTrue(Assert.java:41)
   	at org.junit.Assert.assertTrue(Assert.java:52)
   	at org.apache.ignite.internal.processors.query.calcite.integration.RunningQueriesIntegrationTest.testCancelByRemoteFragment(RunningQueriesIntegrationTest.java:149)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
   	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
   	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
   	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
   	at org.apache.ignite.testframework.junits.GridAbstractTest$7.run(GridAbstractTest.java:2432)
   	at java.lang.Thread.run(Thread.java:748)

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+/** */
+public class Query<RowT> implements RunningQuery {
+    /** Completable futures empty array. */
+    private static final CompletableFuture<?>[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture<?>[0];
+
+    /** */
+    private final UUID id;
+
+    /** */
+    protected final Object mux = new Object();
+
+    /** */
+    protected final Set<RunningFragment<RowT>> fragments;
+
+    /** */
+    protected final GridQueryCancel cancel;
+
+    /** */
+    protected final Consumer<Query<RowT>> unregister;
+
+    /** */
+    protected volatile QueryState state = QueryState.INITED;
+
+    /** */
+    public Query(UUID id, GridQueryCancel cancel, Consumer<Query<RowT>> unregister) {
+        this.id = id;
+        this.unregister = unregister;
+
+        this.cancel = cancel != null ? cancel : new GridQueryCancel();
+
+        fragments = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    }
+
+    /** */
+    @Override public UUID id() {
+        return id;
+    }
+
+    /** */
+    @Override public QueryState state() {
+        return state;
+    }
+
+    /** */
+    protected void tryClose() {
+        List<CompletableFuture<?>> futs = new ArrayList<>();
+        for (RunningFragment<RowT> frag : fragments) {
+            CompletableFuture<?> f = frag.context().submit(frag.root()::close, frag.root()::onError);
+
+            CompletableFuture<?> fCancel = f.thenApply((u) -> frag.context().submit(frag.context()::cancel, frag.root()::onError));

Review comment:
       Line is longer than 120 characters.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RemoteFragmentKey.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.UUID;
+
+/** */
+final class RemoteFragmentKey {
+    /** */
+    private final UUID nodeId;
+
+    /** */
+    private final long fragmentId;
+
+    /** */
+    RemoteFragmentKey(UUID nodeId, long fragmentId) {

Review comment:
       Why is this constructor not public?

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RemoteFragmentKey.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.UUID;
+
+/** */

Review comment:
       The JavaDoc for this class is empty; please add a description.

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ *
+ */
+public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest {
+    /** */
+    private static IgniteEx srv;
+
+    /** Timeout in ms for async operations. */
+    private static final long TIMEOUT_IN_MS = 10_000;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        srv = grid(0);
+    }
+
+    /** */
+    @Test
+    public void testCancelAtPlanningPhase() throws IgniteCheckedException {
+        QueryEngine engine = queryProcessor(client);
+        int cnt = 9;
+
+        for (int i = 0; i < cnt; i++)
+            sql("CREATE TABLE test_tbl" + i + " (id int, val varchar)");
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "test_tbl" + i + " p" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> !engine.runningQueries().isEmpty() || fut.isDone(), TIMEOUT_IN_MS));
+
+        Collection<? extends RunningQuery> running = engine.runningQueries();
+
+        assertEquals("Running: " + running, 1, running.size());
+
+        RunningQuery qry = F.first(running);
+
+        // Waits for planning.
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> qry.state() == QueryState.PLANNING, TIMEOUT_IN_MS));
+
+        qry.cancel();
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> engine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(0), IgniteSQLException.class, "The query was cancelled while planning");
+    }
+
+    /** */
+    @Test
+    public void testCancelAtExecutionPhase() throws IgniteCheckedException {
+        QueryEngine engine = queryProcessor(client);
+        int cnt = 6;
+
+        sql("CREATE TABLE person (id int, val varchar)");
+
+        String data = IntStream.range(0, 1000).mapToObj((i) -> "(" + i + "," + i + ")").collect(joining(", "));
+        String insertSql = "INSERT INTO person (id, val) VALUES " + data;
+
+        sql(insertSql);
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "person p" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> {
+                Collection<? extends RunningQuery> queries = engine.runningQueries();
+
+                return !queries.isEmpty() && F.first(queries).state() == QueryState.EXECUTING;
+            },
+            TIMEOUT_IN_MS));
+
+        Collection<? extends RunningQuery> running = engine.runningQueries();
+
+        assertEquals(1, running.size());
+
+        RunningQuery qry = F.first(running);
+        qry.cancel();
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> engine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100), IgniteSQLException.class, "The query was cancelled while executing.");
+    }
+
+    /** */

Review comment:
       The JavaDoc here is empty; please add a description.

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ *
+ */
+public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest {
+    /** */
+    private static IgniteEx srv;
+
+    /** Timeout in ms for async operations. */
+    private static final long TIMEOUT_IN_MS = 10_000;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        srv = grid(0);
+    }
+
+    /** */
+    @Test
+    public void testCancelAtPlanningPhase() throws IgniteCheckedException {
+        QueryEngine engine = queryProcessor(client);
+        int cnt = 9;
+
+        for (int i = 0; i < cnt; i++)
+            sql("CREATE TABLE test_tbl" + i + " (id int, val varchar)");
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "test_tbl" + i + " p" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> !engine.runningQueries().isEmpty() || fut.isDone(), TIMEOUT_IN_MS));
+
+        Collection<? extends RunningQuery> running = engine.runningQueries();
+
+        assertEquals("Running: " + running, 1, running.size());
+
+        RunningQuery qry = F.first(running);
+
+        // Waits for planning.
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> qry.state() == QueryState.PLANNING, TIMEOUT_IN_MS));
+
+        qry.cancel();
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> engine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(0), IgniteSQLException.class, "The query was cancelled while planning");
+    }
+
+    /** */
+    @Test
+    public void testCancelAtExecutionPhase() throws IgniteCheckedException {

Review comment:
       Test fails with an error:
   org.apache.ignite.internal.processors.query.calcite.integration.RunningQueriesIntegrationTest.lambda$testCancelAtExecutionPhase$8(RunningQueriesIntegrationTest.java:107)
   	at org.apache.ignite.testframework.GridTestUtils.lambda$runAsync$3(GridTestUtils.java:1175)
   	at org.apache.ignite.testframework.GridTestUtils$7.call(GridTestUtils.java:1496)
   	at org.apache.ignite.testframework.GridTestThread.run(GridTestThread.java:88)
   Caused by: java.lang.NullPointerException
   	at org.apache.ignite.internal.processors.query.calcite.exec.ExecutionServiceImpl.executeFragment(ExecutionServiceImpl.java:604)
   	at org.apache.ignite.internal.processors.query.calcite.exec.ExecutionServiceImpl.onMessage(ExecutionServiceImpl.java:654)
   	at org.apache.ignite.internal.processors.query.calcite.exec.ExecutionServiceImpl.lambda$init$2(ExecutionServiceImpl.java:400)
   	at org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl.onMessageInternal(MessageServiceImpl.java:276)
   	at org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl.lam

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -375,46 +352,22 @@ public ClosableIteratorsHolder iteratorsHolder() {
         return iteratorsHolder;
     }
 
-    /** {@inheritDoc} */
-    @Override public List<FieldsQueryCursor<List<?>>> executeQuery(
-        @Nullable QueryContext ctx,
-        String schema,
-        String qry,
-        Object[] params
-    ) {
-        QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(getDefaultSchema(schema).getName(), qry));
-        if (plan != null) {
-            PlanningContext pctx = createContext(ctx, schema, qry, params);
-
-            return Collections.singletonList(executePlan(UUID.randomUUID(), pctx, plan));
-        }
-
-        SqlNodeList qryList = Commons.parse(qry, FRAMEWORK_CONFIG.getParserConfig());
-        List<FieldsQueryCursor<List<?>>> cursors = new ArrayList<>(qryList.size());
-
-        for (final SqlNode qry0: qryList) {
-            final PlanningContext pctx = createContext(ctx, schema, qry0.toString(), params);
-
-            if (qryList.size() == 1) {
-                plan = queryPlanCache().queryPlan(
-                    new CacheKey(pctx.schemaName(), pctx.query()),
-                    () -> prepareSingle(qry0, pctx));
-            }
-            else
-                plan = prepareSingle(qry0, pctx);
-
-            cursors.add(executePlan(UUID.randomUUID(), pctx, plan));
-        }
+    /** */
+    public QueryRegistry queryRegistry() {

Review comment:
       No usages of `queryRegistry()` were found.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlDdl;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.DdlSqlToCommandConverter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.singletonList;
+import static org.apache.calcite.rel.type.RelDataType.PRECISION_NOT_SPECIFIED;
+import static org.apache.ignite.internal.processors.query.calcite.prepare.PlannerHelper.optimize;
+
+/**
+ *
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+public class PrepareServiceImpl extends AbstractService implements PrepareService {
+    /** */
+    private final DdlSqlToCommandConverter ddlConverter;
+
+    /**
+     * @param ctx Kernal.
+     */
+    public PrepareServiceImpl(GridKernalContext ctx) {
+        super(ctx);
+
+        ddlConverter = new DdlSqlToCommandConverter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onStart(GridKernalContext ctx) {
+        super.onStart(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx) {
+        try {
+            assert single(sqlNode);
+
+            ctx.planner().reset();
+
+            if (SqlKind.DDL.contains(sqlNode.getKind()))
+                return prepareDdl(sqlNode, ctx);
+
+            switch (sqlNode.getKind()) {
+                case SELECT:
+                case ORDER_BY:
+                case WITH:
+                case VALUES:
+                case UNION:
+                case EXCEPT:
+                case INTERSECT:
+                    return prepareQuery(sqlNode, ctx);
+
+                case INSERT:
+                case DELETE:
+                case UPDATE:
+                    return prepareDml(sqlNode, ctx);
+
+                case EXPLAIN:
+                    return prepareExplain(sqlNode, ctx);
+
+                default:
+                    throw new IgniteSQLException("Unsupported operation [" +
+                        "sqlNodeKind=" + sqlNode.getKind() + "; " +
+                        "querySql=\"" + ctx.query() + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+            }
+        }
+        catch (ValidationException | CalciteContextException e) {
+            throw new IgniteSQLException("Failed to validate query. " + e.getMessage(), IgniteQueryErrorCode.PARSING, e);
+        }
+    }
+
+    /**
+     *
+     */
+    private QueryPlan prepareDdl(SqlNode sqlNode, PlanningContext ctx) {
+        assert sqlNode instanceof SqlDdl : sqlNode == null ? "null" : sqlNode.getClass().getName();
+
+        return new DdlPlan(ddlConverter.convert((SqlDdl)sqlNode, ctx));
+    }
+
+    /**
+     *
+     */
+    private QueryPlan prepareExplain(SqlNode explain, PlanningContext ctx) throws ValidationException {
+        IgnitePlanner planner = ctx.planner();
+
+        SqlNode sql = ((SqlExplain)explain).getExplicandum();
+
+        // Validate
+        sql = planner.validate(sql);
+
+        // Convert to Relational operators graph
+        IgniteRel igniteRel = optimize(sql, planner, log);
+
+        String plan = RelOptUtil.toString(igniteRel, SqlExplainLevel.ALL_ATTRIBUTES);
+
+        return new ExplainPlan(plan, explainFieldsMetadata(ctx));
+    }
+
+    /** */
+    private boolean single(SqlNode sqlNode) {
+        return !(sqlNode instanceof SqlNodeList);
+    }
+
+    /** */
+    private QueryPlan prepareQuery(SqlNode sqlNode, PlanningContext ctx) {
+        IgnitePlanner planner = ctx.planner();
+
+        // Validate
+        ValidationResult validated = planner.validateAndGetTypeMetadata(sqlNode);
+
+        sqlNode = validated.sqlNode();
+
+        IgniteRel igniteRel = optimize(sqlNode, planner, log);
+
+        // Split query plan to query fragments.
+        List<Fragment> fragments = new Splitter().go(igniteRel);
+
+        QueryTemplate template = new QueryTemplate(fragments);
+
+        return new MultiStepQueryPlan(template, queryFieldsMetadata(ctx, validated.dataType(), validated.origins()));
+    }
+
+    /** */
+    private QueryPlan prepareDml(SqlNode sqlNode, PlanningContext ctx) throws ValidationException {
+        IgnitePlanner planner = ctx.planner();
+
+        // Validate
+        sqlNode = planner.validate(sqlNode);
+
+        // Convert to Relational operators graph
+        IgniteRel igniteRel = optimize(sqlNode, planner, log);
+
+        // Split query plan to query fragments.
+        List<Fragment> fragments = new Splitter().go(igniteRel);
+
+        QueryTemplate template = new QueryTemplate(fragments);
+
+        return new MultiStepDmlPlan(template, queryFieldsMetadata(ctx, igniteRel.getRowType(), null));
+    }
+
+    /** */
+    private FieldsMetadata queryFieldsMetadata(PlanningContext ctx, RelDataType sqlType,
+        @Nullable List<List<String>> origins) {
+        RelDataType resultType = TypeUtils.getResultType(
+            ctx.typeFactory(), ctx.catalogReader(), sqlType, origins);
+        return new FieldsMetadataImpl(resultType, origins);

Review comment:
       Please add an empty line before the return statement.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org