You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/04/26 12:51:50 UTC

[GitHub] [ignite] ygerzhedovich opened a new pull request #9046: IGNITE-12991: Calcite integration. Query Cancellation

ygerzhedovich opened a new pull request #9046:
URL: https://github.com/apache/ignite/pull/9046


   Thank you for submitting the pull request to the Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ and _WHY_ was made instead of _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary` where `XXXX` - number of JIRA issue.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com _#ignite_ channel.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] tledkov-gridgain commented on a change in pull request #9046: IGNITE-12991: Calcite integration. Query Cancellation

Posted by GitBox <gi...@apache.org>.
tledkov-gridgain commented on a change in pull request #9046:
URL: https://github.com/apache/ignite/pull/9046#discussion_r635033265



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
##########
@@ -36,4 +37,25 @@
      */
     List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String schemaName, String qry, Object... params)
         throws IgniteSQLException;
+
+    /**
+     * Return SQL running queries.
+     *
+     * @return SQL running queries.
+     */
+    List<RunningQueryInfo> runningQueries();
+
+    /**
+     * Return running queries fragments.
+     *
+     * @return Running queries fragments.
+     */
+    List<RunningFragmentInfo> runningFragments();
+
+    /**
+     * Cancel specified query.
+     *
+     * @param qryId Queries ID to cancel.
+     */
+    void cancelQuery(UUID qryId);

Review comment:
       Great! I think the default behavior would be async.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] AMashenkov commented on a change in pull request #9046: IGNITE-12991: Calcite integration. Query Cancellation

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #9046:
URL: https://github.com/apache/ignite/pull/9046#discussion_r735601034



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
##########
@@ -277,16 +295,26 @@ public void execute(RunnableX task, Consumer<Throwable> onError) {
     }
 
     /**
-     * Sets cancel flag, returns {@code true} if flag was changed by this call.
+     * Sets finish flag, returns {@code true} if flag was changed by this call.
      *
      * @return {@code True} if flag was changed by this call.
      */
-    public boolean cancel() {
-        return !cancelFlag.get() && cancelFlag.compareAndSet(false, true);
+    public boolean finish() {
+        boolean finishedRightNow = !finishFlag.get() && finishFlag.compareAndSet(false, true);

Review comment:
       ```suggestion
           boolean finishedRightNow = finishFlag.compareAndSet(false, true);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ygerzhedovich commented on a change in pull request #9046: IGNITE-12991: Calcite integration. Query Cancellation

Posted by GitBox <gi...@apache.org>.
ygerzhedovich commented on a change in pull request #9046:
URL: https://github.com/apache/ignite/pull/9046#discussion_r635062259



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
##########
@@ -36,4 +37,25 @@
      */
     List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String schemaName, String qry, Object... params)
         throws IgniteSQLException;
+
+    /**
+     * Return SQL running queries.
+     *
+     * @return SQL running queries.
+     */
+    List<RunningQueryInfo> runningQueries();
+
+    /**
+     * Return running queries fragments.
+     *
+     * @return Running queries fragments.
+     */
+    List<RunningFragmentInfo> runningFragments();
+
+    /**
+     * Cancel specified query.
+     *
+     * @param qryId Queries ID to cancel.
+     */
+    void cancelQuery(UUID qryId);

Review comment:
       Ok, I will create separate ticket 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ygerzhedovich commented on a change in pull request #9046: IGNITE-12991: Calcite integration. Query Cancellation

Posted by GitBox <gi...@apache.org>.
ygerzhedovich commented on a change in pull request #9046:
URL: https://github.com/apache/ignite/pull/9046#discussion_r634323532



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
##########
@@ -36,4 +37,25 @@
      */
     List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String schemaName, String qry, Object... params)
         throws IgniteSQLException;
+
+    /**
+     * Return SQL running queries.
+     *
+     * @return SQL running queries.
+     */
+    List<RunningQueryInfo> runningQueries();
+
+    /**
+     * Return running queries fragments.
+     *
+     * @return Running queries fragments.
+     */
+    List<RunningFragmentInfo> runningFragments();
+
+    /**
+     * Cancel specified query.
+     *
+     * @param qryId Queries ID to cancel.
+     */
+    void cancelQuery(UUID qryId);

Review comment:
       Seems the return type can be a Future, WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] Berkof commented on a change in pull request #9046: IGNITE-12991: Calcite integration. Query Cancellation

Posted by GitBox <gi...@apache.org>.
Berkof commented on a change in pull request #9046:
URL: https://github.com/apache/ignite/pull/9046#discussion_r737146994



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
##########
@@ -132,6 +137,9 @@
     /** */
     private final ExecutionService executionSvc;
 
+    /** */
+    private final RunningQueryService runningQryScv;

Review comment:
       ```suggestion
       private final RunningQueryService runningQrySvc;
   ```

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
##########
@@ -71,16 +71,31 @@
     private final ExpressionFactory<Row> expressionFactory;
 
     /** */
-    private final AtomicBoolean cancelFlag = new AtomicBoolean();
+    private final AtomicBoolean finishFlag = new AtomicBoolean();
 
     /** */
     private Object[] correlations = new Object[16];
 
+    /** */
+    private final RunnableX finishAct;
+
+    public ExecutionContext(

Review comment:
       JavaDoc

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RunningQueryService.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.processors.query.RunningFragmentInfo;
+import org.apache.ignite.internal.processors.query.RunningQueryInfo;
+import org.apache.ignite.internal.processors.query.RunningStage;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Service;
+
+/** */
+public class RunningQueryService implements Service {
+    /** */
+    private final Map<UUID, RunningQueryInfo> queries;
+
+    /** */
+    private final Map<UUID, List<RunningFragmentInfo>> fragments;
+
+    /** */
+    public RunningQueryService() {
+        queries = new ConcurrentHashMap<>();
+        fragments = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Register running query on planing stage.
+     *
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.PLANNING,
+            qryId,
+            pctx.query(),
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        queries.put(run.qryId(), run);
+
+        return qryId;
+    }
+
+    /**
+     * Register running query on execution stage.
+     *
+     * @param qry Query text.
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(String qry, PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.EXECUTION,
+            qryId,
+            qry,
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        RunningQueryInfo prev = queries.put(run.qryId(), run);
+
+        assert prev == null;
+
+        return qryId;
+    }
+
+    /**
+     * Deregister running query or their fragment.
+     *
+     * @param qryId id of the query, which is given by {@link #register register} method.

Review comment:
       ```suggestion
        * @param qryId Id of the query, which is given by {@link #register register} method.
   ```

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -308,6 +319,20 @@ public MappingService mappingService() {
         return mappingSvc;
     }
 
+    /**
+     * @return Running query service.
+     */
+    public RunningQueryService RunningQueryService() {

Review comment:
       ```suggestion
       public RunningQueryService runningQueryService() {
   ```

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -308,6 +319,20 @@ public MappingService mappingService() {
         return mappingSvc;
     }
 
+    /**
+     * @return Running query service.
+     */
+    public RunningQueryService RunningQueryService() {

Review comment:
       And method never used.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryInfo.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Represents information about running query.
+ */
+public class RunningQueryInfo {
+    /** Unique query identifier. */
+    private final UUID qryId;
+
+    /** Query text. */
+    private final String qry;
+
+    /** Schema name. */
+    private final String schemaName;
+
+    /**
+     *  Time of starting execution of query in milliseconds between the current time and midnight, January 1, 1970 UTC.
+     */
+    private final long startTime;
+
+    /** Query cancel. */
+    private final GridQueryCancel cancel;
+
+    /** Running stage. */
+    private final RunningStage stage;
+
+    /**
+     * Constructor.
+     *
+     * @param stage Stage of execution.
+     * @param qryId Unique query identifier.
+     * @param qry Query text.
+     * @param schemaName Schema name.
+     * @param cancel Query cancel.
+     */
+    public RunningQueryInfo(
+        RunningStage stage,
+        UUID qryId,
+        String qry,
+        String schemaName,
+        GridQueryCancel cancel) {
+        this.qryId = qryId;
+        this.qry = qry;
+        this.schemaName = schemaName;
+        this.cancel = cancel;
+        this.stage = stage;
+
+        startTime = U.currentTimeMillis();
+    }
+
+    /**
+     * Stage of query execution.

Review comment:
       ```suggestion
        * @return Stage of query execution.
   ```

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RunningQueryService.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.processors.query.RunningFragmentInfo;
+import org.apache.ignite.internal.processors.query.RunningQueryInfo;
+import org.apache.ignite.internal.processors.query.RunningStage;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Service;
+
+/** */

Review comment:
       JavaDoc

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
##########
@@ -36,4 +37,25 @@
      */
     List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String schemaName, String qry, Object... params)
         throws IgniteSQLException;
+
+//    /**

Review comment:
       Commented code

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryInfo.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Represents information about running query.
+ */
+public class RunningQueryInfo {
+    /** Unique query identifier. */
+    private final UUID qryId;
+
+    /** Query text. */
+    private final String qry;
+
+    /** Schema name. */
+    private final String schemaName;
+
+    /**
+     *  Time of starting execution of query in milliseconds between the current time and midnight, January 1, 1970 UTC.
+     */
+    private final long startTime;
+
+    /** Query cancel. */
+    private final GridQueryCancel cancel;
+
+    /** Running stage. */
+    private final RunningStage stage;
+
+    /**
+     * Constructor.
+     *
+     * @param stage Stage of execution.
+     * @param qryId Unique query identifier.
+     * @param qry Query text.
+     * @param schemaName Schema name.
+     * @param cancel Query cancel.
+     */
+    public RunningQueryInfo(
+        RunningStage stage,
+        UUID qryId,
+        String qry,
+        String schemaName,
+        GridQueryCancel cancel) {
+        this.qryId = qryId;
+        this.qry = qry;
+        this.schemaName = schemaName;
+        this.cancel = cancel;
+        this.stage = stage;
+
+        startTime = U.currentTimeMillis();
+    }
+
+    /**
+     * Stage of query execution.
+     */
+    public RunningStage stage() {
+        return stage;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public UUID qryId() {
+        return qryId;
+    }
+
+    /**
+     * @return Query text.
+     */
+    public String query() {
+        return qry;
+    }
+
+    /**
+     * @return Schema name.
+     */
+    public String schemaName() {
+        return schemaName;
+    }
+
+    /**
+     * @return Query start time.
+     */
+    public long startTime() {
+        return startTime;
+    }
+
+    /**
+     * Cancel query.
+     */
+    public void cancel() {
+        if (cancel != null)
+            cancel.cancel();
+    }
+
+    /**
+     * @return {@code true} if query can be cancelled.
+     */
+    public boolean cancelable() {

Review comment:
       This method is never used.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningFragmentInfo.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.UUID;
+
+/**
+ * Represents information about running query.
+ */
+public final class RunningFragmentInfo extends RunningQueryInfo {
+    /** Originating node ID (the node, who started the execution). */
+    private UUID originatingNodeId;
+
+    /** Fragment id. */
+    private long fragmentId;
+
+    /**
+     * Constructor.
+     *
+     * @param qryId Unique query identifier.
+     * @param qry Query text.
+     * @param schemaName Schema name.
+     * @param cancel Query cancel.
+     * @param originatingNodeId Originating node ID (the node, who started the execution).
+     * @param fragmentId Fragment id.
+     */
+    public RunningFragmentInfo(
+        UUID qryId,
+        String qry,
+        String schemaName,
+        GridQueryCancel cancel,
+        UUID originatingNodeId,
+        long fragmentId) {

Review comment:
       ") {" at new line, according to [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines#CodingGuidelines-MethodParameters)

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RunningQueryService.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.processors.query.RunningFragmentInfo;
+import org.apache.ignite.internal.processors.query.RunningQueryInfo;
+import org.apache.ignite.internal.processors.query.RunningStage;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Service;
+
+/** */
+public class RunningQueryService implements Service {
+    /** */
+    private final Map<UUID, RunningQueryInfo> queries;
+
+    /** */
+    private final Map<UUID, List<RunningFragmentInfo>> fragments;
+
+    /** */
+    public RunningQueryService() {
+        queries = new ConcurrentHashMap<>();
+        fragments = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Register running query on planing stage.
+     *
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.PLANNING,
+            qryId,
+            pctx.query(),
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        queries.put(run.qryId(), run);
+
+        return qryId;
+    }
+
+    /**
+     * Register running query on execution stage.
+     *
+     * @param qry Query text.
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(String qry, PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.EXECUTION,
+            qryId,
+            qry,
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        RunningQueryInfo prev = queries.put(run.qryId(), run);
+
+        assert prev == null;
+
+        return qryId;
+    }
+
+    /**
+     * Deregister running query or their fragment.
+     *
+     * @param qryId id of the query, which is given by {@link #register register} method.
+     */
+    public void deregister(UUID qryId) {
+        queries.remove(qryId);
+        fragments.remove(qryId);
+    }
+
+    /**
+     * Register running fragment of query
+     *
+     * @param qryId id of the query related to fragment,
+     * which is given by {@link #register register} method on originator node.
+     * @param pctx Planning context.
+     */
+    public synchronized void registerFragment(UUID qryId, long fragmentId, PlanningContext pctx) {
+        RunningFragmentInfo fragmentInfo = new RunningFragmentInfo(
+            qryId,
+            pctx.query(),
+            pctx.schemaName(),
+            pctx.queryCancel(),
+            pctx.originatingNodeId(),
+            fragmentId);
+
+        fragments.computeIfAbsent(qryId, k -> new ArrayList<>()).add(fragmentInfo);
+    }
+
+    /**
+     * Deregister running fragment
+     *
+     * @param qryId id of the query, which is given by {@link #register register} method.
+     * @param fragmentId if of fragment related to query id.
+     */
+    public synchronized void deregisterFragment(UUID qryId, long fragmentId) {
+        List<RunningFragmentInfo> frs = fragments.get(qryId);

Review comment:
       why not to use computeIfPresent without synchronizing method? 

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RunningQueryService.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.processors.query.RunningFragmentInfo;
+import org.apache.ignite.internal.processors.query.RunningQueryInfo;
+import org.apache.ignite.internal.processors.query.RunningStage;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Service;
+
+/** */
+public class RunningQueryService implements Service {
+    /** */
+    private final Map<UUID, RunningQueryInfo> queries;
+
+    /** */
+    private final Map<UUID, List<RunningFragmentInfo>> fragments;
+
+    /** */
+    public RunningQueryService() {
+        queries = new ConcurrentHashMap<>();
+        fragments = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Register running query on planing stage.
+     *
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.PLANNING,
+            qryId,
+            pctx.query(),
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        queries.put(run.qryId(), run);
+
+        return qryId;
+    }
+
+    /**
+     * Register running query on execution stage.
+     *
+     * @param qry Query text.
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(String qry, PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.EXECUTION,
+            qryId,
+            qry,
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        RunningQueryInfo prev = queries.put(run.qryId(), run);
+
+        assert prev == null;
+
+        return qryId;
+    }
+
+    /**
+     * Deregister running query or their fragment.
+     *
+     * @param qryId id of the query, which is given by {@link #register register} method.
+     */
+    public void deregister(UUID qryId) {
+        queries.remove(qryId);
+        fragments.remove(qryId);
+    }
+
+    /**
+     * Register running fragment of query
+     *
+     * @param qryId id of the query related to fragment,

Review comment:
       ```suggestion
        * @param qryId Id of the query related to fragment,
   ```

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCancelRequest.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.message;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *

Review comment:
       JavaDoc

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteTestUtils.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.List;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+public class CalciteTestUtils {

Review comment:
       JavaDoc

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
##########
@@ -422,14 +425,23 @@ public Builder logger(@NotNull IgniteLogger log) {
             return this;
         }
 
+        /**
+         * @param qryCancel Query Cancel hook.
+         * @return Builder for chaining.
+         */
+        public Builder qryCancel(GridQueryCancel qryCancel) {
+            this.qryCancel = qryCancel;
+            return this;

Review comment:
       Empty line before return.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RunningQueryService.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.processors.query.RunningFragmentInfo;
+import org.apache.ignite.internal.processors.query.RunningQueryInfo;
+import org.apache.ignite.internal.processors.query.RunningStage;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Service;
+
+/** */
+public class RunningQueryService implements Service {
+    /** */
+    private final Map<UUID, RunningQueryInfo> queries;
+
+    /** */
+    private final Map<UUID, List<RunningFragmentInfo>> fragments;
+
+    /** */
+    public RunningQueryService() {
+        queries = new ConcurrentHashMap<>();
+        fragments = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Register running query on planing stage.
+     *
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.PLANNING,
+            qryId,
+            pctx.query(),
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        queries.put(run.qryId(), run);
+
+        return qryId;
+    }
+
+    /**
+     * Register running query on execution stage.
+     *
+     * @param qry Query text.
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(String qry, PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.EXECUTION,
+            qryId,
+            qry,
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        RunningQueryInfo prev = queries.put(run.qryId(), run);
+
+        assert prev == null;
+
+        return qryId;
+    }
+
+    /**
+     * Deregister running query or their fragment.
+     *
+     * @param qryId id of the query, which is given by {@link #register register} method.
+     */
+    public void deregister(UUID qryId) {
+        queries.remove(qryId);
+        fragments.remove(qryId);
+    }
+
+    /**
+     * Register running fragment of query
+     *
+     * @param qryId id of the query related to fragment,
+     * which is given by {@link #register register} method on originator node.
+     * @param pctx Planning context.
+     */
+    public synchronized void registerFragment(UUID qryId, long fragmentId, PlanningContext pctx) {
+        RunningFragmentInfo fragmentInfo = new RunningFragmentInfo(
+            qryId,
+            pctx.query(),
+            pctx.schemaName(),
+            pctx.queryCancel(),
+            pctx.originatingNodeId(),
+            fragmentId);
+
+        fragments.computeIfAbsent(qryId, k -> new ArrayList<>()).add(fragmentInfo);
+    }
+
+    /**
+     * Deregister running fragment
+     *
+     * @param qryId id of the query, which is given by {@link #register register} method.
+     * @param fragmentId if of fragment related to query id.
+     */
+    public synchronized void deregisterFragment(UUID qryId, long fragmentId) {
+        List<RunningFragmentInfo> frs = fragments.get(qryId);
+        if (frs != null && !frs.isEmpty()) {

Review comment:
       Empty line before if.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
##########
@@ -390,6 +394,21 @@ protected VolcanoPlannerExt(RelOptCostFactory costFactory, Context externalCtx)
             setTopDownOpt(true);
         }
 
+        /** */
+        public void registerCancelHook(PlanningContext externalCtx) {
+            try {
+                GridQueryCancel cancel = externalCtx.queryCancel();
+
+                if (cancel != null)

Review comment:
       ```suggestion
                   if (cancel != null) {
   ```

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/GridCommonCalciteAbstractTest.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.RunningFragmentInfo;
+import org.apache.ignite.internal.processors.query.RunningQueryInfo;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+import static org.apache.ignite.internal.processors.query.calcite.CalciteTestUtils.queryProcessor;
+
+/**
+ *

Review comment:
       javaDoc




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org