You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/11/09 08:15:56 UTC
[ignite] branch sql-calcite updated: IGNITE-12991 Calcite
integration. Introduce running query registry & cancellation refactoring
(#9476)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new 9adf8e8 IGNITE-12991 Calcite integration. Introduce running query registry & cancellation refactoring (#9476)
9adf8e8 is described below
commit 9adf8e8e4b1ab5dfa95af42c5bb46d32459a1423
Author: Taras Ledkov <tl...@gridgain.com>
AuthorDate: Tue Nov 9 11:15:19 2021 +0300
IGNITE-12991 Calcite integration. Introduce running query registry & cancellation refactoring (#9476)
---
.../query/calcite/CalciteQueryProcessor.java | 140 ++++-
.../internal/processors/query/calcite/Query.java | 188 +++++++
.../ExecutionService.java => QueryRegistry.java} | 40 +-
.../query/calcite/QueryRegistryImpl.java | 70 +++
.../query/calcite/RemoteFragmentKey.java | 66 +++
.../processors/query/calcite/RootQuery.java | 348 ++++++++++++
.../processors/query/calcite/RunningFragment.java | 79 +++
.../query/calcite/exec/ExchangeService.java | 4 +-
.../query/calcite/exec/ExchangeServiceImpl.java | 41 +-
.../query/calcite/exec/ExecutionContext.java | 29 +-
.../query/calcite/exec/ExecutionService.java | 26 +-
.../query/calcite/exec/ExecutionServiceImpl.java | 619 ++++-----------------
.../query/calcite/exec/ddl/DdlCommandHandler.java | 33 +-
.../calcite/exec/ddl/NativeCommandHandler.java | 7 +-
.../processors/query/calcite/exec/rel/Outbox.java | 8 +-
.../query/calcite/message/MessageType.java | 2 +-
...boxCloseMessage.java => QueryCloseMessage.java} | 74 +--
.../calcite/prepare/AbstractMultiStepPlan.java | 5 +-
.../processors/query/calcite/prepare/DdlPlan.java | 5 +
.../query/calcite/prepare/MultiStepPlan.java | 3 +-
.../calcite/{exec => prepare}/PlannerHelper.java | 5 +-
.../query/calcite/prepare/PlanningContext.java | 8 +
.../PrepareService.java} | 10 +-
.../query/calcite/prepare/PrepareServiceImpl.java | 198 +++++++
.../query/calcite/prepare/QueryTemplate.java | 19 +-
.../AggregateExpandDistinctAggregatesRule.java | 1 +
.../query/calcite/schema/SchemaHolder.java | 5 +-
.../query/calcite/schema/SchemaHolderImpl.java | 10 +-
.../query/calcite/CalciteQueryProcessorTest.java | 12 +-
.../processors/query/calcite/CancelTest.java | 9 +
.../processors/query/calcite/QueryChecker.java | 2 +-
.../integration/AbstractBasicIntegrationTest.java | 20 +
.../integration/AbstractDdlIntegrationTest.java | 48 +-
.../integration/IndexDdlIntegrationTest.java | 38 +-
.../integration/KillCommandDdlIntegrationTest.java | 25 +-
.../integration/RunningQueriesIntegrationTest.java | 203 +++++++
.../calcite/integration/SetOpIntegrationTest.java | 1 +
.../integration/TableDdlIntegrationTest.java | 243 ++++----
.../integration/UserDdlIntegrationTest.java | 6 +-
.../query/calcite/planner/AbstractPlannerTest.java | 3 +-
.../query/calcite/planner/PlannerTest.java | 40 +-
.../internal/processors/query/GridQueryCancel.java | 5 +
.../internal/processors/query/NoOpQueryEngine.java | 12 +
.../internal/processors/query/QueryEngine.java | 9 +
.../internal/processors/query/QueryState.java} | 30 +-
.../internal/processors/query/RunningQuery.java} | 19 +-
.../ignite/internal/sql/SqlCommandProcessor.java | 14 +-
.../processors/query/h2/CommandProcessor.java | 2 +-
48 files changed, 1828 insertions(+), 956 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index d4fd450..28746f2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -17,7 +17,12 @@
package org.apache.ignite.internal.processors.query.calcite;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.UUID;
+
import org.apache.calcite.DataContexts;
import org.apache.calcite.config.Lex;
import org.apache.calcite.config.NullCollation;
@@ -27,6 +32,9 @@ import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.hint.HintStrategyTable;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.fun.SqlLibrary;
import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory;
import org.apache.calcite.sql.parser.SqlParser;
@@ -38,10 +46,12 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.RunningQuery;
import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeServiceImpl;
@@ -59,7 +69,10 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityServ
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteConvertletTable;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteTypeCoercion;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCacheImpl;
@@ -72,6 +85,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrai
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTraitDef;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.LifecycleAware;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
import org.jetbrains.annotations.Nullable;
@@ -158,7 +172,13 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
private final MailboxRegistry mailboxRegistry;
/** */
- private final ExecutionService executionSvc;
+ private final ExecutionService<Object[]> executionSvc;
+
+ /** */
+ private final PrepareServiceImpl prepareSvc;
+
+ /** */
+ private final QueryRegistry qryReg;
/**
* @param ctx Kernal context.
@@ -176,6 +196,8 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
msgSvc = new MessageServiceImpl(ctx);
mappingSvc = new MappingServiceImpl(ctx);
exchangeSvc = new ExchangeServiceImpl(ctx);
+ prepareSvc = new PrepareServiceImpl(ctx);
+ qryReg = new QueryRegistryImpl(ctx.log(QueryRegistry.class));
}
/**
@@ -241,6 +263,11 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
return failureProcessor;
}
+ /** @return Prepare service used by this processor to build query plans. */
+ public PrepareServiceImpl prepareService() {
+ return prepareSvc;
+ }
+
/** {@inheritDoc} */
@Override public void start() {
onStart(ctx,
@@ -252,13 +279,15 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
taskExecutor,
mappingSvc,
qryPlanCache,
- exchangeSvc
+ exchangeSvc,
+ qryReg
);
}
/** {@inheritDoc} */
@Override public void stop(boolean cancel) {
onStop(
+ qryReg,
executionSvc,
mailboxRegistry,
partSvc,
@@ -272,10 +301,96 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
}
/** {@inheritDoc} */
- @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext qryCtx, @Nullable String schemaName,
- String qry, Object... params) throws IgniteSQLException {
+    @Override public List<FieldsQueryCursor<List<?>>> query(
+        @Nullable QueryContext qryCtx,
+        @Nullable String schemaName,
+        String sql,
+        Object... params
+    ) throws IgniteSQLException {
+        SchemaPlus schema = schemaHolder.schema(schemaName);
+
+        // Fast path: a plan cached under (resolved schema name, source SQL) lets us skip parsing and planning.
+        QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql));
+
+        if (plan != null) {
+            RootQuery<Object[]> qry = new RootQuery<>(
+                sql,
+                schema,
+                params,
+                qryCtx,
+                exchangeSvc,
+                (q) -> qryReg.unregister(q.id()),
+                log
+            );
+
+            qryReg.register(qry);
+
+            try {
+                return Collections.singletonList(executionSvc.executePlan(
+                    qry,
+                    plan
+                ));
+            }
+            catch (Exception e) {
+                // Read the flag before cancel(): RootQuery.cancel() raises it unconditionally.
+                boolean isCanceled = qry.isCancelled();
+
+                qry.cancel();
+
+                qryReg.unregister(qry.id());
+
+                if (isCanceled)
+                    throw new IgniteSQLException("The query was cancelled while planning", IgniteQueryErrorCode.QUERY_CANCELED, e);
+                else
+                    throw e;
+            }
+        }
+
+        // Slow path: parse the text, then plan and execute every statement it contains.
+        SqlNodeList qryList = Commons.parse(sql, FRAMEWORK_CONFIG.getParserConfig());
+        List<FieldsQueryCursor<List<?>>> cursors = new ArrayList<>(qryList.size());
-        return executionSvc.executeQuery(qryCtx, schemaName, qry, params);
+        List<RootQuery<Object[]>> qrys = new ArrayList<>(qryList.size());
+
+        for (final SqlNode sqlNode: qryList) {
+            RootQuery<Object[]> qry = new RootQuery<>(
+                sqlNode.toString(),
+                schemaHolder.schema(schemaName), // Update schema for each query in multiple statements.
+                params,
+                qryCtx,
+                exchangeSvc,
+                (q) -> qryReg.unregister(q.id()),
+                log
+            );
+
+            qrys.add(qry);
+
+            qryReg.register(qry);
+
+            try {
+                if (qryList.size() == 1) {
+                    // Only single-statement texts are cached. The key must be the one the fast path above
+                    // looks up (resolved schema name + source SQL); keying by the raw schemaName and the
+                    // re-rendered SqlNode text would make the cached plan unreachable on the next call.
+                    plan = queryPlanCache().queryPlan(
+                        new CacheKey(schema.getName(), sql),
+                        () -> prepareSvc.prepareSingle(sqlNode, qry.planningContext()));
+                }
+                else
+                    plan = prepareSvc.prepareSingle(sqlNode, qry.planningContext());
+
+                cursors.add(executionSvc.executePlan(qry, plan));
+            }
+            catch (Exception e) {
+                // Read the flag before cancelling: RootQuery.cancel() raises it unconditionally.
+                boolean isCanceled = qry.isCancelled();
+
+                // A failure of one statement cancels every statement started so far.
+                qrys.forEach(RootQuery::cancel);
+
+                qryReg.unregister(qry.id());
+
+                if (isCanceled)
+                    throw new IgniteSQLException("The query was cancelled while planning", IgniteQueryErrorCode.QUERY_CANCELED, e);
+                else
+                    throw e;
+            }
+        }
+
+        return cursors;
+    }
/** */
@@ -293,4 +408,19 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
((LifecycleAware) service).onStop();
}
}
+
+ /** {@inheritDoc} */
+ @Override public RunningQuery runningQuery(UUID id) {
+ return qryReg.query(id);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<? extends RunningQuery> runningQueries() {
+ return qryReg.runningQueries();
+ }
+
+ /** @return Registry of running queries used by this processor. */
+ public QueryRegistry queryRegistry() {
+ return qryReg;
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
new file mode 100644
index 0000000..1c48bd1
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** */
+public class Query<RowT> implements RunningQuery {
+ /** Completable futures empty array. */
+ private static final CompletableFuture<?>[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture<?>[0];
+
+ /** Id of the node that initiated the query; {@link #cancel()} sends the close request to it. */
+ private final UUID initNodeId;
+
+ /** Query id. */
+ private final UUID id;
+
+ /** Monitor guarding the {@link #state} transitions. */
+ protected final Object mux = new Object();
+
+ /** Fragments added through {@link #addFragment(RunningFragment)}; backed by a concurrent map. */
+ protected final Set<RunningFragment<RowT>> fragments;
+
+ /** Cancellation holder; never {@code null} (a fresh one is created when none is passed to the constructor). */
+ protected final GridQueryCancel cancel;
+
+ /** Callback invoked by {@link #tryClose()} after the last fragment has been closed. */
+ protected final Consumer<Query<RowT>> unregister;
+
+ /** Current query state; modified under {@link #mux}, read by {@link #state()} without locking. */
+ protected volatile QueryState state = QueryState.INITED;
+
+ /** Exchange service used to send the close-query request. */
+ protected final ExchangeService exch;
+
+ /** Logger. */
+ protected final IgniteLogger log;
+
+ /**
+ * @param id Query id.
+ * @param initNodeId Id of the node that initiated the query.
+ * @param cancel Cancellation holder; when {@code null} a new {@link GridQueryCancel} is created.
+ * @param exch Exchange service.
+ * @param unregister Callback that removes the query from its registry once it is closed.
+ * @param log Logger.
+ */
+ public Query(
+ UUID id,
+ UUID initNodeId,
+ GridQueryCancel cancel,
+ ExchangeService exch,
+ Consumer<Query<RowT>> unregister,
+ IgniteLogger log
+ ) {
+ this.id = id;
+ this.unregister = unregister;
+ this.initNodeId = initNodeId;
+ this.exch = exch;
+ this.log = log;
+
+ this.cancel = cancel != null ? cancel : new GridQueryCancel();
+
+ fragments = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public QueryState state() {
+ return state;
+ }
+
+ /** @return Id of the node that initiated the query. */
+ public UUID initiatorNodeId() {
+ return initNodeId;
+ }
+
+    /**
+     * Closes every fragment added so far and unregisters the query once the last of them is closed.
+     * <p>
+     * Each fragment is closed by a task submitted to its own execution context; a failure of that task is
+     * passed to the fragment root's {@code onError}. When the query has no fragments there is nothing to
+     * wait for, so it is unregistered right away.
+     */
+    protected void tryClose() {
+        // Work on a snapshot: the live set may be modified concurrently.
+        List<RunningFragment<RowT>> fragments = new ArrayList<>(this.fragments);
+
+        // Without this guard the countdown below starts at zero, is never decremented,
+        // and the query would never be removed from the registry.
+        if (fragments.isEmpty()) {
+            unregister.accept(this);
+
+            return;
+        }
+
+        AtomicInteger cntDown = new AtomicInteger(fragments.size());
+
+        for (RunningFragment<RowT> frag : fragments) {
+            frag.context().execute(() -> {
+                frag.root().close();
+                frag.context().cancel();
+
+                // The task that closes the last fragment unregisters the query.
+                if (cntDown.decrementAndGet() == 0)
+                    unregister.accept(this);
+
+            }, frag.root()::onError);
+        }
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Does nothing when the query is already {@code CLOSED}. A query still in the {@code INITED} state is moved
+ * to {@code CLOSING} and a close request is sent to the initiator node: if the request is sent the method
+ * returns at once, otherwise the failure is logged and processing continues locally. A query that is
+ * {@code EXECUTING} or {@code CLOSING} at that point becomes {@code CLOSED}. Finally every fragment is handed
+ * an {@link ExecutionCancelledException} on its own execution context and {@link #tryClose()} is called.
+ */
+ @Override public void cancel() {
+ synchronized (mux) {
+ if (state == QueryState.CLOSED)
+ return;
+
+ if (state == QueryState.INITED) {
+ state = QueryState.CLOSING;
+
+ try {
+ exch.closeQuery(initNodeId, id);
+
+ return; // The close request was sent; the local part is not closed here.
+ }
+ catch (IgniteCheckedException e) {
+ log.warning("Cannot send cancel request to query initiator", e);
+ }
+ }
+
+ if (state == QueryState.EXECUTING || state == QueryState.CLOSING)
+ state = QueryState.CLOSED;
+ }
+
+ for (RunningFragment<RowT> frag : fragments)
+ frag.context().execute(() -> frag.root().onError(new ExecutionCancelledException()), frag.root()::onError);
+
+ tryClose();
+ }
+
+ /**
+ * Adds a running fragment to the query. A query in the {@code INITED} state is moved to {@code EXECUTING}.
+ *
+ * @param f Fragment to add.
+ * @throws IgniteSQLException With the {@code QUERY_CANCELED} code if the query is {@code CLOSING} or
+ * {@code CLOSED}; the fragment is not added in that case.
+ */
+ public void addFragment(RunningFragment<RowT> f) {
+ synchronized (mux) {
+ if (state == QueryState.INITED)
+ state = QueryState.EXECUTING;
+
+ if (state == QueryState.CLOSING || state == QueryState.CLOSED) {
+ throw new IgniteSQLException(
+ "The query was cancelled",
+ IgniteQueryErrorCode.QUERY_CANCELED,
+ new ExecutionCancelledException()
+ );
+ }
+
+ fragments.add(f);
+ }
+ }
+
+ /** @return {@code true} if the cancellation holder of this query reports it as cancelled. */
+ public boolean isCancelled() {
+ return cancel.isCanceled();
+ }
+
+ /**
+ * Cancels the query when the node that left is the query initiator; other nodes are ignored.
+ *
+ * @param nodeId Id of the node that left.
+ */
+ public void onNodeLeft(UUID nodeId) {
+ if (initNodeId.equals(nodeId))
+ cancel();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Query.class, this, "state", state, "fragments", fragments);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistry.java
similarity index 53%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistry.java
index 64e2f71..70573a8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistry.java
@@ -15,35 +15,41 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.exec;
+package org.apache.ignite.internal.processors.query.calcite;
-import java.util.List;
+import java.util.Collection;
import java.util.UUID;
-import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.RunningQuery;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
-import org.jetbrains.annotations.Nullable;
/**
- *
+ * Registry of the running queries.
*/
-public interface ExecutionService extends Service {
+public interface QueryRegistry extends Service {
+ /**
+ * Register the query or return the existing one with the same identifier.
+ *
+ * @param qry Query to register.
+ * @return Registered query.
+ */
+ RunningQuery register(RunningQuery qry);
+
/**
- * Executes a query.
+ * Lookup query by identifier.
*
- * @param ctx Query external context, contains flags and connection settings like a locale or a timezone.
- * @param schema Schema name.
- * @param query Query.
- * @param params Query parameters.
- * @return Query cursor.
+ * @param id Query identifier.
+ * @return Registered query or {@code null} if the query with specified identifier isn't found.
*/
- List<FieldsQueryCursor<List<?>>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params);
+ RunningQuery query(UUID id);
/**
- * Cancels a running query.
+ * Unregister query by identifier.
*
- * @param queryId Query ID.
+ * @param id Query identifier.
*/
- void cancelQuery(UUID queryId);
+ void unregister(UUID id);
+
+ /** */
+ Collection<? extends RunningQuery> runningQueries();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
new file mode 100644
index 0000000..429d05f
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Registry of the running queries.
+ */
+public class QueryRegistryImpl implements QueryRegistry {
+ /** */
+ private final ConcurrentMap<UUID, RunningQuery> runningQrys = new ConcurrentHashMap<>();
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ public QueryRegistryImpl(IgniteLogger log) {
+ this.log = log;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RunningQuery register(RunningQuery qry) {
+ return runningQrys.computeIfAbsent(qry.id(), k -> qry);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RunningQuery query(UUID id) {
+ return runningQrys.get(id);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unregister(UUID id) {
+ runningQrys.remove(id);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<? extends RunningQuery> runningQueries() {
+ return runningQrys.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void tearDown() {
+ runningQrys.values().forEach(q -> IgniteUtils.close(q::cancel, log));
+
+ runningQrys.clear();
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RemoteFragmentKey.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RemoteFragmentKey.java
new file mode 100644
index 0000000..9b1eac7
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RemoteFragmentKey.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.UUID;
+
+/** Key of a remote fragment: the id of the node it is mapped to plus the fragment id. */
+final class RemoteFragmentKey {
+    /** Node id. */
+    private final UUID nodeId;
+
+    /** Fragment id. */
+    private final long fragmentId;
+
+    /**
+     * @param nodeId Node id.
+     * @param fragmentId Fragment id.
+     */
+    RemoteFragmentKey(UUID nodeId, long fragmentId) {
+        this.fragmentId = fragmentId;
+        this.nodeId = nodeId;
+    }
+
+    /** @return Node id. */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /** @return Fragment id. */
+    public long fragmentId() {
+        return fragmentId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (o == null || o.getClass() != getClass())
+            return false;
+
+        RemoteFragmentKey other = (RemoteFragmentKey)o;
+
+        // Compare the cheap primitive first, then the node id.
+        return fragmentId == other.fragmentId && nodeId.equals(other.nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return 31 * nodeId.hashCode() + Long.hashCode(fragmentId);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
new file mode 100644
index 0000000..449b050
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.util.CancelFlag;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+/**
+ * The RootQuery is created on the query initiator (originator) node as the first step of a query run;
+ * It contains the information about query state, contexts, remote fragments;
+ * It provides 'cancel' functionality for running query like a base query class.
+ */
+public class RootQuery<RowT> extends Query<RowT> {
+ /** SQL query. */
+ private final String sql;
+
+ /** Parameters. */
+ private final Object[] params;
+
+ /** Ids of the nodes the non-first plan fragments are mapped to; filled by {@code run}. */
+ private final Set<UUID> remotes;
+
+ /** (node, fragment) pairs registered by {@code run} and removed as responses arrive. */
+ private final Set<RemoteFragmentKey> waiting;
+
+ /** Root node created by {@code run}; {@code null} until then. */
+ private volatile RootNode<RowT> root;
+
+ /** Planning context, created on the first call of {@link #planningContext()}. */
+ private volatile PlanningContext pctx;
+
+ /** Base query context built in the constructor from the external query context, schema and logger. */
+ private final BaseQueryContext ctx;
+
+ /**
+ * Creates a root query with a random id and no initiator node id.
+ *
+ * @param sql SQL query text.
+ * @param schema Schema used as the default schema of the query's framework config.
+ * @param params Query parameters.
+ * @param qryCtx External query context; may be {@code null}. A {@link GridQueryCancel} is unwrapped
+ * from it when it is present.
+ * @param exch Exchange service.
+ * @param unregister Callback that removes the query from its registry once it is closed.
+ * @param log Logger.
+ */
+ public RootQuery(
+ String sql,
+ SchemaPlus schema,
+ Object[] params,
+ QueryContext qryCtx,
+ ExchangeService exch,
+ Consumer<Query<RowT>> unregister,
+ IgniteLogger log
+ ) {
+ super(
+ UUID.randomUUID(),
+ null,
+ qryCtx != null ? qryCtx.unwrap(GridQueryCancel.class) : null,
+ exch,
+ unregister,
+ log
+ );
+
+ this.sql = sql;
+ this.params = params;
+
+ remotes = new HashSet<>();
+ waiting = new HashSet<>();
+
+ Context parent = Commons.convert(qryCtx);
+
+ ctx = BaseQueryContext.builder()
+ .parentContext(parent)
+ .frameworkConfig(
+ Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .build()
+ )
+ .logger(log)
+ .build();
+ }
+
+ /**
+ * Creates a new root query that inherits the SQL text, parameters, cancellation holder, exchange service,
+ * unregister callback and logger of {@code this} query.
+ * It is used to execute a DML query immediately after (inside) a DDL one,
+ * e.g.:
+ * CREATE TABLE MY_TABLE AS SELECT ... FROM ...;
+ *
+ * @param schema New schema.
+ * @return New root query bound to the given schema.
+ */
+ public RootQuery<RowT> childQuery(SchemaPlus schema) {
+ return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), exch, unregister, log);
+ }
+
+ /** @return Base query context of this query. */
+ public BaseQueryContext context() {
+ return ctx;
+ }
+
+ /** @return SQL query text. */
+ public String sql() {
+ return sql;
+ }
+
+ /** @return Query parameters (the array passed to the constructor, not a copy). */
+ public Object[] parameters() {
+ return params;
+ }
+
+ /**
+ * Starts the mapping phase of the query: moves the query to the {@code MAPPING} state.
+ *
+ * @throws IgniteSQLException With the {@code QUERY_CANCELED} code if the query is already {@code CLOSED}.
+ */
+ public void mapping() {
+ synchronized (mux) {
+ if (state == QueryState.CLOSED) {
+ throw new IgniteSQLException(
+ "The query was cancelled while executing.",
+ IgniteQueryErrorCode.QUERY_CANCELED
+ );
+ }
+
+ state = QueryState.MAPPING;
+ }
+ }
+
+ /**
+ * Starts the execution phase of the query and sets up the bookkeeping for remote fragments.
+ * <p>
+ * A root node is created over {@code root} and added as the running fragment of the first plan fragment.
+ * For every other plan fragment the ids of the nodes it is mapped to are remembered, and one
+ * (node, fragment) entry per node is put into the set of awaited responses. The query then moves to the
+ * {@code EXECUTING} state.
+ *
+ * @param ctx Execution context of the root fragment.
+ * @param plan Multi-step plan being executed.
+ * @param root Node the root node is registered on.
+ * @throws IgniteSQLException With the {@code QUERY_CANCELED} code if the query is already {@code CLOSED}.
+ */
+ public void run(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT> root) {
+ synchronized (mux) {
+ if (state == QueryState.CLOSED) {
+ throw new IgniteSQLException(
+ "The query was cancelled while executing.",
+ IgniteQueryErrorCode.QUERY_CANCELED
+ );
+ }
+
+ RootNode<RowT> rootNode = new RootNode<>(ctx, plan.fieldsMetadata().rowType(), this::tryClose);
+ rootNode.register(root);
+
+ addFragment(new RunningFragment<>(F.first(plan.fragments()).root(), rootNode, ctx));
+
+ this.root = rootNode;
+
+ for (int i = 1; i < plan.fragments().size(); i++) {
+ Fragment fragment = plan.fragments().get(i);
+ List<UUID> nodes = plan.mapping(fragment).nodeIds();
+
+ remotes.addAll(nodes);
+
+ for (UUID node : nodes)
+ waiting.add(new RemoteFragmentKey(node, fragment.fragmentId()));
+ }
+
+ state = QueryState.EXECUTING;
+ }
+ }
+
+ /**
+ * Can be called multiple times, after each error received
+ * at {@link #onResponse(RemoteFragmentKey, Throwable)}.
+ * <p>
+ * State handling, done under the query monitor:
+ * a {@code CLOSED} query is left untouched; a query that is {@code INITED}, {@code PLANNING} or
+ * {@code MAPPING} becomes {@code CLOSED} and the method returns; an {@code EXECUTING} query becomes
+ * {@code CLOSING} and its root node is closed; a {@code CLOSING} query with no awaited responses left
+ * becomes {@code CLOSED}.
+ * <p>
+ * Only the call that performs the last transition sends the close request to every remote node except
+ * the local one (send failures are collected and logged as a single warning) and then runs the base
+ * class close logic.
+ */
+ @Override protected void tryClose() {
+ QueryState state0 = null;
+
+ synchronized (mux) {
+ if (state == QueryState.CLOSED)
+ return;
+
+ if (state == QueryState.INITED || state == QueryState.PLANNING || state == QueryState.MAPPING) {
+ state = QueryState.CLOSED;
+
+ return;
+ }
+
+ if (state == QueryState.EXECUTING) {
+ state0 = state = QueryState.CLOSING;
+
+ root.closeInternal();
+ }
+
+ if (state == QueryState.CLOSING && waiting.isEmpty())
+ state0 = state = QueryState.CLOSED;
+ }
+
+ if (state0 == QueryState.CLOSED) {
+ try {
+ IgniteException wrpEx = null;
+
+ for (UUID nodeId : remotes) {
+ try {
+ if (!nodeId.equals(root.context().localNodeId()))
+ exch.closeQuery(nodeId, id());
+ }
+ catch (IgniteCheckedException e) {
+ if (wrpEx == null)
+ wrpEx = new IgniteException("Failed to send cancel message. [nodeId=" + nodeId + ']', e);
+ else
+ wrpEx.addSuppressed(e);
+ }
+ }
+
+ if (wrpEx != null)
+ log.warning("An exception occures during the query cancel", wrpEx);
+ }
+ finally {
+ super.tryClose();
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Raises the cancellation flag of the query and then tries to close it.
+ */
+ @Override public void cancel() {
+ cancel.cancel();
+
+ tryClose();
+ }
+
+ /**
+ * Returns the planning context of the query, creating it on the first call.
+ * <p>
+ * On creation the query moves to the {@code PLANNING} state and an action is added to the cancellation
+ * holder that requests cancel on the {@link CancelFlag} of the planning context.
+ *
+ * @return Planning context.
+ * @throws IgniteSQLException With the {@code QUERY_CANCELED} code if the query is {@code CLOSED} or
+ * {@code CLOSING}, or if adding the cancel action fails with {@link QueryCancelledException};
+ * with the {@code UNKNOWN} code if the query is already {@code EXECUTING} or {@code MAPPING}.
+ */
+ public PlanningContext planningContext() {
+ synchronized (mux) {
+ if (state == QueryState.CLOSED || state == QueryState.CLOSING) {
+ throw new IgniteSQLException(
+ "The query was cancelled while executing.",
+ IgniteQueryErrorCode.QUERY_CANCELED
+ );
+ }
+
+ if (state == QueryState.EXECUTING || state == QueryState.MAPPING) {
+ throw new IgniteSQLException(
+ "Invalid query flow",
+ IgniteQueryErrorCode.UNKNOWN
+ );
+ }
+
+ if (pctx == null) {
+ state = QueryState.PLANNING;
+
+ pctx = PlanningContext.builder()
+ .parentContext(ctx)
+ .query(sql)
+ .parameters(params)
+ .build();
+
+ try {
+ cancel.add(() -> pctx.unwrap(CancelFlag.class).requestCancel());
+ }
+ catch (QueryCancelledException e) {
+ throw new IgniteSQLException(e.getMessage(), IgniteQueryErrorCode.QUERY_CANCELED, e);
+ }
+ }
+
+ return pctx;
+ }
+ }
+
+ /** @return The root node, used as the row iterator; {@code null} until {@code run} has set it. */
+ public Iterator<RowT> iterator() {
+ return root;
+ }
+
+ /**
+ * Reports a {@link ClusterTopologyCheckedException} for every response still awaited from the node
+ * that left. Does nothing when no response is awaited from that node.
+ *
+ * @param nodeId Id of the node that left.
+ */
+ @Override public void onNodeLeft(UUID nodeId) {
+ List<RemoteFragmentKey> fragments = null;
+
+ synchronized (mux) {
+ fragments = waiting.stream().filter(f -> f.nodeId().equals(nodeId)).collect(Collectors.toList());
+ }
+
+ if (!F.isEmpty(fragments)) {
+ ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException(
+ "Failed to start query, node left. nodeId=" + nodeId);
+
+ for (RemoteFragmentKey fragment : fragments)
+ onResponse(fragment, ex);
+ }
+ }
+
+ /**
+ * Handles the response of a remote fragment.
+ *
+ * @param nodeId Id of the node the fragment is mapped to.
+ * @param fragmentId Fragment id.
+ * @param error Error reported for the fragment, or {@code null} if there is none.
+ */
+ public void onResponse(UUID nodeId, long fragmentId, Throwable error) {
+ onResponse(new RemoteFragmentKey(nodeId, fragmentId), error);
+ }
+
+ /**
+ * Removes the fragment from the awaited responses. A non-null error is passed to
+ * {@link #onError(Throwable)}; without an error, a query that was {@code CLOSING} at the moment of
+ * removal makes another attempt to close.
+ *
+ * @param fragment Key of the fragment the response belongs to.
+ * @param error Error reported for the fragment, or {@code null} if there is none.
+ */
+ private void onResponse(RemoteFragmentKey fragment, Throwable error) {
+ QueryState state;
+ synchronized (mux) {
+ waiting.remove(fragment);
+
+ state = this.state;
+ }
+
+ if (error != null)
+ onError(error);
+ else if (state == QueryState.CLOSING)
+ tryClose();
+ }
+
+ /**
+ * Passes the error to the root node and then tries to close the query.
+ * NOTE(review): the root node is assigned only in {@code run}; this method dereferences it without a
+ * {@code null} check, so it presumably must not be called before {@code run} - confirm against callers.
+ *
+ * @param error Error to report.
+ */
+ public void onError(Throwable error) {
+ root.onError(error);
+
+ tryClose();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RootQuery.class, this);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RunningFragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RunningFragment.java
new file mode 100644
index 0000000..2627e022
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RunningFragment.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.Objects;
+
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** Groups a fragment's relation tree and root node with its {@link ExecutionContext}; equality and hash code depend on the context only. */
+public class RunningFragment<Row> {
+ /** Relation tree of the fragment; used to generate a human-readable description of the fragment. */
+ private final IgniteRel rootRel;
+
+ /** Root node of the fragment, returned by {@link #root()}. */
+ private final AbstractNode<Row> root;
+
+ /** Execution context of the fragment; the only field that takes part in {@code equals} and {@code hashCode}. */
+ private final ExecutionContext<Row> ectx;
+
+ /** Stores the given relation tree, root node and execution context as-is. */
+ public RunningFragment(
+ IgniteRel rootRel,
+ AbstractNode<Row> root,
+ ExecutionContext<Row> ectx) {
+ this.rootRel = rootRel;
+ this.root = root;
+ this.ectx = ectx;
+ }
+
+ /** @return Execution context of the fragment. */
+ public ExecutionContext<Row> context() {
+ return ectx;
+ }
+
+ /** @return Root node of the fragment. */
+ public AbstractNode<Row> root() {
+ return root;
+ }
+
+ /** {@inheritDoc} Two fragments are equal when their execution contexts are equal. */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ RunningFragment<Row> fragment = (RunningFragment<Row>)o; // Class identity was checked above.
+
+ return Objects.equals(ectx, fragment.ectx);
+ }
+
+ /** {@inheritDoc} Derived from the execution context only, matching {@code equals}. */
+ @Override public int hashCode() {
+ return Objects.hash(ectx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RunningFragment.class, this);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
index ac1f58f..ffbf5bc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
@@ -63,10 +63,8 @@ public interface ExchangeService extends Service {
* Sends cancel request.
* @param nodeId Target node ID.
* @param qryId Query ID.
- * @param fragmentId Target fragment ID.
- * @param exchangeId Exchange ID.
*/
- void closeOutbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteCheckedException;
+ void closeQuery(UUID nodeId, UUID qryId) throws IgniteCheckedException;
/**
* @param nodeId Target node ID.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 6c1fca4..bd942552 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -21,20 +21,23 @@ import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
+
import com.google.common.collect.ImmutableMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.RunningQuery;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
import org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage;
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
-import org.apache.ignite.internal.processors.query.calcite.message.OutboxCloseMessage;
import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchAcknowledgeMessage;
import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchMessage;
+import org.apache.ignite.internal.processors.query.calcite.message.QueryCloseMessage;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
@@ -57,6 +60,9 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
/** */
private MessageService msgSvc;
+ /** */
+ private QueryRegistry qryRegistry;
+
/**
* @param ctx Kernal context.
*/
@@ -108,6 +114,11 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
return msgSvc;
}
+ /** Sets the query registry that is looked up by query ID when a {@code QueryCloseMessage} is received. */
+ public void queryRegistry(QueryRegistry qryRegistry) {
+ this.qryRegistry = qryRegistry;
+ }
+
/** {@inheritDoc} */
@Override public <Row> void sendBatch(UUID nodeId, UUID qryId, long fragmentId, long exchangeId, int batchId,
boolean last, List<Row> rows) throws IgniteCheckedException {
@@ -121,8 +132,8 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
}
/** {@inheritDoc} */
- @Override public void closeOutbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteCheckedException {
- messageService().send(nodeId, new OutboxCloseMessage(qryId, fragmentId, exchangeId));
+ @Override public void closeQuery(UUID nodeId, UUID qryId) throws IgniteCheckedException {
+ messageService().send(nodeId, new QueryCloseMessage(qryId));
}
/** {@inheritDoc} */
@@ -143,6 +154,7 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
taskExecutor(proc.taskExecutor());
mailboxRegistry(proc.mailboxRegistry());
messageService(proc.messageService());
+ queryRegistry(proc.queryRegistry());
init();
}
@@ -150,9 +162,9 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
/** {@inheritDoc} */
@Override public void init() {
messageService().register((n, m) -> onMessage(n, (InboxCloseMessage) m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
- messageService().register((n, m) -> onMessage(n, (OutboxCloseMessage) m), MessageType.QUERY_OUTBOX_CANCEL_MESSAGE);
messageService().register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage) m), MessageType.QUERY_ACKNOWLEDGE_MESSAGE);
messageService().register((n, m) -> onMessage(n, (QueryBatchMessage) m), MessageType.QUERY_BATCH_MESSAGE);
+ messageService().register((n, m) -> onMessage(n, (QueryCloseMessage) m), MessageType.QUERY_CLOSE_MESSAGE);
}
/** {@inheritDoc} */
@@ -178,22 +190,15 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
}
/** */
- protected void onMessage(UUID nodeId, OutboxCloseMessage msg) {
- Collection<Outbox<?>> outboxes = mailboxRegistry().outboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
+ protected void onMessage(UUID nodeId, QueryCloseMessage msg) {
+ RunningQuery qry = qryRegistry.query(msg.queryId());
- if (!F.isEmpty(outboxes)) {
- for (Outbox<?> outbox : outboxes)
- outbox.context().execute(outbox::close, outbox::onError);
-
- for (Outbox<?> outbox : outboxes)
- outbox.context().execute(outbox.context()::cancel, outbox::onError);
- }
- else if (log.isDebugEnabled()) {
- log.debug("Stale oubox cancel message received: [" +
+ if (qry != null)
+ qry.cancel();
+ else {
+ log.warning("Stale query close message received: [" +
"nodeId=" + nodeId +
- ", queryId=" + msg.queryId() +
- ", fragmentId=" + msg.fragmentId() +
- ", exchangeId=" + msg.exchangeId() + "]");
+ ", queryId=" + msg.queryId() + "]");
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 45f6cea..2a94b21 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -34,7 +35,6 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
@@ -236,13 +236,6 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
return unwrap(BaseQueryContext.class).typeFactory();
}
- /**
- * @return Query cancel.
- */
- public GridQueryCancel queryCancel() {
- return unwrap(BaseQueryContext.class).queryCancel();
- }
-
/** {@inheritDoc} */
@Override public QueryProvider getQueryProvider() {
return null; // TODO
@@ -301,7 +294,8 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
executor.execute(qryId, fragmentId(), () -> {
try {
- task.run();
+ if (!isCancelled())
+ task.run();
}
catch (Throwable e) {
onError.accept(e);
@@ -354,4 +348,21 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
public boolean isCancelled() {
return cancelFlag.get();
}
+
+ /** {@inheritDoc} Two contexts are equal when they have the same query ID and the same fragment ID. */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ExecutionContext<?> context = (ExecutionContext<?>)o;
+
+ return qryId.equals(context.qryId) && fragmentDesc.fragmentId() == context.fragmentDesc.fragmentId();
+ }
+
+ /** {@inheritDoc} Computed from the query ID and the fragment ID, the same pair compared by {@code equals}. */
+ @Override public int hashCode() {
+ return Objects.hash(qryId, fragmentDesc.fragmentId());
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
index 64e2f71..a3fbf2c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
@@ -18,32 +18,16 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.List;
-import java.util.UUID;
import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.calcite.RootQuery;
+import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
-import org.jetbrains.annotations.Nullable;
/**
*
*/
-public interface ExecutionService extends Service {
- /**
- * Executes a query.
- *
- * @param ctx Query external context, contains flags and connection settings like a locale or a timezone.
- * @param schema Schema name.
- * @param query Query.
- * @param params Query parameters.
- * @return Query cursor.
- */
- List<FieldsQueryCursor<List<?>>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params);
-
- /**
- * Cancels a running query.
- *
- * @param queryId Query ID.
- */
- void cancelQuery(UUID queryId);
+public interface ExecutionService<Row> extends Service {
+ /** Executes the given {@code plan} for the root query {@code qry} and returns the resulting fields cursor. */
+ FieldsQueryCursor<List<?>> executePlan(RootQuery<Row> qry, QueryPlan plan);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 46e7678..41ef0b1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -17,39 +17,20 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.runtime.CalciteContextException;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlDdl;
-import org.apache.calcite.sql.SqlExplain;
-import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.ValidationException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -57,16 +38,18 @@ import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeMan
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.QueryCancellable;
-import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.Query;
+import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
+import org.apache.ignite.internal.processors.query.calcite.RootQuery;
+import org.apache.ignite.internal.processors.query.calcite.RunningFragment;
import org.apache.ignite.internal.processors.query.calcite.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
import org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
@@ -85,44 +68,33 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadat
import org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadataImpl;
import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentPlan;
-import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
-import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepDmlPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
-import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PrepareServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
-import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate;
-import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
-import org.apache.ignite.internal.processors.query.calcite.prepare.ValidationResult;
import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.CreateTableCommand;
-import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.DdlSqlToCommandConverter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static java.util.Collections.singletonList;
-import static org.apache.calcite.rel.type.RelDataType.PRECISION_NOT_SPECIFIED;
import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
-import static org.apache.ignite.internal.processors.query.calcite.exec.PlannerHelper.optimize;
import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader.fromJson;
/**
*
*/
@SuppressWarnings("TypeMayBeWeakened")
-public class ExecutionServiceImpl<Row> extends AbstractService implements ExecutionService {
+public class ExecutionServiceImpl<Row> extends AbstractService implements ExecutionService<Row> {
/** */
private final DiscoveryEventListener discoLsnr;
@@ -163,10 +135,13 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
private ExchangeService exchangeSvc;
/** */
+ private PrepareServiceImpl prepareSvc;
+
+ /** */
private ClosableIteratorsHolder iteratorsHolder;
/** */
- private final Map<UUID, QueryInfo> running;
+ private QueryRegistry qryReg;
/** */
private final RowHandler<Row> handler;
@@ -174,9 +149,6 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
/** */
private final DdlCommandHandler ddlCmdHnd;
- /** */
- private final DdlSqlToCommandConverter ddlConverter;
-
/**
* @param ctx Kernal.
*/
@@ -185,11 +157,9 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
this.handler = handler;
discoLsnr = (e, c) -> onNodeLeft(e.eventNode().id());
- running = new ConcurrentHashMap<>();
- ddlConverter = new DdlSqlToCommandConverter();
ddlCmdHnd = new DdlCommandHandler(
- ctx::query, ctx.cache(), ctx.security(), () -> schemaHolder().schema()
+ ctx::query, ctx.cache(), ctx.security(), () -> schemaHolder().schema(null)
);
}
@@ -327,6 +297,13 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
}
/**
+ * @param prepareSvc Prepare service.
+ */
+ public void prepareService(PrepareServiceImpl prepareSvc) {
+ this.prepareSvc = prepareSvc;
+ }
+
+ /**
* @return Exchange service.
*/
public ExchangeService exchangeService() {
@@ -375,46 +352,9 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
return iteratorsHolder;
}
- /** {@inheritDoc} */
- @Override public List<FieldsQueryCursor<List<?>>> executeQuery(
- @Nullable QueryContext ctx,
- String schema,
- String qry,
- Object[] params
- ) {
- QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(getDefaultSchema(schema).getName(), qry));
- if (plan != null) {
- PlanningContext pctx = createContext(ctx, schema, qry, params);
-
- return Collections.singletonList(executePlan(UUID.randomUUID(), pctx, plan));
- }
-
- SqlNodeList qryList = Commons.parse(qry, FRAMEWORK_CONFIG.getParserConfig());
- List<FieldsQueryCursor<List<?>>> cursors = new ArrayList<>(qryList.size());
-
- for (final SqlNode qry0: qryList) {
- final PlanningContext pctx = createContext(ctx, schema, qry0.toString(), params);
-
- if (qryList.size() == 1) {
- plan = queryPlanCache().queryPlan(
- new CacheKey(pctx.schemaName(), pctx.query()),
- () -> prepareSingle(qry0, pctx));
- }
- else
- plan = prepareSingle(qry0, pctx);
-
- cursors.add(executePlan(UUID.randomUUID(), pctx, plan));
- }
-
- return cursors;
- }
-
- /** {@inheritDoc} */
- @Override public void cancelQuery(UUID qryId) {
- QueryInfo info = running.get(qryId);
-
- if (info != null)
- info.doCancel();
+ /** Sets the query registry used by this service. */
+ public void queryRegistry(QueryRegistry qryReg) {
+ this.qryReg = qryReg;
}
/** {@inheritDoc} */
@@ -436,6 +376,8 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
mappingService(proc.mappingService());
messageService(proc.messageService());
exchangeService(proc.exchangeService());
+ queryRegistry(proc.queryRegistry());
+ prepareService(proc.prepareService());
init();
}
@@ -455,8 +397,6 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
@Override public void tearDown() {
eventManager().removeDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
- running.clear();
-
iteratorsHolder().tearDown();
}
@@ -466,17 +406,12 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
}
/** */
- private PlanningContext createContext(QueryContext ctx, @Nullable String schema, String qry, Object[] params) {
- return createContext(Commons.convert(ctx), schema, qry, params);
- }
-
- /** */
private BaseQueryContext createQueryContext(Context parent, @Nullable String schema) {
return BaseQueryContext.builder()
.parentContext(parent)
.frameworkConfig(
Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(getDefaultSchema(schema))
+ .defaultSchema(schemaHolder().schema(schema))
.build()
)
.logger(log)
@@ -484,148 +419,20 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
}
/** */
- private PlanningContext createContext(Context parent, @Nullable String schema, String qry, Object[] params) {
- return PlanningContext.builder()
- .parentContext(createQueryContext(parent, schema))
- .query(qry)
- .parameters(params)
- .build();
- }
-
- /** */
- private SchemaPlus getDefaultSchema(String schema) {
- return schema != null ? schemaHolder().schema().getSubSchema(schema) : schemaHolder().schema();
- }
-
- /** */
private QueryPlan prepareFragment(BaseQueryContext ctx, String jsonFragment) {
return new FragmentPlan(fromJson(ctx, jsonFragment));
}
- /** */
- private QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx) {
- try {
- assert single(sqlNode);
-
- ctx.planner().reset();
-
- if (SqlKind.DDL.contains(sqlNode.getKind()))
- return prepareDdl(sqlNode, ctx);
-
- switch (sqlNode.getKind()) {
- case SELECT:
- case ORDER_BY:
- case WITH:
- case VALUES:
- case UNION:
- case EXCEPT:
- case INTERSECT:
- return prepareQuery(sqlNode, ctx);
-
- case INSERT:
- case DELETE:
- case UPDATE:
- return prepareDml(sqlNode, ctx);
-
- case EXPLAIN:
- return prepareExplain(sqlNode, ctx);
-
- default:
- throw new IgniteSQLException("Unsupported operation [" +
- "sqlNodeKind=" + sqlNode.getKind() + "; " +
- "querySql=\"" + ctx.query() + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
- }
- }
- catch (ValidationException | CalciteContextException e) {
- throw new IgniteSQLException("Failed to validate query: " + e.getMessage(), IgniteQueryErrorCode.PARSING, e);
- }
- }
-
- /** */
- private QueryPlan prepareQuery(SqlNode sqlNode, PlanningContext ctx) {
- IgnitePlanner planner = ctx.planner();
-
- // Validate
- ValidationResult validated = planner.validateAndGetTypeMetadata(sqlNode);
-
- sqlNode = validated.sqlNode();
-
- IgniteRel igniteRel = optimize(sqlNode, planner, log);
-
- return new MultiStepQueryPlan(queryTemplate(igniteRel),
- queryFieldsMetadata(ctx, validated.dataType(), validated.origins()));
- }
-
- /** */
- private QueryPlan prepareDml(SqlNode sqlNode, PlanningContext ctx) throws ValidationException {
- IgnitePlanner planner = ctx.planner();
-
- // Validate
- sqlNode = planner.validate(sqlNode);
-
- // Convert to Relational operators graph
- IgniteRel igniteRel = optimize(sqlNode, planner, log);
-
- return new MultiStepDmlPlan(queryTemplate(igniteRel),
- queryFieldsMetadata(ctx, igniteRel.getRowType(), null));
- }
-
- /** */
- private QueryPlan prepareDdl(SqlNode sqlNode, PlanningContext ctx) {
- assert sqlNode instanceof SqlDdl : sqlNode == null ? "null" : sqlNode.getClass().getName();
-
- return new DdlPlan(ddlConverter.convert((SqlDdl)sqlNode, ctx));
- }
-
- /** */
- private QueryPlan prepareExplain(SqlNode explain, PlanningContext ctx) throws ValidationException {
- IgnitePlanner planner = ctx.planner();
-
- SqlNode sql = ((SqlExplain)explain).getExplicandum();
-
- // Validate
- sql = planner.validate(sql);
-
- // Convert to Relational operators graph
- IgniteRel igniteRel = optimize(sql, planner, log);
-
- String plan = RelOptUtil.toString(igniteRel, SqlExplainLevel.ALL_ATTRIBUTES);
-
- return new ExplainPlan(plan, explainFieldsMetadata(ctx));
- }
-
- /** */
- private QueryTemplate queryTemplate(IgniteRel rel) {
- // Split query plan to query fragments.
- List<Fragment> fragments = new Splitter().go(rel);
-
- return new QueryTemplate(mappingSvc, fragments);
- }
-
- /** */
- private FieldsMetadata explainFieldsMetadata(PlanningContext ctx) {
- IgniteTypeFactory factory = ctx.typeFactory();
- RelDataType planStrDataType =
- factory.createSqlType(SqlTypeName.VARCHAR, PRECISION_NOT_SPECIFIED);
- T2<String, RelDataType> planField = new T2<>(ExplainPlan.PLAN_COL_NAME, planStrDataType);
- RelDataType planDataType = factory.createStructType(singletonList(planField));
-
- return queryFieldsMetadata(ctx, planDataType, null);
- }
-
- /** */
- private FieldsQueryCursor<List<?>> executePlan(
- UUID qryId,
- PlanningContext pctx,
+ /** {@inheritDoc} */
+ @Override public FieldsQueryCursor<List<?>> executePlan(
+ RootQuery<Row> qry,
QueryPlan plan
) {
switch (plan.type()) {
case DML:
ListFieldsQueryCursor<?> cur = mapAndExecutePlan(
- qryId,
- (MultiStepPlan)plan,
- pctx.unwrap(BaseQueryContext.class),
- pctx.parameters()
+ qry,
+ (MultiStepPlan)plan
);
cur.iterator().hasNext();
@@ -634,17 +441,15 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
case QUERY:
return mapAndExecutePlan(
- qryId,
- (MultiStepPlan)plan,
- pctx.unwrap(BaseQueryContext.class),
- pctx.parameters()
+ qry,
+ (MultiStepPlan)plan
);
case EXPLAIN:
- return executeExplain((ExplainPlan)plan, pctx);
+ return executeExplain((ExplainPlan)plan);
case DDL:
- return executeDdl(qryId, (DdlPlan)plan, pctx);
+ return executeDdl(qry, (DdlPlan)plan);
default:
throw new AssertionError("Unexpected plan type: " + plan);
@@ -652,29 +457,29 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
}
/** */
- private FieldsQueryCursor<List<?>> executeDdl(UUID qryId, DdlPlan plan, PlanningContext pctx) {
+ private FieldsQueryCursor<List<?>> executeDdl(RootQuery<Row> qry, DdlPlan plan) {
try {
- ddlCmdHnd.handle(qryId, plan.command(), pctx);
+ ddlCmdHnd.handle(qry.id(), plan.command());
}
catch (IgniteCheckedException e) {
- throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + pctx.query() +
+ throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.sql() +
", err=" + e.getMessage() + ']', e);
}
+ finally {
+ qryReg.unregister(qry.id());
+ }
- if (plan.command() instanceof CreateTableCommand && ((CreateTableCommand)plan.command()).insertStatement() != null) {
- SqlInsert insertStmt = ((CreateTableCommand)plan.command()).insertStatement();
+ if (plan.command() instanceof CreateTableCommand
+ && ((CreateTableCommand)plan.command()).insertStatement() != null) {
+ RootQuery<Row> insQry = qry.childQuery(schemaHolder.schema(qry.context().schemaName()));
- try {
- // Create new planning context containing created table in the schema.
- PlanningContext dmlCtx = createContext(pctx, pctx.schemaName(), pctx.query(), pctx.parameters());
+ qryReg.register(insQry);
- QueryPlan dmlPlan = prepareDml(insertStmt, dmlCtx);
+ SqlInsert insertStmt = ((CreateTableCommand)plan.command()).insertStatement();
- return executePlan(qryId, dmlCtx, dmlPlan);
- }
- catch (ValidationException e) {
- throw new IgniteSQLException("Failed to validate query.", IgniteQueryErrorCode.PARSING, e);
- }
+ QueryPlan dmlPlan = prepareSvc.prepareSingle(insertStmt, insQry.planningContext());
+
+ return executePlan(insQry, dmlPlan);
}
else
return H2Utils.zeroCursor();
@@ -682,13 +487,13 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
/** */
private ListFieldsQueryCursor<?> mapAndExecutePlan(
- UUID qryId,
- MultiStepPlan plan,
- BaseQueryContext qctx,
- Object[] params
+ RootQuery<Row> qry,
+ MultiStepPlan plan
) {
+ qry.mapping();
+
MappingQueryContext mapCtx = Commons.mapContext(locNodeId, topologyVersion());
- plan.init(mapCtx);
+ plan.init(mappingSvc, mapCtx);
List<Fragment> fragments = plan.fragments();
@@ -714,23 +519,20 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
plan.remotes(fragment));
ExecutionContext<Row> ectx = new ExecutionContext<>(
- qctx,
+ qry.context(),
taskExecutor(),
- qryId,
+ qry.id(),
locNodeId,
locNodeId,
mapCtx.topologyVersion(),
fragmentDesc,
handler,
- Commons.parametersMap(params));
+ Commons.parametersMap(qry.parameters()));
Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(),
exchangeService(), failureProcessor()).go(fragment.root());
- QueryInfo info = new QueryInfo(ectx, plan, node);
-
- // register query
- register(info);
+ qry.run(ectx, plan, node);
// start remote execution
for (int i = 1; i < fragments.size(); i++) {
@@ -744,51 +546,54 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
Throwable ex = null;
for (UUID nodeId : fragmentDesc.nodeIds()) {
if (ex != null)
- info.onResponse(nodeId, fragment.fragmentId(), ex);
+ qry.onResponse(nodeId, fragment.fragmentId(), ex);
else {
try {
QueryStartRequest req = new QueryStartRequest(
- qryId,
- qctx.schemaName(),
+ qry.id(),
+ qry.context().schemaName(),
fragment.serialized(),
ectx.topologyVersion(),
fragmentDesc,
- params);
+ qry.parameters());
messageService().send(nodeId, req);
}
catch (Throwable e) {
- info.onResponse(nodeId, fragment.fragmentId(), ex = e);
+ qry.onResponse(nodeId, fragment.fragmentId(), ex = e);
}
}
}
}
- return new ListFieldsQueryCursor<>(plan, info.iterator(), ectx);
+ return new ListFieldsQueryCursor<>(plan, iteratorsHolder().iterator(qry.iterator()), ectx);
}
/** */
- private FieldsQueryCursor<List<?>> executeExplain(ExplainPlan plan, PlanningContext pctx) {
+ private FieldsQueryCursor<List<?>> executeExplain(ExplainPlan plan) {
QueryCursorImpl<List<?>> cur = new QueryCursorImpl<>(singletonList(singletonList(plan.plan())));
- cur.fieldsMeta(plan.fieldsMeta().queryFieldsMetadata(pctx.typeFactory()));
+ cur.fieldsMeta(plan.fieldsMeta().queryFieldsMetadata(Commons.typeFactory()));
return cur;
}
/** */
- private void executeFragment(UUID qryId, FragmentPlan plan, ExecutionContext<Row> ectx) {
+ private void executeFragment(Query<Row> qry, FragmentPlan plan, ExecutionContext<Row> ectx) {
UUID origNodeId = ectx.originatingNodeId();
Outbox<Row> node = new LogicalRelImplementor<>(
- ectx,
- partitionService(),
- mailboxRegistry(),
- exchangeService(),
- failureProcessor())
- .go(plan.root());
+ ectx,
+ partitionService(),
+ mailboxRegistry(),
+ exchangeService(),
+ failureProcessor()
+ )
+ .go(plan.root());
+
+ qry.addFragment(new RunningFragment<>(plan.root(), node, ectx));
try {
- messageService().send(origNodeId, new QueryStartResponse(qryId, ectx.fragmentId()));
+ messageService().send(origNodeId, new QueryStartResponse(qry.id(), ectx.fragmentId()));
}
catch (IgniteCheckedException e) {
IgniteException wrpEx = new IgniteException("Failed to send reply. [nodeId=" + origNodeId + ']', e);
@@ -800,27 +605,6 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
}
/** */
- private void register(QueryInfo info) {
- UUID qryId = info.ctx.queryId();
-
- running.put(qryId, info);
-
- GridQueryCancel qryCancel = info.ctx.queryCancel();
-
- if (qryCancel == null)
- return;
-
- try {
- qryCancel.add(info);
- }
- catch (QueryCancelledException e) {
- running.remove(qryId);
-
- throw new IgniteSQLException(e.getMessage(), IgniteQueryErrorCode.QUERY_CANCELED);
- }
- }
-
- /** */
private FieldsMetadata queryFieldsMetadata(PlanningContext ctx, RelDataType sqlType,
@Nullable List<List<String>> origins) {
RelDataType resultType = TypeUtils.getResultType(
@@ -829,15 +613,21 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
}
/** */
- private boolean single(SqlNode sqlNode) {
- return !(sqlNode instanceof SqlNodeList);
- }
-
- /** */
private void onMessage(UUID nodeId, final QueryStartRequest msg) {
assert nodeId != null && msg != null;
try {
+ Query<Row> qry = (Query<Row>)qryReg.register(
+ new Query<>(
+ msg.queryId(),
+ nodeId,
+ null,
+ exchangeSvc,
+ (q) -> qryReg.unregister(q.id()),
+ log
+ )
+ );
+
final BaseQueryContext qctx = createQueryContext(Contexts.empty(), msg.schema());
QueryPlan qryPlan = queryPlanCache().queryPlan(
@@ -859,7 +649,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
Commons.parametersMap(msg.parameters())
);
- executeFragment(msg.queryId(), (FragmentPlan)qryPlan, ectx);
+ executeFragment(qry, (FragmentPlan)qryPlan, ectx);
}
catch (Throwable ex) {
U.error(log, "Failed to start query fragment ", ex);
@@ -879,6 +669,10 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
e.addSuppressed(ex);
+ Query<Row> qry = (Query<Row>)qryReg.query(msg.queryId());
+
+ qry.cancel();
+
throw wrpEx;
}
@@ -890,224 +684,41 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
private void onMessage(UUID nodeId, QueryStartResponse msg) {
assert nodeId != null && msg != null;
- QueryInfo info = running.get(msg.queryId());
+ RunningQuery qry = qryReg.query(msg.queryId());
+
+ if (qry != null) {
+ assert qry instanceof RootQuery : "Unexpected query object: " + qry;
- if (info != null)
- info.onResponse(nodeId, msg.fragmentId(), msg.error());
+ ((RootQuery<Row>)qry).onResponse(nodeId, msg.fragmentId(), msg.error());
+ }
}
/** */
private void onMessage(UUID nodeId, ErrorMessage msg) {
assert nodeId != null && msg != null;
- QueryInfo info = running.get(msg.queryId());
-
- if (info != null)
- info.onError(new RemoteException(nodeId, msg.queryId(), msg.fragmentId(), msg.error()));
- }
+ RunningQuery qry = qryReg.query(msg.queryId());
- /** */
- private void onNodeLeft(UUID nodeId) {
- running.forEach((uuid, queryInfo) -> queryInfo.onNodeLeft(nodeId));
- }
-
- /** */
- private enum QueryState {
- /** */
- RUNNING,
+ if (qry != null && qry.state() != QueryState.CLOSED) {
+ assert qry instanceof RootQuery : "Unexpected query object: " + qry;
- /** */
- CLOSING,
+ Exception e = new RemoteException(nodeId, msg.queryId(), msg.fragmentId(), msg.error());
- /** */
- CLOSED
- }
-
- /** */
- private static final class RemoteFragmentKey {
- /** */
- private final UUID nodeId;
-
- /** */
- private final long fragmentId;
-
- /** */
- private RemoteFragmentKey(UUID nodeId, long fragmentId) {
- this.nodeId = nodeId;
- this.fragmentId = fragmentId;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- RemoteFragmentKey that = (RemoteFragmentKey) o;
-
- if (fragmentId != that.fragmentId)
- return false;
- return nodeId.equals(that.nodeId);
- }
+ if (X.hasCause(msg.error(), ExecutionCancelledException.class)) {
+ e = new IgniteSQLException(
+ "The query was cancelled while executing.",
+ IgniteQueryErrorCode.QUERY_CANCELED,
+ e
+ );
+ }
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = nodeId.hashCode();
- res = 31 * res + (int) (fragmentId ^ (fragmentId >>> 32));
- return res;
+ ((RootQuery<Row>)qry).onError(e);
}
}
/** */
- @SuppressWarnings("TypeMayBeWeakened")
- private final class QueryInfo implements QueryCancellable {
- /** */
- private final ExecutionContext<Row> ctx;
-
- /** */
- private final RootNode<Row> root;
-
- /** remote nodes */
- private final Set<UUID> remotes;
-
- /** node to fragment */
- private final Set<RemoteFragmentKey> waiting;
-
- /** */
- private volatile QueryState state;
-
- /** */
- private QueryInfo(ExecutionContext<Row> ctx, MultiStepPlan plan, Node<Row> root) {
- this.ctx = ctx;
-
- RootNode<Row> rootNode = new RootNode<>(ctx, plan.fieldsMetadata().rowType(), this::tryClose);
- rootNode.register(root);
-
- this.root = rootNode;
-
- remotes = new HashSet<>();
- waiting = new HashSet<>();
-
- for (int i = 1; i < plan.fragments().size(); i++) {
- Fragment fragment = plan.fragments().get(i);
- List<UUID> nodes = plan.mapping(fragment).nodeIds();
-
- remotes.addAll(nodes);
-
- for (UUID node : nodes)
- waiting.add(new RemoteFragmentKey(node, fragment.fragmentId()));
- }
-
- state = QueryState.RUNNING;
- }
-
- /** */
- public Iterator<Row> iterator() {
- return iteratorsHolder().iterator(root);
- }
-
- /** {@inheritDoc} */
- @Override public void doCancel() {
- root.close();
- }
-
- /**
- * Can be called multiple times after receive each error at {@link #onResponse(RemoteFragmentKey, Throwable)}.
- */
- private void tryClose() {
- QueryState state0 = null;
-
- synchronized (this) {
- if (state == QueryState.CLOSED)
- return;
-
- if (state == QueryState.RUNNING)
- state0 = state = QueryState.CLOSING;
-
- // 1) close local fragment
- root.closeInternal();
-
- if (state == QueryState.CLOSING && waiting.isEmpty())
- state0 = state = QueryState.CLOSED;
- }
-
- if (state0 == QueryState.CLOSED) {
- // 2) unregister runing query
- running.remove(ctx.queryId());
-
- IgniteException wrpEx = null;
-
- // 3) close remote fragments
- for (UUID nodeId : remotes) {
- try {
- exchangeService().closeOutbox(nodeId, ctx.queryId(), -1, -1);
- }
- catch (IgniteCheckedException e) {
- if (wrpEx == null)
- wrpEx = new IgniteException("Failed to send cancel message. [nodeId=" + nodeId + ']', e);
- else
- wrpEx.addSuppressed(e);
- }
- }
-
- // 4) Cancel local fragment
- root.context().execute(ctx::cancel, root::onError);
-
- if (wrpEx != null)
- throw wrpEx;
- }
- }
-
- /** */
- private void onNodeLeft(UUID nodeId) {
- List<RemoteFragmentKey> fragments = null;
-
- synchronized (this) {
- for (RemoteFragmentKey fragment : waiting) {
- if (!fragment.nodeId.equals(nodeId))
- continue;
-
- if (fragments == null)
- fragments = new ArrayList<>();
-
- fragments.add(fragment);
- }
- }
-
- if (!F.isEmpty(fragments)) {
- ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException(
- "Failed to start query, node left. nodeId=" + nodeId);
-
- for (RemoteFragmentKey fragment : fragments)
- onResponse(fragment, ex);
- }
- }
-
- /** */
- private void onResponse(UUID nodeId, long fragmentId, Throwable error) {
- onResponse(new RemoteFragmentKey(nodeId, fragmentId), error);
- }
-
- /** */
- private void onResponse(RemoteFragmentKey fragment, Throwable error) {
- QueryState state;
- synchronized (this) {
- waiting.remove(fragment);
- state = this.state;
- }
-
- if (error != null)
- onError(error);
- else if (state == QueryState.CLOSING)
- tryClose();
- }
-
- /** */
- private void onError(Throwable error) {
- root.onError(error);
-
- tryClose();
- }
+ private void onNodeLeft(UUID nodeId) {
+ qryReg.runningQueries()
+ .forEach((qry) -> ((Query<Row>)qry).onNodeLeft(nodeId));
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java
index eedd0bc..f9ce1b0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
+
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.ignite.IgniteCheckedException;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryEntityEx;
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.AlterTableAddCommand;
import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.AlterTableDropCommand;
import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.ColumnDefinition;
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.DropTable
import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.NativeCommandWrapper;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
import org.apache.ignite.internal.util.typedef.F;
@@ -95,27 +96,27 @@ public class DdlCommandHandler {
}
/** */
- public void handle(UUID qryId, DdlCommand cmd, PlanningContext pctx) throws IgniteCheckedException {
+ public void handle(UUID qryId, DdlCommand cmd) throws IgniteCheckedException {
try {
if (cmd instanceof CreateTableCommand)
- handle0(pctx, (CreateTableCommand)cmd);
+ handle0((CreateTableCommand)cmd);
else if (cmd instanceof DropTableCommand)
- handle0(pctx, (DropTableCommand)cmd);
+ handle0((DropTableCommand)cmd);
else if (cmd instanceof AlterTableAddCommand)
- handle0(pctx, (AlterTableAddCommand)cmd);
+ handle0((AlterTableAddCommand)cmd);
else if (cmd instanceof AlterTableDropCommand)
- handle0(pctx, (AlterTableDropCommand)cmd);
+ handle0((AlterTableDropCommand)cmd);
else if (cmd instanceof NativeCommandWrapper)
- nativeCmdHnd.handle(qryId, (NativeCommandWrapper)cmd, pctx);
+ nativeCmdHnd.handle(qryId, (NativeCommandWrapper)cmd);
else {
throw new IgniteSQLException("Unsupported DDL operation [" +
"cmdName=" + (cmd == null ? null : cmd.getClass().getSimpleName()) + "; " +
- "querySql=\"" + pctx.query() + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+ "cmd=\"" + cmd + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
}
catch (SchemaOperationException e) {
@@ -124,7 +125,7 @@ public class DdlCommandHandler {
}
/** */
- private void handle0(PlanningContext pctx, CreateTableCommand cmd) throws IgniteCheckedException {
+ private void handle0(CreateTableCommand cmd) throws IgniteCheckedException {
security.authorize(cmd.cacheName(), SecurityPermission.CACHE_CREATE);
isDdlOnSchemaSupported(cmd.schemaName());
@@ -138,7 +139,7 @@ public class DdlCommandHandler {
CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(cmd.tableName());
- QueryEntity e = toQueryEntity(cmd, pctx);
+ QueryEntity e = toQueryEntity(cmd);
ccfg.setQueryEntities(Collections.singleton(e));
ccfg.setSqlSchema(cmd.schemaName());
@@ -178,7 +179,7 @@ public class DdlCommandHandler {
}
/** */
- private void handle0(PlanningContext pctx, DropTableCommand cmd) throws IgniteCheckedException {
+ private void handle0(DropTableCommand cmd) throws IgniteCheckedException {
isDdlOnSchemaSupported(cmd.schemaName());
Table tbl = schemaSupp.get().getSubSchema(cmd.schemaName()).getTable(cmd.tableName());
@@ -198,7 +199,7 @@ public class DdlCommandHandler {
}
/** */
- private void handle0(PlanningContext pctx, AlterTableAddCommand cmd) throws IgniteCheckedException {
+ private void handle0(AlterTableAddCommand cmd) throws IgniteCheckedException {
isDdlOnSchemaSupported(cmd.schemaName());
GridQueryTypeDescriptor typeDesc = schemaMgr.typeDescriptorForTable(cmd.schemaName(), cmd.tableName());
@@ -225,7 +226,7 @@ public class DdlCommandHandler {
continue;
}
- Type javaType = pctx.typeFactory().getResultClass(col.type());
+ Type javaType = Commons.typeFactory().getResultClass(col.type());
String typeName = javaType instanceof Class ? ((Class<?>)javaType).getName() : javaType.getTypeName();
@@ -256,7 +257,7 @@ public class DdlCommandHandler {
}
/** */
- private void handle0(PlanningContext pctx, AlterTableDropCommand cmd) throws IgniteCheckedException {
+ private void handle0(AlterTableDropCommand cmd) throws IgniteCheckedException {
isDdlOnSchemaSupported(cmd.schemaName());
GridQueryTypeDescriptor typeDesc = schemaMgr.typeDescriptorForTable(cmd.schemaName(), cmd.tableName());
@@ -308,7 +309,7 @@ public class DdlCommandHandler {
}
/** */
- private QueryEntity toQueryEntity(CreateTableCommand cmd, PlanningContext pctx) {
+ private QueryEntity toQueryEntity(CreateTableCommand cmd) {
QueryEntity res = new QueryEntity();
res.setTableName(cmd.tableName());
@@ -320,7 +321,7 @@ public class DdlCommandHandler {
Map<String, Integer> precision = new HashMap<>();
Map<String, Integer> scale = new HashMap<>();
- IgniteTypeFactory tf = pctx.typeFactory();
+ IgniteTypeFactory tf = Commons.typeFactory();
for (ColumnDefinition col : cmd.columns()) {
String name = col.name();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/NativeCommandHandler.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/NativeCommandHandler.java
index d2b0738..3cca9d6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/NativeCommandHandler.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/NativeCommandHandler.java
@@ -22,8 +22,6 @@ import java.util.UUID;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.GridQuerySchemaManager;
-import org.apache.ignite.internal.processors.query.SqlClientContext;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.NativeCommandWrapper;
import org.apache.ignite.internal.sql.SqlCommandProcessor;
@@ -44,11 +42,10 @@ public class NativeCommandHandler {
/**
* @param qryId Query id.
* @param cmd Native command.
- * @param pctx Planning context.
*/
- public FieldsQueryCursor<List<?>> handle(UUID qryId, NativeCommandWrapper cmd, PlanningContext pctx) {
+ public FieldsQueryCursor<List<?>> handle(UUID qryId, NativeCommandWrapper cmd) {
assert proc.isCommandSupported(cmd.command()) : cmd.command();
- return proc.runCommand(pctx.query(), cmd.command(), pctx.unwrap(SqlClientContext.class));
+ return proc.runCommand(cmd.command());
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
index b2f036a..6b7ea84 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
@@ -150,10 +150,12 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing
}
/** {@inheritDoc} */
- @Override protected void onErrorInternal(Throwable e) {
- U.error(context().logger(),
- "Error occurred during execution: " + X.getFullStackTrace(e));
+ @Override public void onError(Throwable e) {
+ onErrorInternal(e);
+ }
+ /** {@inheritDoc} */
+ @Override protected void onErrorInternal(Throwable e) {
try {
sendError(e);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index e3c6a86..b330e12 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -45,7 +45,7 @@ public enum MessageType {
QUERY_INBOX_CANCEL_MESSAGE(305, InboxCloseMessage::new),
/** */
- QUERY_OUTBOX_CANCEL_MESSAGE(306, OutboxCloseMessage::new),
+ QUERY_CLOSE_MESSAGE(306, QueryCloseMessage::new),
/** */
GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new),
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
similarity index 58%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
index ba6021c..b80869b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
@@ -19,54 +19,31 @@ package org.apache.ignite.internal.processors.query.calcite.message;
import java.nio.ByteBuffer;
import java.util.UUID;
-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
*/
-public class OutboxCloseMessage implements CalciteMessage {
- /** */
- private UUID queryId;
-
- /** */
- private long fragmentId;
-
+public class QueryCloseMessage implements CalciteMessage {
/** */
- private long exchangeId;
+ private UUID qryId;
/** */
- public OutboxCloseMessage() {
+ public QueryCloseMessage() {
// No-op.
}
/** */
- public OutboxCloseMessage(UUID queryId, long fragmentId, long exchangeId) {
- this.queryId = queryId;
- this.fragmentId = fragmentId;
- this.exchangeId = exchangeId;
+ public QueryCloseMessage(UUID qryId) {
+ this.qryId = qryId;
}
/**
* @return Query ID.
*/
public UUID queryId() {
- return queryId;
- }
-
- /**
- * @return Fragment ID.
- */
- public long fragmentId() {
- return fragmentId;
- }
-
- /**
- * @return Exchange ID.
- */
- public long exchangeId() {
- return exchangeId;
+ return qryId;
}
/** {@inheritDoc} */
@@ -82,23 +59,10 @@ public class OutboxCloseMessage implements CalciteMessage {
switch (writer.state()) {
case 0:
- if (!writer.writeLong("exchangeId", exchangeId))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeLong("fragmentId", fragmentId))
+ if (!writer.writeUuid("queryId", qryId))
return false;
writer.incrementState();
-
- case 2:
- if (!writer.writeUuid("queryId", queryId))
- return false;
-
- writer.incrementState();
-
}
return true;
@@ -113,23 +77,7 @@ public class OutboxCloseMessage implements CalciteMessage {
switch (reader.state()) {
case 0:
- exchangeId = reader.readLong("exchangeId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- fragmentId = reader.readLong("fragmentId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- queryId = reader.readUuid("queryId");
+ qryId = reader.readUuid("queryId");
if (!reader.isLastRead())
return false;
@@ -138,16 +86,16 @@ public class OutboxCloseMessage implements CalciteMessage {
}
- return reader.afterMessageRead(OutboxCloseMessage.class);
+ return reader.afterMessageRead(QueryCloseMessage.class);
}
/** {@inheritDoc} */
@Override public MessageType type() {
- return MessageType.QUERY_OUTBOX_CANCEL_MESSAGE;
+ return MessageType.QUERY_CLOSE_MESSAGE;
}
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 3;
+ return 1;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
index 7fb55d8..76318e9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -24,6 +24,7 @@ import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.util.typedef.F;
@@ -88,8 +89,8 @@ public abstract class AbstractMultiStepPlan implements MultiStepPlan {
}
/** {@inheritDoc} */
- @Override public void init(MappingQueryContext ctx) {
- executionPlan = queryTemplate.map(ctx);
+ @Override public void init(MappingService mappingService, MappingQueryContext ctx) {
+ executionPlan = queryTemplate.map(mappingService, ctx);
}
/** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DdlPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DdlPlan.java
index 79d3880..c6302e5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DdlPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DdlPlan.java
@@ -43,4 +43,9 @@ public class DdlPlan implements QueryPlan {
@Override public QueryPlan copy() {
return this;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return cmd.toString();
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
index c370afe..fc27362 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
/**
* Regular query or DML
@@ -54,5 +55,5 @@ public interface MultiStepPlan extends QueryPlan {
*
* @param ctx Planner context.
*/
- void init(MappingQueryContext ctx);
+ void init(MappingService mappingService, MappingQueryContext ctx);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PlannerHelper.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerHelper.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PlannerHelper.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerHelper.java
index 0872969..f46610a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PlannerHelper.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerHelper.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.exec;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.ArrayList;
import java.util.List;
@@ -33,9 +33,6 @@ import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.util.Pair;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
-import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteRelShuttle;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
import org.apache.ignite.internal.processors.query.calcite.rel.AbstractIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
index cde49b9..36c762d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
@@ -28,6 +29,7 @@ import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RuleSet;
+import org.apache.calcite.util.CancelFlag;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.jetbrains.annotations.NotNull;
@@ -45,6 +47,9 @@ public final class PlanningContext implements Context {
private final Object[] parameters;
/** */
+ private final CancelFlag cancelFlag = new CancelFlag(new AtomicBoolean());
+
+ /** */
private Function<RuleSet, RuleSet> rulesFilter;
/** */
@@ -144,6 +149,9 @@ public final class PlanningContext implements Context {
if (aCls == getClass())
return aCls.cast(this);
+ if (aCls == CancelFlag.class)
+ return aCls.cast(cancelFlag);
+
return parentCtx.unwrap(aCls);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareService.java
similarity index 78%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareService.java
index 311ef65..7d6f75d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareService.java
@@ -15,17 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.schema;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
-import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
/**
*
*/
-public interface SchemaHolder extends Service {
+public interface PrepareService extends Service {
/**
- * @return Schema.
+ * Prepare query plan.
*/
- SchemaPlus schema();
+ QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java
new file mode 100644
index 0000000..a02a715
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlDdl;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.DdlSqlToCommandConverter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.singletonList;
+import static org.apache.calcite.rel.type.RelDataType.PRECISION_NOT_SPECIFIED;
+import static org.apache.ignite.internal.processors.query.calcite.prepare.PlannerHelper.optimize;
+
+/**
+ * {@link PrepareService} implementation that builds a {@link QueryPlan} for a single parsed SQL statement (DDL, query, DML or EXPLAIN).
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+public class PrepareServiceImpl extends AbstractService implements PrepareService {
+ /** Converter applied to {@link SqlDdl} nodes when building a {@link DdlPlan}. */
+ private final DdlSqlToCommandConverter ddlConverter;
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public PrepareServiceImpl(GridKernalContext ctx) {
+ super(ctx);
+
+ ddlConverter = new DdlSqlToCommandConverter();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onStart(GridKernalContext ctx) {
+ super.onStart(ctx);
+ }
+
+ /** {@inheritDoc} Validation failures are rethrown as {@link IgniteSQLException} with the PARSING error code. */
+ @Override public QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx) {
+ try {
+ assert single(sqlNode);
+
+ ctx.planner().reset();
+
+ if (SqlKind.DDL.contains(sqlNode.getKind()))
+ return prepareDdl(sqlNode, ctx);
+
+ switch (sqlNode.getKind()) {
+ case SELECT:
+ case ORDER_BY:
+ case WITH:
+ case VALUES:
+ case UNION:
+ case EXCEPT:
+ case INTERSECT:
+ return prepareQuery(sqlNode, ctx);
+
+ case INSERT:
+ case DELETE:
+ case UPDATE:
+ return prepareDml(sqlNode, ctx);
+
+ case EXPLAIN:
+ return prepareExplain(sqlNode, ctx);
+
+ default:
+ throw new IgniteSQLException("Unsupported operation [" +
+ "sqlNodeKind=" + sqlNode.getKind() + "; " +
+ "querySql=\"" + ctx.query() + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+ }
+ }
+ catch (ValidationException | CalciteContextException e) {
+ throw new IgniteSQLException("Failed to validate query. " + e.getMessage(), IgniteQueryErrorCode.PARSING, e);
+ }
+ }
+
+ /**
+ * Builds a {@link DdlPlan} from a DDL statement; {@code sqlNode} is asserted to be a {@link SqlDdl}.
+ */
+ private QueryPlan prepareDdl(SqlNode sqlNode, PlanningContext ctx) {
+ assert sqlNode instanceof SqlDdl : sqlNode == null ? "null" : sqlNode.getClass().getName();
+
+ return new DdlPlan(ddlConverter.convert((SqlDdl)sqlNode, ctx));
+ }
+
+ /**
+ * Validates and optimizes the statement wrapped by EXPLAIN and returns an {@link ExplainPlan} holding its plan rendered as text.
+ */
+ private QueryPlan prepareExplain(SqlNode explain, PlanningContext ctx) throws ValidationException {
+ IgnitePlanner planner = ctx.planner();
+
+ SqlNode sql = ((SqlExplain)explain).getExplicandum();
+
+ // Validate
+ sql = planner.validate(sql);
+
+ // Convert to Relational operators graph
+ IgniteRel igniteRel = optimize(sql, planner, log);
+
+ String plan = RelOptUtil.toString(igniteRel, SqlExplainLevel.ALL_ATTRIBUTES);
+
+ return new ExplainPlan(plan, explainFieldsMetadata(ctx));
+ }
+
+ /** @return {@code true} if the node is not a {@link SqlNodeList}. */
+ private boolean single(SqlNode sqlNode) {
+ return !(sqlNode instanceof SqlNodeList);
+ }
+
+ /** Validates and optimizes a query, splits it into fragments and wraps them into a {@link MultiStepQueryPlan}. */
+ private QueryPlan prepareQuery(SqlNode sqlNode, PlanningContext ctx) {
+ IgnitePlanner planner = ctx.planner();
+
+ // Validate
+ ValidationResult validated = planner.validateAndGetTypeMetadata(sqlNode);
+
+ sqlNode = validated.sqlNode();
+
+ IgniteRel igniteRel = optimize(sqlNode, planner, log);
+
+ // Split query plan to query fragments.
+ List<Fragment> fragments = new Splitter().go(igniteRel);
+
+ QueryTemplate template = new QueryTemplate(fragments);
+
+ return new MultiStepQueryPlan(template, queryFieldsMetadata(ctx, validated.dataType(), validated.origins()));
+ }
+
+ /** Validates and optimizes a DML statement, splits it into fragments and wraps them into a {@link MultiStepDmlPlan}. */
+ private QueryPlan prepareDml(SqlNode sqlNode, PlanningContext ctx) throws ValidationException {
+ IgnitePlanner planner = ctx.planner();
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ IgniteRel igniteRel = optimize(sqlNode, planner, log);
+
+ // Split query plan to query fragments.
+ List<Fragment> fragments = new Splitter().go(igniteRel);
+
+ QueryTemplate template = new QueryTemplate(fragments);
+
+ return new MultiStepDmlPlan(template, queryFieldsMetadata(ctx, igniteRel.getRowType(), null));
+ }
+
+ /** Builds fields metadata from the type returned by {@link TypeUtils#getResultType}; {@code origins} may be {@code null}. */
+ private FieldsMetadata queryFieldsMetadata(PlanningContext ctx, RelDataType sqlType,
+ @Nullable List<List<String>> origins) {
+ RelDataType resultType = TypeUtils.getResultType(
+ ctx.typeFactory(), ctx.catalogReader(), sqlType, origins);
+
+ return new FieldsMetadataImpl(resultType, origins);
+ }
+
+ /** Builds metadata for the EXPLAIN result: a struct with a single VARCHAR field named {@link ExplainPlan#PLAN_COL_NAME}. */
+ private FieldsMetadata explainFieldsMetadata(PlanningContext ctx) {
+ IgniteTypeFactory factory = ctx.typeFactory();
+ RelDataType planStrDataType =
+ factory.createSqlType(SqlTypeName.VARCHAR, PRECISION_NOT_SPECIFIED);
+ T2<String, RelDataType> planField = new T2<>(ExplainPlan.PLAN_COL_NAME, planStrDataType);
+ RelDataType planDataType = factory.createStructType(singletonList(planField));
+
+ return queryFieldsMetadata(ctx, planDataType, null);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
index 12c00f8..ac8bbc4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
+
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -37,18 +38,13 @@ import org.jetbrains.annotations.NotNull;
/** */
public class QueryTemplate {
/** */
- private final MappingService mappingService;
-
- /** */
private final ImmutableList<Fragment> fragments;
/** */
private final AtomicReference<ExecutionPlan> executionPlan = new AtomicReference<>();
/** */
- public QueryTemplate(MappingService mappingService, List<Fragment> fragments) {
- this.mappingService = mappingService;
-
+ public QueryTemplate(List<Fragment> fragments) {
ImmutableList.Builder<Fragment> b = ImmutableList.builder();
for (Fragment fragment : fragments)
b.add(fragment.copy());
@@ -57,7 +53,7 @@ public class QueryTemplate {
}
/** */
- public ExecutionPlan map(MappingQueryContext ctx) {
+ public ExecutionPlan map(MappingService mappingService, MappingQueryContext ctx) {
ExecutionPlan executionPlan = this.executionPlan.get();
if (executionPlan != null && Objects.equals(executionPlan.topologyVersion(), ctx.topologyVersion()))
@@ -70,7 +66,7 @@ public class QueryTemplate {
for (int i = 0; i < 3; i++) {
try {
- ExecutionPlan executionPlan0 = new ExecutionPlan(ctx.topologyVersion(), map(fragments, ctx, mq));
+ ExecutionPlan executionPlan0 = new ExecutionPlan(ctx.topologyVersion(), map(mappingService, fragments, ctx, mq));
if (executionPlan == null || executionPlan.topologyVersion().before(executionPlan0.topologyVersion()))
this.executionPlan.compareAndSet(executionPlan, executionPlan0);
@@ -91,7 +87,12 @@ public class QueryTemplate {
}
/** */
- @NotNull private List<Fragment> map(List<Fragment> fragments, MappingQueryContext ctx, RelMetadataQuery mq) {
+ @NotNull private List<Fragment> map(
+ MappingService mappingService,
+ List<Fragment> fragments,
+ MappingQueryContext ctx,
+ RelMetadataQuery mq
+ ) {
ImmutableList.Builder<Fragment> b = ImmutableList.builder();
for (Fragment fragment : fragments)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/patch/AggregateExpandDistinctAggregatesRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/patch/AggregateExpandDistinctAggregatesRule.java
index f615bc8..d67c7ab 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/patch/AggregateExpandDistinctAggregatesRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/patch/AggregateExpandDistinctAggregatesRule.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
index 311ef65..2cb4eec 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
@@ -19,13 +19,14 @@ package org.apache.ignite.internal.processors.query.calcite.schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import org.jetbrains.annotations.Nullable;
/**
*
*/
public interface SchemaHolder extends Service {
/**
- * @return Schema.
+ * @return Specified schema if the schema name is specified or default schema when {@code schema} is {@code null}.
*/
- SchemaPlus schema();
+ SchemaPlus schema(@Nullable String schema);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
index 84c597b..1e618ba 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
@@ -153,11 +153,6 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
}
/** {@inheritDoc} */
- @Override public SchemaPlus schema() {
- return calciteSchema;
- }
-
- /** {@inheritDoc} */
@Override public synchronized void onSchemaCreated(String schemaName) {
igniteSchemas.putIfAbsent(schemaName, new IgniteSchema(schemaName));
rebuild();
@@ -269,6 +264,11 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
rebuild();
}
+ /** {@inheritDoc} */
+ @Override public SchemaPlus schema(@Nullable String schema) {
+ return schema != null ? calciteSchema.getSubSchema(schema) : calciteSchema;
+ }
+
/** */
private void rebuild() {
SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index aacefa0..d9d5e4d 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -246,16 +246,16 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
assertEquals(1, res.size());
assertEquals(1, res.get(0).size());
assertEquals(40L, res.get(0).get(0));
+
+ awaitReservationsRelease("RISK");
+ awaitReservationsRelease("TRADE");
+ awaitReservationsRelease("BATCH");
+
+ assertFalse(lsnr.check());
}
}
- assertFalse(lsnr.check());
-
listeningLog.clearListeners();
-
- awaitReservationsRelease("RISK");
- awaitReservationsRelease("TRADE");
- awaitReservationsRelease("BATCH");
}
/**
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
index be86124..899747d 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
import org.junit.Test;
import static java.util.Collections.singletonList;
@@ -147,6 +148,9 @@ public class CancelTest extends GridCommonAbstractTest {
fail("Unexpected exception: " + ex);
}
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> engine.runningQueries().isEmpty(), 10_000));
+
awaitReservationsRelease(grid(0), "TEST");
}
@@ -168,6 +172,11 @@ public class CancelTest extends GridCommonAbstractTest {
stopGrid(0);
+ QueryEngine engine1 = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> engine1.runningQueries().isEmpty(), 10_000));
+
awaitReservationsRelease(grid(1), "TEST");
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
index e814c8c..4649703 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
@@ -485,6 +485,6 @@ public abstract class QueryChecker {
}, PART_RELEASE_TIMEOUT);
for (GridDhtLocalPartition p : parts)
- assertEquals("Partition is reserved: " + p, 0, p.reservations());
+ assertEquals("Partition is reserved: [node=" + node.name() + ", part=" + p, 0, p.reservations());
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 588602e..b733b6f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.integration;
import java.util.List;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.QueryEntity;
@@ -131,6 +132,25 @@ public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
}
/** */
+ protected CalciteQueryProcessor queryProcessor(IgniteEx ignite) {
+ return Commons.lookupComponent(ignite.context(), CalciteQueryProcessor.class);
+ }
+
+ /** Executes the SQL statement on the {@code client} node and returns all result rows. */
+ protected List<List<?>> sql(String sql, Object... params) {
+ return sql(client, sql, params);
+ }
+
+ /** Runs the statement against the "PUBLIC" schema via the node's Calcite query processor; returns all rows of the first cursor, which is closed afterwards. */
+ protected List<List<?>> sql(IgniteEx ignite, String sql, Object... params) {
+ List<FieldsQueryCursor<List<?>>> cur = queryProcessor(ignite).query(null, "PUBLIC", sql, params);
+
+ try (QueryCursor<List<?>> srvCursor = cur.get(0)) {
+ return srvCursor.getAll();
+ }
+ }
+
+ /** */
public static class Employer {
/** */
@QuerySqlField
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
index c89bbab..5496b9c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
@@ -16,40 +16,29 @@
*/
package org.apache.ignite.internal.processors.query.calcite.integration;
-import java.util.List;
-import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.SqlConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.After;
-import org.junit.Before;
/** */
-public class AbstractDdlIntegrationTest extends GridCommonAbstractTest {
- /** */
- private static final String CLIENT_NODE_NAME = "client";
-
+public class AbstractDdlIntegrationTest extends AbstractBasicIntegrationTest {
/** */
protected static final String DATA_REGION_NAME = "test_data_region";
/** */
protected static final String PERSISTENT_DATA_REGION = "pds_data_region";
- /** */
- protected IgniteEx client;
+ /** {@inheritDoc} */
+ @Override protected int nodeCount() {
+ return 1;
+ }
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
- startGrids(2);
-
- client = startClientGrid(CLIENT_NODE_NAME);
+ super.beforeTestsStarted();
client.cluster().state(ClusterState.ACTIVE);
}
@@ -77,33 +66,8 @@ public class AbstractDdlIntegrationTest extends GridCommonAbstractTest {
}
/** */
- @Before
- public void init() {
- client = grid(CLIENT_NODE_NAME);
- }
-
- /** */
@After
public void cleanUp() {
client.destroyCaches(client.cacheNames());
}
-
- /** */
- protected List<List<?>> executeSql(String sql, Object... params) {
- return executeSql(client, sql, params);
- }
-
- /** */
- protected List<List<?>> executeSql(IgniteEx ignite, String sql, Object... params) {
- List<FieldsQueryCursor<List<?>>> cur = queryProcessor(ignite).query(null, "PUBLIC", sql, params);
-
- try (QueryCursor<List<?>> srvCursor = cur.get(0)) {
- return srvCursor.getAll();
- }
- }
-
- /** */
- private CalciteQueryProcessor queryProcessor(IgniteEx ignite) {
- return Commons.lookupComponent(ignite.context(), CalciteQueryProcessor.class);
- }
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexDdlIntegrationTest.java
index a7940a3..cc89fc8 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexDdlIntegrationTest.java
@@ -37,10 +37,8 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
private static final String CACHE_NAME = "my_cache";
/** {@inheritDoc} */
- @Override public void init() {
- super.init();
-
- executeSql("create table my_table(id int, val_int int, val_str varchar) with cache_name=\"" + CACHE_NAME + "\"");
+ @Override protected void beforeTest() throws Exception {
+ sql("create table my_table(id int, val_int int, val_str varchar) with cache_name=\"" + CACHE_NAME + "\"");
}
/**
@@ -50,11 +48,11 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void createDropIndexSimpleCase() {
assertNull(findIndex(CACHE_NAME, "my_index"));
- executeSql("create index my_index on my_table(id)");
+ sql("create index my_index on my_table(id)");
assertNotNull(findIndex(CACHE_NAME, "my_index"));
- executeSql("drop index my_index");
+ sql("drop index my_index");
assertNull(findIndex(CACHE_NAME, "my_index"));
}
@@ -66,20 +64,20 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void createDropIndexWithSchema() {
String cacheName = "cache2";
- executeSql("create table my_schema.my_table2(id int) with cache_name=\"" + cacheName + "\"");
+ sql("create table my_schema.my_table2(id int) with cache_name=\"" + cacheName + "\"");
assertNull(findIndex(cacheName, "my_index2"));
- executeSql("create index my_index2 on my_schema.my_table2(id)");
+ sql("create index my_index2 on my_schema.my_table2(id)");
assertNotNull(findIndex(cacheName, "my_index2"));
- GridTestUtils.assertThrowsAnyCause(log, () -> executeSql("drop index my_index2"), IgniteSQLException.class,
+ GridTestUtils.assertThrowsAnyCause(log, () -> sql("drop index my_index2"), IgniteSQLException.class,
"Index doesn't exist");
assertNotNull(findIndex(cacheName, "my_index2"));
- executeSql("drop index my_schema.my_index2");
+ sql("drop index my_schema.my_index2");
assertNull(findIndex(cacheName, "my_index2"));
}
@@ -91,14 +89,14 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void createIndexWithIfNotExistsClause() {
assertNull(findIndex(CACHE_NAME, "my_index"));
- executeSql("create index if not exists my_index on my_table(id)");
+ sql("create index if not exists my_index on my_table(id)");
- GridTestUtils.assertThrowsAnyCause(log, () -> executeSql("create index my_index on my_table(val_int)"),
+ GridTestUtils.assertThrowsAnyCause(log, () -> sql("create index my_index on my_table(val_int)"),
IgniteSQLException.class, "Index already exists");
assertNotNull(findIndex(CACHE_NAME, "my_index"));
- executeSql("create index if not exists my_index on my_table(val_str)");
+ sql("create index if not exists my_index on my_table(val_str)");
Index idx = findIndex(CACHE_NAME, "my_index");
@@ -116,17 +114,17 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void dropIndexWithIfExistsClause() {
assertNull(findIndex(CACHE_NAME, "my_index"));
- executeSql("create index my_index on my_table(id)");
+ sql("create index my_index on my_table(id)");
assertNotNull(findIndex(CACHE_NAME, "my_index"));
- executeSql("drop index if exists my_index");
+ sql("drop index if exists my_index");
assertNull(findIndex(CACHE_NAME, "my_index"));
- executeSql("drop index if exists my_index");
+ sql("drop index if exists my_index");
- GridTestUtils.assertThrowsAnyCause(log, () -> executeSql("drop index my_index"), IgniteSQLException.class,
+ GridTestUtils.assertThrowsAnyCause(log, () -> sql("drop index my_index"), IgniteSQLException.class,
"Index doesn't exist");
}
@@ -137,7 +135,7 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void createIndexWithColumnsOrdering() {
assertNull(findIndex(CACHE_NAME, "my_index"));
- executeSql("create index my_index on my_table(id, val_int asc, val_str desc)");
+ sql("create index my_index on my_table(id, val_int asc, val_str desc)");
Index idx = findIndex(CACHE_NAME, "my_index");
@@ -161,7 +159,7 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void createIndexWithInlineSize() {
assertNull(findIndex(CACHE_NAME, "my_index"));
- executeSql("create index my_index on my_table(val_str) inline_size 10");
+ sql("create index my_index on my_table(val_str) inline_size 10");
Index idx = findIndex(CACHE_NAME, "my_index");
@@ -180,7 +178,7 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void createIndexWithParallel() {
assertNull(findIndex(CACHE_NAME, "my_index"));
- executeSql("create index my_index on my_table(val_str) parallel 10");
+ sql("create index my_index on my_table(val_str) parallel 10");
assertNotNull(findIndex(CACHE_NAME, "my_index"));
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
index 120df5a..cbd9dc1 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
@@ -82,6 +82,11 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
cache.put(i, i);
}
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ // No-op.
+ }
+
/** */
@Override public void cleanUp() {
// No-op.
@@ -105,7 +110,7 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
long qryId = qryView.queryId();
UUID originNodeId = qryView.originNodeId();
- executeSql(client, "KILL SCAN '" + originNodeId + "' '" + DEFAULT_CACHE_NAME + "' " + qryId);
+ sql(client, "KILL SCAN '" + originNodeId + "' '" + DEFAULT_CACHE_NAME + "' " + qryId);
// Fetch all cached entries.
for (int i = 0; i < PAGE_SZ * servers().size() - 1; i++)
@@ -148,7 +153,7 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
assertTrue(res);
- executeSql(client, "KILL COMPUTE '" + jobViewHolder.get().id() + "'");
+ sql(client, "KILL COMPUTE '" + jobViewHolder.get().id() + "'");
assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
}
@@ -167,7 +172,7 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
try (Transaction tx = client.transactions().txStart()) {
cache.put(testKey, 1);
- executeSql(client, "KILL TRANSACTION '" + tx.xid() + "'");
+ sql(client, "KILL TRANSACTION '" + tx.xid() + "'");
assertThrowsWithCause(tx::commit, IgniteException.class);
}
@@ -191,7 +196,7 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
TestService svc = client.services().serviceProxy(serviceName, TestService.class, true);
assertNotNull(svc);
- executeSql(client, "KILL SERVICE '" + serviceName + "'");
+ sql(client, "KILL SERVICE '" + serviceName + "'");
boolean res = waitForCondition(() -> grid(0).context().systemView().view(SVCS_VIEW).size() == 0, TIMEOUT);
assertTrue(res);
@@ -232,7 +237,7 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
UUID nodeId = cqView.nodeId();
UUID routineId = cqView.routineId();
- executeSql(client, "KILL CONTINUOUS '" + nodeId + "' '" + routineId + "'");
+ sql(client, "KILL CONTINUOUS '" + nodeId + "' '" + routineId + "'");
long cnt = cntr.get();
@@ -247,31 +252,31 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
/** */
@Test
public void testCancelUnknownScanQuery() {
- executeSql(client, "KILL SCAN '" + client.localNode().id() + "' 'unknown' 1");
+ sql(client, "KILL SCAN '" + client.localNode().id() + "' 'unknown' 1");
}
/** */
@Test
public void testCancelUnknownComputeTask() {
- executeSql(client, "KILL COMPUTE '" + IgniteUuid.randomUuid() + "'");
+ sql(client, "KILL COMPUTE '" + IgniteUuid.randomUuid() + "'");
}
/** */
@Test
public void testCancelUnknownService() {
- executeSql(client, "KILL SERVICE 'unknown'");
+ sql(client, "KILL SERVICE 'unknown'");
}
/** */
@Test
public void testCancelUnknownTx() {
- executeSql(client, "KILL TRANSACTION '" + IgniteUuid.randomUuid() + "'");
+ sql(client, "KILL TRANSACTION '" + IgniteUuid.randomUuid() + "'");
}
/** */
@Test
public void testCancelUnknownContinuousQuery() {
- executeSql(client, "KILL CONTINUOUS '" + grid(0).localNode().id() + "' '" + UUID.randomUUID() + "'");
+ sql(client, "KILL CONTINUOUS '" + grid(0).localNode().id() + "' '" + UUID.randomUUID() + "'");
}
/** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
new file mode 100644
index 0000000..380a1c2
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ *
+ */
+public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest {
+ /** */
+ private static IgniteEx srv;
+
+ /** Timeout in ms for async operations. */
+ private static final long TIMEOUT_IN_MS = 10_000;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ srv = grid(0);
+ }
+
+ /**
+ * Execute a query with a lot of JOINs to produce a very long planning phase.
+ * Cancel the query during the planning phase and check that the query registry is empty on all nodes of the cluster.
+ */
+ @Test
+ public void testCancelAtPlanningPhase() throws IgniteCheckedException {
+ QueryEngine engine = queryProcessor(client);
+ int cnt = 9;
+
+ for (int i = 0; i < cnt; i++)
+ sql("CREATE TABLE test_tbl" + i + " (id int, val varchar)");
+
+ String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "test_tbl" + i + " p" + i).collect(joining(", "));
+ String sql = "SELECT * FROM " + bigJoin;
+
+ IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> !engine.runningQueries().isEmpty() || fut.isDone(), TIMEOUT_IN_MS));
+
+ Collection<? extends RunningQuery> running = engine.runningQueries();
+
+ assertEquals("Running: " + running, 1, running.size());
+
+ RunningQuery qry = F.first(running);
+
+ assertSame(qry, engine.runningQuery(qry.id()));
+
+ // Waits for planning.
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> qry.state() == QueryState.PLANNING, TIMEOUT_IN_MS));
+
+ qry.cancel();
+
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> engine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(0), IgniteSQLException.class, "The query was cancelled while planning");
+ }
+
+ /**
+ * Execute a query with a lot of JOINs to produce a very long execution phase.
+ * Cancel the query during the execution phase and check that the query registry is empty on all nodes of the cluster.
+ */
+ @Test
+ public void testCancelAtExecutionPhase() throws IgniteCheckedException {
+ QueryEngine cliEngine = queryProcessor(client);
+ QueryEngine srvEngine = queryProcessor(srv);
+ int cnt = 6;
+
+ sql("CREATE TABLE person (id int, val varchar)");
+
+ String data = IntStream.range(0, 1000).mapToObj((i) -> "(" + i + "," + i + ")").collect(joining(", "));
+ String insertSql = "INSERT INTO person (id, val) VALUES " + data;
+
+ sql(insertSql);
+
+ String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "person p" + i).collect(joining(", "));
+ String sql = "SELECT * FROM " + bigJoin;
+
+ IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+ // The query is executing on client.
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> {
+ Collection<? extends RunningQuery> queries = cliEngine.runningQueries();
+
+ return !queries.isEmpty() && F.first(queries).state() == QueryState.EXECUTING;
+ },
+ TIMEOUT_IN_MS));
+
+ // The query is executing on server.
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> {
+ Collection<? extends RunningQuery> queries = srvEngine.runningQueries();
+
+ return !queries.isEmpty() && F.first(queries).state() == QueryState.EXECUTING;
+ },
+ TIMEOUT_IN_MS));
+
+ Collection<? extends RunningQuery> running = cliEngine.runningQueries();
+
+ assertEquals(1, running.size());
+
+ RunningQuery qry = F.first(running);
+
+ assertSame(qry, cliEngine.runningQuery(qry.id()));
+
+ qry.cancel();
+
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> cliEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> srvEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100), IgniteSQLException.class, "The query was cancelled while executing.");
+ }
+
+ /**
+ * Execute a query with a lot of JOINs to produce a very long execution phase.
+ * Cancel the query during the execution phase on a remote node (not the query originator node)
+ * and check that the query registry is empty on all nodes of the cluster.
+ */
+ @Test
+ public void testCancelByRemoteFragment() throws IgniteCheckedException {
+ QueryEngine clientEngine = queryProcessor(client);
+ QueryEngine serverEngine = queryProcessor(srv);
+ int cnt = 6;
+
+ sql("CREATE TABLE t (id int, val varchar)");
+
+ String data = IntStream.range(0, 10000).mapToObj((i) -> "(" + i + ",'" + i + "')").collect(joining(", "));
+ String insertSql = "INSERT INTO t (id, val) VALUES " + data;
+
+ sql(insertSql);
+
+ String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "t t" + i).collect(joining(", "));
+ String sql = "SELECT * FROM " + bigJoin;
+
+ IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> {
+ Collection<? extends RunningQuery> queries = clientEngine.runningQueries();
+
+ return !queries.isEmpty() && F.first(queries).state() == QueryState.EXECUTING;
+ },
+ TIMEOUT_IN_MS));
+
+ Assert.assertTrue(GridTestUtils.waitForCondition(() -> !serverEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+ Collection<? extends RunningQuery> running = serverEngine.runningQueries();
+ RunningQuery qry = F.first(running);
+
+ assertSame(qry, serverEngine.runningQuery(qry.id()));
+
+ qry.cancel();
+
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> clientEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> serverEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100), IgniteSQLException.class, "The query was cancelled while executing.");
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java
index f56a3d1..0a85fae 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.integration;
import java.util.List;
+
import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
index b7c02c4..57ac900 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
@@ -60,7 +60,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void createTableSimpleCase() {
Set<String> cachesBefore = new HashSet<>(client.cacheNames());
- executeSql("create table my_table (id int primary key, val varchar)");
+ sql("create table my_table (id int primary key, val varchar)");
Set<String> newCaches = new HashSet<>(client.cacheNames());
newCaches.removeAll(cachesBefore);
@@ -91,7 +91,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void createTableDifferentDataTypes() {
- executeSql("create table my_table (" +
+ sql("create table my_table (" +
"id int primary key, " +
"c1 varchar, " +
"c2 date, " +
@@ -107,7 +107,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
"c12 decimal(20, 10) " +
")");
- executeSql("insert into my_table values (" +
+ sql("insert into my_table values (" +
"0, " +
"'test', " +
"date '2021-01-01', " +
@@ -123,7 +123,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
"1234567890.1234567890" +
")");
- List<List<?>> res = executeSql("select * from my_table");
+ List<List<?>> res = sql("select * from my_table");
assertEquals(1, res.size());
@@ -152,7 +152,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void createTableWithOptions() {
Set<String> cachesBefore = new HashSet<>(client.cacheNames());
- executeSql("create table my_table (id1 int, id2 int, val varchar, primary key(id1, id2)) with " +
+ sql("create table my_table (id1 int, id2 int, val varchar, primary key(id1, id2)) with " +
" backups=2," +
" affinity_key=id2," +
" atomicity=transactional," +
@@ -199,8 +199,8 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void createTableWithTemplate() {
Set<String> cachesBefore = new HashSet<>(client.cacheNames());
- executeSql("create table my_table_replicated (id int, val varchar) with template=replicated, cache_name=repl");
- executeSql("create table my_table_partitioned (id int, val varchar) with template=partitioned, cache_name=part");
+ sql("create table my_table_replicated (id int, val varchar) with template=replicated, cache_name=repl");
+ sql("create table my_table_partitioned (id int, val varchar) with template=partitioned, cache_name=part");
Set<String> newCaches = new HashSet<>(client.cacheNames());
newCaches.removeAll(cachesBefore);
@@ -224,13 +224,13 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
@Test
@SuppressWarnings("ThrowableNotThrown")
public void createTableIfNotExists() {
- executeSql("create table my_table (id int, val varchar)");
+ sql("create table my_table (id int, val varchar)");
GridTestUtils.assertThrows(log,
- () -> executeSql("create table my_table (id int, val varchar)"),
+ () -> sql("create table my_table (id int, val varchar)"),
IgniteSQLException.class, "Table already exists: MY_TABLE");
- executeSql("create table if not exists my_table (id int, val varchar)");
+ sql("create table if not exists my_table (id int, val varchar)");
}
/**
@@ -238,12 +238,12 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void createTableWithoutPk() {
- executeSql("create table my_table (f1 int, f2 varchar)");
+ sql("create table my_table (f1 int, f2 varchar)");
- executeSql("insert into my_table(f1, f2) values (1, '1'),(2, '2')");
- executeSql("insert into my_table values (1, '1'),(2, '2')");
+ sql("insert into my_table(f1, f2) values (1, '1'),(2, '2')");
+ sql("insert into my_table values (1, '1'),(2, '2')");
- assertThat(executeSql("select * from my_table"), hasSize(4));
+ assertThat(sql("select * from my_table"), hasSize(4));
}
/**
@@ -251,12 +251,12 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void createTableCustomSchema() {
- executeSql("create table my_schema.my_table (f1 int, f2 varchar)");
+ sql("create table my_schema.my_table (f1 int, f2 varchar)");
- executeSql("insert into my_schema.my_table(f1, f2) values (1, '1'),(2, '2')");
- executeSql("insert into my_schema.my_table values (1, '1'),(2, '2')");
+ sql("insert into my_schema.my_table(f1, f2) values (1, '1'),(2, '2')");
+ sql("insert into my_schema.my_table values (1, '1'),(2, '2')");
- assertThat(executeSql("select * from my_schema.my_table"), hasSize(4));
+ assertThat(sql("select * from my_schema.my_table"), hasSize(4));
}
/**
@@ -266,11 +266,11 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void createTableOnExistingCache() {
IgniteCache<Object, Object> cache = client.getOrCreateCache("my_cache");
- executeSql("create table my_schema.my_table (f1 int, f2 varchar) with cache_name=\"my_cache\"");
+ sql("create table my_schema.my_table (f1 int, f2 varchar) with cache_name=\"my_cache\"");
- executeSql("insert into my_schema.my_table(f1, f2) values (1, '1'),(2, '2')");
+ sql("insert into my_schema.my_table(f1, f2) values (1, '1'),(2, '2')");
- assertThat(executeSql("select * from my_schema.my_table"), hasSize(2));
+ assertThat(sql("select * from my_schema.my_table"), hasSize(2));
assertEquals(2, cache.size(CachePeekMode.PRIMARY));
}
@@ -280,8 +280,8 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void createTableAsSelectSimpleCase() {
- executeSql("create table my_table as select 1 as i, 'test' as s");
- List<List<?>> res = executeSql("select i, s from my_table");
+ sql("create table my_table as select 1 as i, 'test' as s");
+ List<List<?>> res = sql("select i, s from my_table");
assertEquals(1, res.size());
@@ -296,8 +296,8 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void createTableAsSelectWithColumns() {
- executeSql("create table my_table(i, s) as select 1 as a, 'test' as b");
- List<List<?>> res = executeSql("select i, s from my_table");
+ sql("create table my_table(i, s) as select 1 as a, 'test' as b");
+ List<List<?>> res = sql("select i, s from my_table");
assertEquals(1, res.size());
@@ -315,8 +315,8 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
@SuppressWarnings("unchecked")
@Test
public void createTableAsSelectWithOptions() {
- executeSql("create table my_table(s, i) with cache_name=\"CacheWithOpts\", cache_group=\"CacheGroup\" as select '1', 1");
- List<List<?>> res = executeSql("select * from my_table");
+ sql("create table my_table(s, i) with cache_name=\"CacheWithOpts\", cache_group=\"CacheGroup\" as select '1', 1");
+ List<List<?>> res = sql("select * from my_table");
assertEquals(1, res.size());
@@ -335,8 +335,8 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void createTableAsSelectWithParameters() {
- executeSql("create table my_table(s, i) as select cast(? as varchar), cast(? as int)", "a", 1);
- List<List<?>> res = executeSql("select * from my_table");
+ sql("create table my_table(s, i) as select cast(? as varchar), cast(? as int)", "a", 1);
+ List<List<?>> res = sql("select * from my_table");
assertEquals(1, res.size());
@@ -350,7 +350,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
@Test
public void createTableAsSelectWrongColumnsCount() {
GridTestUtils.assertThrowsAnyCause(log,
- () -> executeSql("create table my_table(i, s1, s2) as select 1, 'test'"),
+ () -> sql("create table my_table(i, s1, s2) as select 1, 'test'"),
IgniteSQLException.class, "Number of columns");
}
@@ -359,13 +359,13 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void createTableAsSelectFromDistributedTable() {
- executeSql("create table my_table1(i) as select x from table(system_range(1, 100))");
+ sql("create table my_table1(i) as select x from table(system_range(1, 100))");
- assertEquals(100L, executeSql("select count(*) from my_table1").get(0).get(0));
+ assertEquals(100L, sql("select count(*) from my_table1").get(0).get(0));
- executeSql("create table my_table2(i) as select * from my_table1");
+ sql("create table my_table2(i) as select * from my_table1");
- assertEquals(100L, executeSql("select count(*) from my_table2").get(0).get(0));
+ assertEquals(100L, sql("select count(*) from my_table2").get(0).get(0));
}
/**
@@ -375,7 +375,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void dropTableDefaultSchema() {
Set<String> cachesBefore = new HashSet<>(client.cacheNames());
- executeSql("create table my_table (id int primary key, val varchar)");
+ sql("create table my_table (id int primary key, val varchar)");
Set<String> cachesAfter = new HashSet<>(client.cacheNames());
cachesAfter.removeAll(cachesBefore);
@@ -384,7 +384,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
String createdCacheName = cachesAfter.iterator().next();
- executeSql("drop table my_table");
+ sql("drop table my_table");
cachesAfter = new HashSet<>(client.cacheNames());
cachesAfter.removeAll(cachesBefore);
@@ -401,7 +401,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void dropTableCustomSchema() {
Set<String> cachesBefore = new HashSet<>(client.cacheNames());
- executeSql("create table my_schema.my_table (id int primary key, val varchar)");
+ sql("create table my_schema.my_table (id int primary key, val varchar)");
Set<String> cachesAfter = new HashSet<>(client.cacheNames());
cachesAfter.removeAll(cachesBefore);
@@ -410,7 +410,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
String createdCacheName = cachesAfter.iterator().next();
- executeSql("drop table my_schema.my_table");
+ sql("drop table my_schema.my_table");
cachesAfter = new HashSet<>(client.cacheNames());
cachesAfter.removeAll(cachesBefore);
@@ -427,19 +427,19 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
@Test
@SuppressWarnings("ThrowableNotThrown")
public void dropTableIfExists() {
- executeSql("create table my_schema.my_table (id int primary key, val varchar)");
+ sql("create table my_schema.my_table (id int primary key, val varchar)");
GridTestUtils.assertThrows(log,
- () -> executeSql("drop table my_table"),
+ () -> sql("drop table my_table"),
IgniteSQLException.class, "Table doesn't exist: MY_TABLE");
- executeSql("drop table my_schema.my_table");
+ sql("drop table my_schema.my_table");
GridTestUtils.assertThrows(log,
- () -> executeSql("drop table my_schema.my_table"),
+ () -> sql("drop table my_schema.my_table"),
IgniteSQLException.class, "Table doesn't exist: MY_TABLE");
- executeSql("drop table if exists my_schema.my_table");
+ sql("drop table if exists my_schema.my_table");
}
/**
@@ -447,20 +447,20 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void alterTableAddDropSimpleCase() {
- executeSql("create table my_table (id int primary key, val varchar)");
+ sql("create table my_table (id int primary key, val varchar)");
- executeSql("alter table my_table add column val2 varchar");
+ sql("alter table my_table add column val2 varchar");
- executeSql("insert into my_table (id, val, val2) values (0, '1', '2')");
+ sql("insert into my_table (id, val, val2) values (0, '1', '2')");
- List<List<?>> res = executeSql("select * from my_table ");
+ List<List<?>> res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals("2", res.get(0).get(2));
- executeSql("alter table my_table drop column val2");
+ sql("alter table my_table drop column val2");
- res = executeSql("select * from my_table ");
+ res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(2, res.get(0).size());
@@ -471,21 +471,21 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void alterTableAddDropTwoColumns() {
- executeSql("create table my_table (id int primary key, val varchar)");
+ sql("create table my_table (id int primary key, val varchar)");
- executeSql("alter table my_table add column (val2 varchar, val3 int)");
+ sql("alter table my_table add column (val2 varchar, val3 int)");
- executeSql("insert into my_table (id, val, val2, val3) values (0, '1', '2', 3)");
+ sql("insert into my_table (id, val, val2, val3) values (0, '1', '2', 3)");
- List<List<?>> res = executeSql("select * from my_table ");
+ List<List<?>> res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals("2", res.get(0).get(2));
assertEquals(3, res.get(0).get(3));
- executeSql("alter table my_table drop column (val2, val3)");
+ sql("alter table my_table drop column (val2, val3)");
- res = executeSql("select * from my_table ");
+ res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(2, res.get(0).size());
@@ -496,20 +496,20 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void alterTableAddDropCustomSchema() {
- executeSql("create table my_schema.my_table (id int primary key, val varchar)");
+ sql("create table my_schema.my_table (id int primary key, val varchar)");
- executeSql("alter table my_schema.my_table add column val2 varchar");
+ sql("alter table my_schema.my_table add column val2 varchar");
- executeSql("insert into my_schema.my_table (id, val, val2) values (0, '1', '2')");
+ sql("insert into my_schema.my_table (id, val, val2) values (0, '1', '2')");
- List<List<?>> res = executeSql("select * from my_schema.my_table ");
+ List<List<?>> res = sql("select * from my_schema.my_table ");
assertEquals(1, res.size());
assertEquals("2", res.get(0).get(2));
- executeSql("alter table my_schema.my_table drop column val2");
+ sql("alter table my_schema.my_table drop column val2");
- res = executeSql("select * from my_schema.my_table ");
+ res = sql("select * from my_schema.my_table ");
assertEquals(1, res.size());
assertEquals(2, res.get(0).size());
@@ -522,29 +522,29 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
public void alterTableAddDropIfTableExists() {
assertThrows("alter table my_table add val2 varchar", IgniteSQLException.class, "Table doesn't exist");
- executeSql("alter table if exists my_table add column val2 varchar");
+ sql("alter table if exists my_table add column val2 varchar");
assertThrows("alter table my_table drop column val2", IgniteSQLException.class, "Table doesn't exist");
- executeSql("alter table if exists my_table drop column val2");
+ sql("alter table if exists my_table drop column val2");
- executeSql("create table my_table (id int primary key, val varchar)");
+ sql("create table my_table (id int primary key, val varchar)");
- executeSql("alter table if exists my_table add column val3 varchar");
+ sql("alter table if exists my_table add column val3 varchar");
- executeSql("insert into my_table (id, val, val3) values (0, '1', '2')");
+ sql("insert into my_table (id, val, val3) values (0, '1', '2')");
- List<List<?>> res = executeSql("select * from my_table ");
+ List<List<?>> res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals("2", res.get(0).get(2));
- executeSql("alter table if exists my_table drop column val3");
+ sql("alter table if exists my_table drop column val3");
assertThrows("alter table if exists my_table drop column val3", IgniteSQLException.class,
"Column doesn't exist");
- res = executeSql("select * from my_table ");
+ res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(2, res.get(0).size());
@@ -555,50 +555,50 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void alterTableAddDropIfColumnExists() {
- executeSql("create table my_table (id int primary key, val varchar)");
+ sql("create table my_table (id int primary key, val varchar)");
- executeSql("insert into my_table (id, val) values (0, '1')");
+ sql("insert into my_table (id, val) values (0, '1')");
assertThrows("alter table my_table add column val varchar", IgniteSQLException.class,
"Column already exist");
- executeSql("alter table my_table add column if not exists val varchar");
+ sql("alter table my_table add column if not exists val varchar");
- List<List<?>> res = executeSql("select * from my_table ");
+ List<List<?>> res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(2, res.get(0).size());
assertThrows("alter table my_table drop column val2", IgniteSQLException.class,
"Column doesn't exist");
- executeSql("alter table my_table drop column if exists val2");
+ sql("alter table my_table drop column if exists val2");
- res = executeSql("select * from my_table ");
+ res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(2, res.get(0).size());
- executeSql("alter table my_table add column if not exists val3 varchar");
+ sql("alter table my_table add column if not exists val3 varchar");
- res = executeSql("select * from my_table ");
+ res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(3, res.get(0).size());
- executeSql("alter table my_table drop column if exists val3");
+ sql("alter table my_table drop column if exists val3");
- res = executeSql("select * from my_table ");
+ res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(2, res.get(0).size());
// Mixing existsing and not existsing columns
- executeSql("alter table my_table add column if not exists (val varchar, val4 varchar)");
+ sql("alter table my_table add column if not exists (val varchar, val4 varchar)");
- res = executeSql("select * from my_table ");
+ res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(3, res.get(0).size());
- executeSql("alter table my_table drop column if exists (val4, val5)");
+ sql("alter table my_table drop column if exists (val4, val5)");
- res = executeSql("select * from my_table ");
+ res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(2, res.get(0).size());
}
@@ -608,16 +608,16 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void alterTableAddNotNullColumn() {
- executeSql("create table my_table (id int primary key, val varchar)");
+ sql("create table my_table (id int primary key, val varchar)");
- executeSql("alter table my_table add column val2 varchar not null");
+ sql("alter table my_table add column val2 varchar not null");
assertThrows("insert into my_table (id, val, val2) values (0, '1', null)", IgniteSQLException.class,
"Null value is not allowed");
- executeSql("insert into my_table (id, val, val2) values (0, '1', '2')");
+ sql("insert into my_table (id, val, val2) values (0, '1', '2')");
- List<List<?>> res = executeSql("select * from my_table ");
+ List<List<?>> res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals("2", res.get(0).get(2));
@@ -628,11 +628,11 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void alterTableDropForbiddenColumn() {
- executeSql("create table my_table (id int primary key, val varchar, val2 varchar)");
+ sql("create table my_table (id int primary key, val varchar, val2 varchar)");
- executeSql("create index my_index on my_table(val)");
+ sql("create index my_index on my_table(val)");
- executeSql("insert into my_table (id, val, val2) values (0, '1', '2')");
+ sql("insert into my_table (id, val, val2) values (0, '1', '2')");
assertThrows("alter table my_table drop column id", IgniteSQLException.class,
"Cannot drop column");
@@ -640,14 +640,14 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
assertThrows("alter table my_table drop column val", IgniteSQLException.class,
"Cannot drop column");
- List<List<?>> res = executeSql("select * from my_table ");
+ List<List<?>> res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(3, res.get(0).size());
- executeSql("alter table my_table drop column val2");
+ sql("alter table my_table drop column val2");
- res = executeSql("select * from my_table ");
+ res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(2, res.get(0).size());
@@ -658,39 +658,39 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void alterTableServerAndClient() throws Exception {
- executeSql(grid(0), "create table my_table (id int primary key, val varchar)");
+ sql(grid(0), "create table my_table (id int primary key, val varchar)");
- executeSql(grid(0), "alter table my_table add column val2 varchar");
+ sql(grid(0), "alter table my_table add column val2 varchar");
- executeSql(grid(0), "insert into my_table (id, val, val2) values (0, '1', '2')");
+ sql(grid(0), "insert into my_table (id, val, val2) values (0, '1', '2')");
- List<List<?>> res = executeSql(grid(0), "select * from my_table ");
+ List<List<?>> res = sql(grid(0), "select * from my_table ");
assertEquals(1, res.size());
assertEquals(3, res.get(0).size());
- executeSql(grid(0), "drop table my_table");
+ sql(grid(0), "drop table my_table");
awaitPartitionMapExchange();
- executeSql("create table my_table (id int primary key, val varchar)");
+ sql("create table my_table (id int primary key, val varchar)");
- executeSql("alter table my_table add column val2 varchar");
+ sql("alter table my_table add column val2 varchar");
- executeSql("insert into my_table (id, val, val2) values (0, '1', '2')");
+ sql("insert into my_table (id, val, val2) values (0, '1', '2')");
- res = executeSql("select * from my_table ");
+ res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(3, res.get(0).size());
awaitPartitionMapExchange();
- executeSql(grid(0), "alter table my_table drop column val2");
+ sql(grid(0), "alter table my_table drop column val2");
awaitPartitionMapExchange();
- res = executeSql("select * from my_table ");
+ res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(2, res.get(0).size());
@@ -701,20 +701,20 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
*/
@Test
public void alterTableDropAddColumn() {
- executeSql("create table my_table (id int primary key, val varchar, val2 varchar)");
+ sql("create table my_table (id int primary key, val varchar, val2 varchar)");
- executeSql("insert into my_table (id, val, val2) values (0, '1', '2')");
+ sql("insert into my_table (id, val, val2) values (0, '1', '2')");
- executeSql("alter table my_table drop column val2");
+ sql("alter table my_table drop column val2");
- List<List<?>> res = executeSql("select * from my_table ");
+ List<List<?>> res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(2, res.get(0).size());
- executeSql("alter table my_table add column val2 varchar not null");
+ sql("alter table my_table add column val2 varchar not null");
- res = executeSql("select * from my_table ");
+ res = sql("select * from my_table ");
assertEquals(1, res.size());
assertEquals(3, res.get(0).size());
// The command DROP COLUMN does not remove actual data from the cluster, it's a known and documented limitation.
@@ -723,9 +723,9 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
assertThrows("insert into my_table (id, val, val2) values (1, '2', null)", IgniteSQLException.class,
"Null value is not allowed");
- executeSql("insert into my_table (id, val, val2) values (1, '2', '3')");
+ sql("insert into my_table (id, val, val2) values (1, '2', '3')");
- assertEquals(2, executeSql("select * from my_table").size());
+ assertEquals(2, sql("select * from my_table").size());
}
/**
@@ -740,11 +740,11 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
assertTrue(client.cluster().isWalEnabled(cacheName));
- executeSql("alter table \"" + cacheName + "\".Integer nologging");
+ sql("alter table \"" + cacheName + "\".Integer nologging");
assertFalse(client.cluster().isWalEnabled(cacheName));
- executeSql("alter table \"" + cacheName + "\".Integer logging");
+ sql("alter table \"" + cacheName + "\".Integer logging");
assertTrue(client.cluster().isWalEnabled(cacheName));
}
@@ -760,9 +760,9 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
"INSERT INTO test(val0, val1, val2) VALUES(1, 'test1', 10);" +
"ALTER TABLE test DROP COLUMN val2;";
- executeSql(multiLineQuery);
+ sql(multiLineQuery);
- List<List<?>> res = executeSql("SELECT * FROM test order by val0");
+ List<List<?>> res = sql("SELECT * FROM test order by val0");
assertEquals(2, res.size());
for (int i = 0; i < res.size(); i++) {
@@ -775,17 +775,6 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
}
/**
- * Asserts that executeSql throws an exception.
- *
- * @param sql Query.
- * @param cls Exception class.
- * @param msg Error message.
- */
- private void assertThrows(String sql, Class<? extends Exception> cls, String msg) {
- assertThrowsAnyCause(log, () -> executeSql(sql), cls, msg);
- }
-
- /**
* Matcher to verify that an object is of the expected type and matches the given predicate.
*
* @param desc Description for this matcher.
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDdlIntegrationTest.java
index db5a907..e4b55afb 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDdlIntegrationTest.java
@@ -75,21 +75,21 @@ public class UserDdlIntegrationTest extends AbstractDdlIntegrationTest {
for (Ignite ignite : G.allGrids()) {
IgniteEx igniteEx = (IgniteEx)ignite;
- executeSql(igniteEx, "CREATE USER test WITH PASSWORD 'test'");
+ sql(igniteEx, "CREATE USER test WITH PASSWORD 'test'");
SecurityContext secCtx = authenticate(igniteEx, "TEST", "test");
assertNotNull(secCtx);
assertEquals("TEST", secCtx.subject().login());
- executeSql(igniteEx, "ALTER USER test WITH PASSWORD 'newpasswd'");
+ sql(igniteEx, "ALTER USER test WITH PASSWORD 'newpasswd'");
secCtx = authenticate(igniteEx, "TEST", "newpasswd");
assertNotNull(secCtx);
assertEquals("TEST", secCtx.subject().login());
- executeSql(igniteEx, "DROP USER test");
+ sql(igniteEx, "DROP USER test");
GridTestUtils.assertThrowsWithCause(() -> authenticate(igniteEx, "TEST", "newpasswd"),
IgniteAccessControlException.class);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 3039a32..c3499c9 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -29,6 +29,7 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.plan.RelOptCluster;
@@ -68,7 +69,6 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.PlannerHelper;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.Cloner;
import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerHelper;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index 7f803a2..1b5779d 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -168,12 +168,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+ plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
assertNotNull(plan);
@@ -277,12 +276,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+ plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
List<Fragment> fragments = plan.fragments();
assertEquals(2, fragments.size());
@@ -503,12 +501,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+ plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
List<Fragment> fragments = plan.fragments();
assertEquals(2, fragments.size());
@@ -721,12 +718,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+ plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
assertNotNull(plan);
@@ -805,12 +801,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+ plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
assertEquals(3, plan.fragments().size());
}
@@ -886,12 +881,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+ plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
assertNotNull(plan);
@@ -969,12 +963,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+ plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
assertEquals(3, plan.fragments().size());
}
@@ -1046,12 +1039,11 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
- new Splitter().go(phys)), null);
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
assertNotNull(plan);
- plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+ plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
assertNotNull(plan);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java
index 04a97db..055eef6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java
@@ -83,4 +83,9 @@ public class GridQueryCancel {
if (canceled)
throw new QueryCancelledException();
}
+
+ /** */
+ public synchronized boolean isCanceled() {
+ return canceled;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
index 175c745..27d1ce7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.processors.query;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -38,4 +40,14 @@ public class NoOpQueryEngine extends GridProcessorAdapter implements QueryEngine
Object... params) throws IgniteSQLException {
return Collections.emptyList();
}
+
+ /** {@inheritDoc} */
+ @Override public Collection<? extends RunningQuery> runningQueries() {
+ return Collections.emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override public RunningQuery runningQuery(UUID id) {
+ return null;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
index 7c168fa..e1206af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
@@ -17,7 +17,10 @@
package org.apache.ignite.internal.processors.query;
+import java.util.Collection;
import java.util.List;
+import java.util.UUID;
+
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.internal.processors.GridProcessor;
import org.jetbrains.annotations.Nullable;
@@ -36,4 +39,10 @@ public interface QueryEngine extends GridProcessor {
*/
List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String schemaName, String qry, Object... params)
throws IgniteSQLException;
+
+ /** */
+ Collection<? extends RunningQuery> runningQueries();
+
+ /** */
+ RunningQuery runningQuery(UUID id);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryState.java
similarity index 72%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryState.java
index 311ef65..a6cd8cb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryState.java
@@ -15,17 +15,25 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.schema;
+package org.apache.ignite.internal.processors.query;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.ignite.internal.processors.query.calcite.util.Service;
+/** */
+public enum QueryState {
+ /** */
+ INITED,
-/**
- *
- */
-public interface SchemaHolder extends Service {
- /**
- * @return Schema.
- */
- SchemaPlus schema();
+ /** */
+ PLANNING,
+
+ /** */
+ MAPPING,
+
+ /** */
+ EXECUTING,
+
+ /** */
+ CLOSING,
+
+ /** */
+ CLOSED
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQuery.java
similarity index 73%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQuery.java
index 311ef65..447e925 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQuery.java
@@ -15,17 +15,20 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.schema;
+package org.apache.ignite.internal.processors.query;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import java.util.UUID;
/**
*
*/
-public interface SchemaHolder extends Service {
- /**
- * @return Schema.
- */
- SchemaPlus schema();
+public interface RunningQuery {
+ /** */
+ UUID id();
+
+ /** */
+ QueryState state();
+
+ /** */
+ void cancel();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlCommandProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlCommandProcessor.java
index f82f022..51d77bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlCommandProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlCommandProcessor.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql;
import java.util.LinkedHashMap;
import java.util.List;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteLogger;
@@ -40,7 +41,6 @@ import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQuerySchemaManager;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.SqlClientContext;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.sql.command.SqlAlterTableCommand;
import org.apache.ignite.internal.sql.command.SqlAlterUserCommand;
@@ -88,17 +88,14 @@ public class SqlCommandProcessor {
/**
* Execute command.
*
- * @param sql SQL.
* @param cmdNative Native command.
- * @param cliCtx Client context.
* @return Result.
*/
- @Nullable public FieldsQueryCursor<List<?>> runCommand(String sql, SqlCommand cmdNative,
- @Nullable SqlClientContext cliCtx) {
+ @Nullable public FieldsQueryCursor<List<?>> runCommand(SqlCommand cmdNative) {
assert cmdNative != null;
if (isDdl(cmdNative))
- runCommandNativeDdl(sql, cmdNative);
+ runCommandNativeDdl(cmdNative);
else if (cmdNative instanceof SqlKillComputeTaskCommand)
processKillComputeTaskCommand((SqlKillComputeTaskCommand) cmdNative);
else if (cmdNative instanceof SqlKillTransactionCommand)
@@ -192,10 +189,9 @@ public class SqlCommandProcessor {
/**
* Run DDL statement.
*
- * @param sql Original SQL.
* @param cmd Command.
*/
- private void runCommandNativeDdl(String sql, SqlCommand cmd) {
+ private void runCommandNativeDdl(SqlCommand cmd) {
IgniteInternalFuture<?> fut = null;
try {
@@ -306,7 +302,7 @@ public class SqlCommandProcessor {
ctx.security().dropUser(dropCmd.userName());
}
else
- throw new IgniteSQLException("Unsupported DDL operation: " + sql,
+ throw new IgniteSQLException("Unsupported DDL operation: " + cmd,
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
if (fut != null)
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
index f727505..65fd9c7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
@@ -442,7 +442,7 @@ public class CommandProcessor extends SqlCommandProcessor {
public FieldsQueryCursor<List<?>> runNativeCommand(String sql, SqlCommand cmdNative,
QueryParameters params, @Nullable SqlClientContext cliCtx, Long qryId) throws IgniteCheckedException {
if (super.isCommandSupported(cmdNative))
- return runCommand(sql, cmdNative, cliCtx);
+ return runCommand(cmdNative);
if (cmdNative instanceof SqlBulkLoadCommand)
return processBulkLoadCommand((SqlBulkLoadCommand) cmdNative, qryId);