You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2022/03/22 09:05:27 UTC
[ignite] branch sql-calcite updated: IGNITE-16722 Fix initiator ID in running queries view - Fixes #9905.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new 090f78f IGNITE-16722 Fix initiator ID in running queries view - Fixes #9905.
090f78f is described below
commit 090f78f8dda5acf509658682e6dc6f20131cb438
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Tue Mar 22 12:03:16 2022 +0300
IGNITE-16722 Fix initiator ID in running queries view - Fixes #9905.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../query/calcite/QueryRegistryImpl.java | 32 ++++++++-------
.../processors/query/calcite/RootQuery.java | 13 ++++++
.../integration/RunningQueriesIntegrationTest.java | 48 ++++++++++++++++++++++
3 files changed, 79 insertions(+), 14 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
index 82e93e5..58c5df1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
@@ -21,9 +21,8 @@ import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
-import org.apache.calcite.util.Pair;
import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -37,7 +36,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
*/
public class QueryRegistryImpl extends AbstractService implements QueryRegistry {
/** */
- private final ConcurrentMap<UUID, Pair<Long, RunningQuery>> runningQrys = new ConcurrentHashMap<>();
+ private final ConcurrentMap<UUID, RunningQuery> runningQrys = new ConcurrentHashMap<>();
/** */
protected final GridKernalContext kctx;
@@ -53,40 +52,45 @@ public class QueryRegistryImpl extends AbstractService implements QueryRegistry
@Override public RunningQuery register(RunningQuery qry) {
return runningQrys.computeIfAbsent(qry.id(), k -> {
if (!(qry instanceof RootQuery))
- return Pair.of(RunningQueryManager.UNDEFINED_QUERY_ID, qry);
+ return qry;
RootQuery<?> rootQry = (RootQuery<?>)qry;
RunningQueryManager qryMgr = kctx.query().runningQueryManager();
+ SqlFieldsQuery fieldsQry = rootQry.context().unwrap(SqlFieldsQuery.class);
+
+ String initiatorId = fieldsQry != null ? fieldsQry.getQueryInitiatorId() : null;
+
long locId = qryMgr.register(rootQry.sql(), GridCacheQueryType.SQL_FIELDS, rootQry.context().schemaName(),
- false, createCancelToken(qry), kctx.localNodeId().toString());
+ false, createCancelToken(qry), initiatorId);
+
+ rootQry.localQueryId(locId);
- return Pair.of(locId, qry);
- }).right;
+ return qry;
+ });
}
/** {@inheritDoc} */
@Override public RunningQuery query(UUID id) {
- Pair<Long, RunningQuery> value = runningQrys.get(id);
- return value != null ? value.right : null;
+ return runningQrys.get(id);
}
/** {@inheritDoc} */
@Override public void unregister(UUID id) {
- Pair<Long, RunningQuery> value = runningQrys.remove(id);
- if (value != null)
- kctx.query().runningQueryManager().unregister(value.left, null);
+ RunningQuery val = runningQrys.remove(id);
+ if (val instanceof RootQuery<?>)
+ kctx.query().runningQueryManager().unregister(((RootQuery<?>)val).localQueryId(), null);
}
/** {@inheritDoc} */
@Override public Collection<? extends RunningQuery> runningQueries() {
- return runningQrys.values().stream().map(Pair::getValue).collect(Collectors.toList());
+ return runningQrys.values();
}
/** {@inheritDoc} */
@Override public void tearDown() {
- runningQrys.values().forEach(q -> IgniteUtils.close(q.right::cancel, log));
+ runningQrys.values().forEach(q -> IgniteUtils.close(q::cancel, log));
runningQrys.clear();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index 868ebbc..1b81f63 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -86,6 +86,9 @@ public class RootQuery<RowT> extends Query<RowT> {
private final long plannerTimeout;
/** */
+ private volatile long locQryId;
+
+ /** */
public RootQuery(
String sql,
SchemaPlus schema,
@@ -318,6 +321,16 @@ public class RootQuery<RowT> extends Query<RowT> {
}
/** */
+ public long localQueryId() {
+ return locQryId;
+ }
+
+ /** */
+ public void localQueryId(long locQryId) {
+ this.locQryId = locQryId;
+ }
+
+ /** */
@Override public void onNodeLeft(UUID nodeId) {
List<RemoteFragmentKey> fragments = null;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
index f7b83b6..7df14b3 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
@@ -34,6 +34,8 @@ import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -48,6 +50,8 @@ import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableImpl
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.systemview.view.SqlQueryView;
+import org.apache.ignite.spi.systemview.view.SystemView;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.jetbrains.annotations.NotNull;
@@ -57,6 +61,7 @@ import org.junit.Test;
import static java.util.stream.Collectors.joining;
import static org.apache.ignite.IgniteSystemProperties.getLong;
+import static org.apache.ignite.internal.processors.query.RunningQueryManager.SQL_QRY_VIEW;
import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_PLANNER_TIMEOUT;
/**
@@ -282,4 +287,47 @@ public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest
assertTrue("Unexpected exception: " + e, e instanceof RelOptPlanner.CannotPlanException);
}
}
+
+ /**
+ * Test propagation of query initiator ID.
+ */
+ @Test
+ public void testQueryInitiator() throws IgniteCheckedException {
+ CalciteQueryProcessor engine = queryProcessor(client);
+
+ sql("CREATE TABLE t(id int, val varchar) WITH cache_name=\"cache\"");
+
+ IgniteCacheTable tbl = (IgniteCacheTable)engine.schemaHolder().schema("PUBLIC").getTable("T");
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ tbl.addIndex(new DelegatingIgniteIndex(tbl.getIndex(QueryUtils.PRIMARY_KEY_INDEX)) {
+ @Override public RelCollation collation() {
+ try {
+ latch.await(getTestTimeout(), TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+
+ return delegate.collation();
+ }
+ });
+
+ String initiatorId = "initiator";
+
+ GridTestUtils.runAsync(() -> client.cache("cache").query(new SqlFieldsQuery("SELECT * FROM t")
+ .setQueryInitiatorId(initiatorId)));
+
+ try {
+ SystemView<SqlQueryView> view = client.context().systemView().view(SQL_QRY_VIEW);
+
+ assertTrue(GridTestUtils.waitForCondition(() -> !F.isEmpty(view), 1_000));
+
+ assertEquals(1, F.size(view.iterator(), v -> initiatorId.equals(v.initiatorId())));
+ }
+ finally {
+ latch.countDown();
+ }
+ }
}