You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/10/27 07:58:09 UTC

[GitHub] [ignite] Berkof commented on a change in pull request #9046: IGNITE-12991: Calcite integration. Query Cancellation

Berkof commented on a change in pull request #9046:
URL: https://github.com/apache/ignite/pull/9046#discussion_r737146994



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
##########
@@ -132,6 +137,9 @@
     /** */
     private final ExecutionService executionSvc;
 
+    /** */
+    private final RunningQueryService runningQryScv;

Review comment:
       ```suggestion
       private final RunningQueryService runningQrySvc;
   ```

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
##########
@@ -71,16 +71,31 @@
     private final ExpressionFactory<Row> expressionFactory;
 
     /** */
-    private final AtomicBoolean cancelFlag = new AtomicBoolean();
+    private final AtomicBoolean finishFlag = new AtomicBoolean();
 
     /** */
     private Object[] correlations = new Object[16];
 
+    /** */
+    private final RunnableX finishAct;
+
+    public ExecutionContext(

Review comment:
       JavaDoc

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RunningQueryService.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.processors.query.RunningFragmentInfo;
+import org.apache.ignite.internal.processors.query.RunningQueryInfo;
+import org.apache.ignite.internal.processors.query.RunningStage;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Service;
+
+/** */
+public class RunningQueryService implements Service {
+    /** */
+    private final Map<UUID, RunningQueryInfo> queries;
+
+    /** */
+    private final Map<UUID, List<RunningFragmentInfo>> fragments;
+
+    /** */
+    public RunningQueryService() {
+        queries = new ConcurrentHashMap<>();
+        fragments = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Register running query on planing stage.
+     *
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.PLANNING,
+            qryId,
+            pctx.query(),
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        queries.put(run.qryId(), run);
+
+        return qryId;
+    }
+
+    /**
+     * Register running query on execution stage.
+     *
+     * @param qry Query text.
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(String qry, PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.EXECUTION,
+            qryId,
+            qry,
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        RunningQueryInfo prev = queries.put(run.qryId(), run);
+
+        assert prev == null;
+
+        return qryId;
+    }
+
+    /**
+     * Deregister running query or their fragment.
+     *
+     * @param qryId id of the query, which is given by {@link #register register} method.

Review comment:
       ```suggestion
        * @param qryId Id of the query, which is given by {@link #register register} method.
   ```

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -308,6 +319,20 @@ public MappingService mappingService() {
         return mappingSvc;
     }
 
+    /**
+     * @return Running query service.
+     */
+    public RunningQueryService RunningQueryService() {

Review comment:
       ```suggestion
       public RunningQueryService runningQueryService() {
   ```

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -308,6 +319,20 @@ public MappingService mappingService() {
         return mappingSvc;
     }
 
+    /**
+     * @return Running query service.
+     */
+    public RunningQueryService RunningQueryService() {

Review comment:
       Also, this method is never used.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryInfo.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Represents information about running query.
+ */
+public class RunningQueryInfo {
+    /** Unique query identifier. */
+    private final UUID qryId;
+
+    /** Query text. */
+    private final String qry;
+
+    /** Schema name. */
+    private final String schemaName;
+
+    /**
+     *  Time of starting execution of query in milliseconds between the current time and midnight, January 1, 1970 UTC.
+     */
+    private final long startTime;
+
+    /** Query cancel. */
+    private final GridQueryCancel cancel;
+
+    /** Running stage. */
+    private final RunningStage stage;
+
+    /**
+     * Constructor.
+     *
+     * @param stage Stage of execution.
+     * @param qryId Unique query identifier.
+     * @param qry Query text.
+     * @param schemaName Schema name.
+     * @param cancel Query cancel.
+     */
+    public RunningQueryInfo(
+        RunningStage stage,
+        UUID qryId,
+        String qry,
+        String schemaName,
+        GridQueryCancel cancel) {
+        this.qryId = qryId;
+        this.qry = qry;
+        this.schemaName = schemaName;
+        this.cancel = cancel;
+        this.stage = stage;
+
+        startTime = U.currentTimeMillis();
+    }
+
+    /**
+     * Stage of query execution.

Review comment:
       ```suggestion
        * @return Stage of query execution.
   ```

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RunningQueryService.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.processors.query.RunningFragmentInfo;
+import org.apache.ignite.internal.processors.query.RunningQueryInfo;
+import org.apache.ignite.internal.processors.query.RunningStage;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Service;
+
+/** */

Review comment:
       JavaDoc

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
##########
@@ -36,4 +37,25 @@
      */
     List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String schemaName, String qry, Object... params)
         throws IgniteSQLException;
+
+//    /**

Review comment:
       Commented code

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryInfo.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Represents information about running query.
+ */
+public class RunningQueryInfo {
+    /** Unique query identifier. */
+    private final UUID qryId;
+
+    /** Query text. */
+    private final String qry;
+
+    /** Schema name. */
+    private final String schemaName;
+
+    /**
+     *  Time of starting execution of query in milliseconds between the current time and midnight, January 1, 1970 UTC.
+     */
+    private final long startTime;
+
+    /** Query cancel. */
+    private final GridQueryCancel cancel;
+
+    /** Running stage. */
+    private final RunningStage stage;
+
+    /**
+     * Constructor.
+     *
+     * @param stage Stage of execution.
+     * @param qryId Unique query identifier.
+     * @param qry Query text.
+     * @param schemaName Schema name.
+     * @param cancel Query cancel.
+     */
+    public RunningQueryInfo(
+        RunningStage stage,
+        UUID qryId,
+        String qry,
+        String schemaName,
+        GridQueryCancel cancel) {
+        this.qryId = qryId;
+        this.qry = qry;
+        this.schemaName = schemaName;
+        this.cancel = cancel;
+        this.stage = stage;
+
+        startTime = U.currentTimeMillis();
+    }
+
+    /**
+     * Stage of query execution.
+     */
+    public RunningStage stage() {
+        return stage;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public UUID qryId() {
+        return qryId;
+    }
+
+    /**
+     * @return Query text.
+     */
+    public String query() {
+        return qry;
+    }
+
+    /**
+     * @return Schema name.
+     */
+    public String schemaName() {
+        return schemaName;
+    }
+
+    /**
+     * @return Query start time.
+     */
+    public long startTime() {
+        return startTime;
+    }
+
+    /**
+     * Cancel query.
+     */
+    public void cancel() {
+        if (cancel != null)
+            cancel.cancel();
+    }
+
+    /**
+     * @return {@code true} if query can be cancelled.
+     */
+    public boolean cancelable() {

Review comment:
       This method is never used.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningFragmentInfo.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.UUID;
+
+/**
+ * Represents information about running query.
+ */
+public final class RunningFragmentInfo extends RunningQueryInfo {
+    /** Originating node ID (the node, who started the execution). */
+    private UUID originatingNodeId;
+
+    /** Fragment id. */
+    private long fragmentId;
+
+    /**
+     * Constructor.
+     *
+     * @param qryId Unique query identifier.
+     * @param qry Query text.
+     * @param schemaName Schema name.
+     * @param cancel Query cancel.
+     * @param originatingNodeId Originating node ID (the node, who started the execution).
+     * @param fragmentId Fragment id.
+     */
+    public RunningFragmentInfo(
+        UUID qryId,
+        String qry,
+        String schemaName,
+        GridQueryCancel cancel,
+        UUID originatingNodeId,
+        long fragmentId) {

Review comment:
       ") {" at new line, according to [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines#CodingGuidelines-MethodParameters)

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RunningQueryService.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.processors.query.RunningFragmentInfo;
+import org.apache.ignite.internal.processors.query.RunningQueryInfo;
+import org.apache.ignite.internal.processors.query.RunningStage;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Service;
+
+/** */
+public class RunningQueryService implements Service {
+    /** */
+    private final Map<UUID, RunningQueryInfo> queries;
+
+    /** */
+    private final Map<UUID, List<RunningFragmentInfo>> fragments;
+
+    /** */
+    public RunningQueryService() {
+        queries = new ConcurrentHashMap<>();
+        fragments = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Register running query on planing stage.
+     *
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.PLANNING,
+            qryId,
+            pctx.query(),
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        queries.put(run.qryId(), run);
+
+        return qryId;
+    }
+
+    /**
+     * Register running query on execution stage.
+     *
+     * @param qry Query text.
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(String qry, PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.EXECUTION,
+            qryId,
+            qry,
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        RunningQueryInfo prev = queries.put(run.qryId(), run);
+
+        assert prev == null;
+
+        return qryId;
+    }
+
+    /**
+     * Deregister running query or their fragment.
+     *
+     * @param qryId id of the query, which is given by {@link #register register} method.
+     */
+    public void deregister(UUID qryId) {
+        queries.remove(qryId);
+        fragments.remove(qryId);
+    }
+
+    /**
+     * Register running fragment of query
+     *
+     * @param qryId id of the query related to fragment,
+     * which is given by {@link #register register} method on originator node.
+     * @param pctx Planning context.
+     */
+    public synchronized void registerFragment(UUID qryId, long fragmentId, PlanningContext pctx) {
+        RunningFragmentInfo fragmentInfo = new RunningFragmentInfo(
+            qryId,
+            pctx.query(),
+            pctx.schemaName(),
+            pctx.queryCancel(),
+            pctx.originatingNodeId(),
+            fragmentId);
+
+        fragments.computeIfAbsent(qryId, k -> new ArrayList<>()).add(fragmentInfo);
+    }
+
+    /**
+     * Deregister running fragment
+     *
+     * @param qryId id of the query, which is given by {@link #register register} method.
+     * @param fragmentId if of fragment related to query id.
+     */
+    public synchronized void deregisterFragment(UUID qryId, long fragmentId) {
+        List<RunningFragmentInfo> frs = fragments.get(qryId);

Review comment:
       Why not use computeIfPresent instead of making the method synchronized?

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RunningQueryService.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.processors.query.RunningFragmentInfo;
+import org.apache.ignite.internal.processors.query.RunningQueryInfo;
+import org.apache.ignite.internal.processors.query.RunningStage;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Service;
+
+/** */
+public class RunningQueryService implements Service {
+    /** */
+    private final Map<UUID, RunningQueryInfo> queries;
+
+    /** */
+    private final Map<UUID, List<RunningFragmentInfo>> fragments;
+
+    /** */
+    public RunningQueryService() {
+        queries = new ConcurrentHashMap<>();
+        fragments = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Register running query on planing stage.
+     *
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.PLANNING,
+            qryId,
+            pctx.query(),
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        queries.put(run.qryId(), run);
+
+        return qryId;
+    }
+
+    /**
+     * Register running query on execution stage.
+     *
+     * @param qry Query text.
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(String qry, PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.EXECUTION,
+            qryId,
+            qry,
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        RunningQueryInfo prev = queries.put(run.qryId(), run);
+
+        assert prev == null;
+
+        return qryId;
+    }
+
+    /**
+     * Deregister running query or their fragment.
+     *
+     * @param qryId id of the query, which is given by {@link #register register} method.
+     */
+    public void deregister(UUID qryId) {
+        queries.remove(qryId);
+        fragments.remove(qryId);
+    }
+
+    /**
+     * Register running fragment of query
+     *
+     * @param qryId id of the query related to fragment,

Review comment:
       ```suggestion
        * @param qryId Id of the query related to fragment,
   ```

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCancelRequest.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.message;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *

Review comment:
       JavaDoc

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteTestUtils.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.List;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+public class CalciteTestUtils {

Review comment:
       JavaDoc

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
##########
@@ -422,14 +425,23 @@ public Builder logger(@NotNull IgniteLogger log) {
             return this;
         }
 
+        /**
+         * @param qryCancel Query Cancel hook.
+         * @return Builder for chaining.
+         */
+        public Builder qryCancel(GridQueryCancel qryCancel) {
+            this.qryCancel = qryCancel;
+            return this;

Review comment:
       Empty line before return.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RunningQueryService.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.processors.query.RunningFragmentInfo;
+import org.apache.ignite.internal.processors.query.RunningQueryInfo;
+import org.apache.ignite.internal.processors.query.RunningStage;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Service;
+
+/** */
+public class RunningQueryService implements Service {
+    /** */
+    private final Map<UUID, RunningQueryInfo> queries;
+
+    /** */
+    private final Map<UUID, List<RunningFragmentInfo>> fragments;
+
+    /** */
+    public RunningQueryService() {
+        queries = new ConcurrentHashMap<>();
+        fragments = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Register running query on planing stage.
+     *
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.PLANNING,
+            qryId,
+            pctx.query(),
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        queries.put(run.qryId(), run);
+
+        return qryId;
+    }
+
+    /**
+     * Register running query on execution stage.
+     *
+     * @param qry Query text.
+     * @param pctx Planning context.
+     * @return Id of registered query.
+     */
+    public UUID register(String qry, PlanningContext pctx) {
+        UUID qryId = UUID.randomUUID();
+
+        RunningQueryInfo run = new RunningQueryInfo(
+            RunningStage.EXECUTION,
+            qryId,
+            qry,
+            pctx.schemaName(),
+            pctx.queryCancel());
+
+        RunningQueryInfo prev = queries.put(run.qryId(), run);
+
+        assert prev == null;
+
+        return qryId;
+    }
+
+    /**
+     * Deregister running query or their fragment.
+     *
+     * @param qryId id of the query, which is given by {@link #register register} method.
+     */
+    public void deregister(UUID qryId) {
+        queries.remove(qryId);
+        fragments.remove(qryId);
+    }
+
+    /**
+     * Register running fragment of query
+     *
+     * @param qryId id of the query related to fragment,
+     * which is given by {@link #register register} method on originator node.
+     * @param pctx Planning context.
+     */
+    public synchronized void registerFragment(UUID qryId, long fragmentId, PlanningContext pctx) {
+        RunningFragmentInfo fragmentInfo = new RunningFragmentInfo(
+            qryId,
+            pctx.query(),
+            pctx.schemaName(),
+            pctx.queryCancel(),
+            pctx.originatingNodeId(),
+            fragmentId);
+
+        fragments.computeIfAbsent(qryId, k -> new ArrayList<>()).add(fragmentInfo);
+    }
+
+    /**
+     * Deregister running fragment
+     *
+     * @param qryId id of the query, which is given by {@link #register register} method.
+     * @param fragmentId if of fragment related to query id.
+     */
+    public synchronized void deregisterFragment(UUID qryId, long fragmentId) {
+        List<RunningFragmentInfo> frs = fragments.get(qryId);
+        if (frs != null && !frs.isEmpty()) {

Review comment:
       Empty line before if.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
##########
@@ -390,6 +394,21 @@ protected VolcanoPlannerExt(RelOptCostFactory costFactory, Context externalCtx)
             setTopDownOpt(true);
         }
 
+        /** */
+        public void registerCancelHook(PlanningContext externalCtx) {
+            try {
+                GridQueryCancel cancel = externalCtx.queryCancel();
+
+                if (cancel != null)

Review comment:
       ```suggestion
                   if (cancel != null) {
   ```

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/GridCommonCalciteAbstractTest.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.RunningFragmentInfo;
+import org.apache.ignite.internal.processors.query.RunningQueryInfo;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+import static org.apache.ignite.internal.processors.query.calcite.CalciteTestUtils.queryProcessor;
+
+/**
+ *

Review comment:
       JavaDoc




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org