You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2022/03/22 09:05:27 UTC

[ignite] branch sql-calcite updated: IGNITE-16722 Fix initiator ID in running queries view - Fixes #9905.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new 090f78f  IGNITE-16722 Fix initiator ID in running queries view - Fixes #9905.
090f78f is described below

commit 090f78f8dda5acf509658682e6dc6f20131cb438
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Tue Mar 22 12:03:16 2022 +0300

    IGNITE-16722 Fix initiator ID in running queries view - Fixes #9905.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../query/calcite/QueryRegistryImpl.java           | 32 ++++++++-------
 .../processors/query/calcite/RootQuery.java        | 13 ++++++
 .../integration/RunningQueriesIntegrationTest.java | 48 ++++++++++++++++++++++
 3 files changed, 79 insertions(+), 14 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
index 82e93e5..58c5df1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
@@ -21,9 +21,8 @@ import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
-import org.apache.calcite.util.Pair;
 import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -37,7 +36,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
  */
 public class QueryRegistryImpl extends AbstractService implements QueryRegistry {
     /** */
-    private final ConcurrentMap<UUID, Pair<Long, RunningQuery>> runningQrys = new ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, RunningQuery> runningQrys = new ConcurrentHashMap<>();
 
     /** */
     protected final GridKernalContext kctx;
@@ -53,40 +52,45 @@ public class QueryRegistryImpl extends AbstractService implements QueryRegistry
     @Override public RunningQuery register(RunningQuery qry) {
         return runningQrys.computeIfAbsent(qry.id(), k -> {
             if (!(qry instanceof RootQuery))
-                return Pair.of(RunningQueryManager.UNDEFINED_QUERY_ID, qry);
+                return qry;
 
             RootQuery<?> rootQry = (RootQuery<?>)qry;
 
             RunningQueryManager qryMgr = kctx.query().runningQueryManager();
 
+            SqlFieldsQuery fieldsQry = rootQry.context().unwrap(SqlFieldsQuery.class);
+
+            String initiatorId = fieldsQry != null ? fieldsQry.getQueryInitiatorId() : null;
+
             long locId = qryMgr.register(rootQry.sql(), GridCacheQueryType.SQL_FIELDS, rootQry.context().schemaName(),
-                false, createCancelToken(qry), kctx.localNodeId().toString());
+                false, createCancelToken(qry), initiatorId);
+
+            rootQry.localQueryId(locId);
 
-            return Pair.of(locId, qry);
-        }).right;
+            return qry;
+        });
     }
 
     /** {@inheritDoc} */
     @Override public RunningQuery query(UUID id) {
-        Pair<Long, RunningQuery> value = runningQrys.get(id);
-        return value != null ? value.right : null;
+        return runningQrys.get(id);
     }
 
     /** {@inheritDoc} */
     @Override public void unregister(UUID id) {
-        Pair<Long, RunningQuery> value = runningQrys.remove(id);
-        if (value != null)
-            kctx.query().runningQueryManager().unregister(value.left, null);
+        RunningQuery val = runningQrys.remove(id);
+        if (val instanceof RootQuery<?>)
+            kctx.query().runningQueryManager().unregister(((RootQuery<?>)val).localQueryId(), null);
     }
 
     /** {@inheritDoc} */
     @Override public Collection<? extends RunningQuery> runningQueries() {
-        return runningQrys.values().stream().map(Pair::getValue).collect(Collectors.toList());
+        return runningQrys.values();
     }
 
     /** {@inheritDoc} */
     @Override public void tearDown() {
-        runningQrys.values().forEach(q -> IgniteUtils.close(q.right::cancel, log));
+        runningQrys.values().forEach(q -> IgniteUtils.close(q::cancel, log));
         runningQrys.clear();
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index 868ebbc..1b81f63 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -86,6 +86,9 @@ public class RootQuery<RowT> extends Query<RowT> {
     private final long plannerTimeout;
 
     /** */
+    private volatile long locQryId;
+
+    /** */
     public RootQuery(
         String sql,
         SchemaPlus schema,
@@ -318,6 +321,16 @@ public class RootQuery<RowT> extends Query<RowT> {
     }
 
     /** */
+    public long localQueryId() {
+        return locQryId;
+    }
+
+    /** */
+    public void localQueryId(long locQryId) {
+        this.locQryId = locQryId;
+    }
+
+    /** */
     @Override public void onNodeLeft(UUID nodeId) {
         List<RemoteFragmentKey> fragments = null;
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
index f7b83b6..7df14b3 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
@@ -34,6 +34,8 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -48,6 +50,8 @@ import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableImpl
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.systemview.view.SqlQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.jetbrains.annotations.NotNull;
@@ -57,6 +61,7 @@ import org.junit.Test;
 
 import static java.util.stream.Collectors.joining;
 import static org.apache.ignite.IgniteSystemProperties.getLong;
+import static org.apache.ignite.internal.processors.query.RunningQueryManager.SQL_QRY_VIEW;
 import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_PLANNER_TIMEOUT;
 
 /**
@@ -282,4 +287,47 @@ public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest
             assertTrue("Unexpected exception: " + e, e instanceof RelOptPlanner.CannotPlanException);
         }
     }
+
+    /**
+     * Test propagation of query initiator ID.
+     */
+    @Test
+    public void testQueryInitiator() throws IgniteCheckedException {
+        CalciteQueryProcessor engine = queryProcessor(client);
+
+        sql("CREATE TABLE t(id int, val varchar) WITH cache_name=\"cache\"");
+
+        IgniteCacheTable tbl = (IgniteCacheTable)engine.schemaHolder().schema("PUBLIC").getTable("T");
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        tbl.addIndex(new DelegatingIgniteIndex(tbl.getIndex(QueryUtils.PRIMARY_KEY_INDEX)) {
+            @Override public RelCollation collation() {
+                try {
+                    latch.await(getTestTimeout(), TimeUnit.MILLISECONDS);
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteException(e);
+                }
+
+                return delegate.collation();
+            }
+        });
+
+        String initiatorId = "initiator";
+
+        GridTestUtils.runAsync(() -> client.cache("cache").query(new SqlFieldsQuery("SELECT * FROM t")
+            .setQueryInitiatorId(initiatorId)));
+
+        try {
+            SystemView<SqlQueryView> view = client.context().systemView().view(SQL_QRY_VIEW);
+
+            assertTrue(GridTestUtils.waitForCondition(() -> !F.isEmpty(view), 1_000));
+
+            assertEquals(1, F.size(view.iterator(), v -> initiatorId.equals(v.initiatorId())));
+        }
+        finally {
+            latch.countDown();
+        }
+    }
 }