You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2020/02/17 10:21:22 UTC
[ignite] branch ignite-12248 updated: IGNITE-12602: Calcite
integration. JDBC Thin driver support. This closes #7377
This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new b9431b7 IGNITE-12602: Calcite integration. JDBC Thin driver support. This closes #7377
b9431b7 is described below
commit b9431b7880d97256cc51c35e6e16ba7cc6aee99b
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Mon Feb 17 13:21:05 2020 +0300
IGNITE-12602: Calcite integration. JDBC Thin driver support. This closes #7377
---
modules/calcite/pom.xml | 6 +
.../query/calcite/CalciteQueryProcessor.java | 8 +-
.../calcite/exec/ClosableIteratorsHolder.java | 164 +++++++
.../query/calcite/exec/ConsumerNode.java | 10 +-
.../query/calcite/exec/ExchangeService.java | 6 +-
.../query/calcite/exec/ExchangeServiceImpl.java | 27 +-
.../query/calcite/exec/ExecutionContext.java | 9 +-
.../query/calcite/exec/ExecutionService.java | 2 +-
.../query/calcite/exec/ExecutionServiceImpl.java | 538 ++++++++++++++-------
.../processors/query/calcite/exec/Implementor.java | 4 +-
.../query/calcite/exec/MailboxRegistry.java | 11 +-
.../query/calcite/exec/MailboxRegistryImpl.java | 16 +-
.../query/calcite/exec/QueryCancelGroup.java | 92 ++++
.../processors/query/calcite/exec/ScanNode.java | 4 +-
.../query/calcite/message/MessageService.java | 12 +-
.../query/calcite/message/MessageServiceImpl.java | 76 ++-
.../query/calcite/metadata/FragmentInfo.java | 2 +-
.../calcite/metadata/IgniteMdDistribution.java | 8 +-
.../calcite/metadata/IgniteMdFragmentInfo.java | 38 +-
.../query/calcite/metadata/IgniteMetadata.java | 4 +-
.../metadata/OptimisticPlanningException.java | 2 +-
.../query/calcite/metadata/RelMetadataQueryEx.java | 2 +-
.../calcite/prepare/CalciteQueryFieldMetadata.java | 116 +++++
.../calcite/{splitter => prepare}/Cloner.java | 6 +-
.../query/calcite/{splitter => prepare}/Edge.java | 2 +-
.../calcite/{splitter => prepare}/Fragment.java | 5 +-
.../query/calcite/prepare/IgnitePlanner.java | 215 ++++----
.../query/calcite/prepare/IgnitePrograms.java | 92 ++++
.../query/calcite/prepare/MultiStepPlan.java | 49 ++
.../MultiStepPlanImpl.java} | 45 +-
.../query/calcite/prepare/PlannerPhase.java | 29 +-
.../query/calcite/prepare/PlannerType.java | 26 -
.../query/calcite/prepare/PlanningContext.java | 50 +-
.../query/calcite/prepare/QueryPlan.java | 37 ++
.../query/calcite/prepare/QueryPlanCache.java | 4 +-
.../query/calcite/prepare/QueryPlanCacheImpl.java | 25 +-
.../query/calcite/prepare/QueryPlanFactory.java | 4 +-
.../calcite/{splitter => prepare}/RelSource.java | 3 +-
.../{splitter => prepare}/RelSourceImpl.java | 2 +-
.../calcite/{splitter => prepare}/RelTarget.java | 2 +-
.../{splitter => prepare}/RelTargetImpl.java | 2 +-
.../query/calcite/prepare/RowMetadata.java | 59 +++
.../calcite/{splitter => prepare}/Splitter.java | 17 +-
.../query/calcite/prepare/ValidationResult.java | 68 +++
.../query/calcite/rel/IgniteConvention.java | 8 -
.../query/calcite/rel/IgniteReceiver.java | 2 +-
.../processors/query/calcite/rel/IgniteSender.java | 2 +-
.../query/calcite/schema/SchemaHolderImpl.java | 1 +
.../query/calcite/schema/TableDescriptorImpl.java | 127 ++++-
.../calcite/serialize/relation/ReceiverNode.java | 4 +-
.../calcite/serialize/relation/SenderNode.java | 2 +-
.../processors/query/calcite/util/Commons.java | 21 +-
.../calcite/util/ConvertingClosableIterator.java | 59 +++
.../query/calcite/util/IgniteMethod.java | 4 +-
.../query/calcite/util/ListFieldsQueryCursor.java | 75 +--
.../processors/query/calcite/util/TableScan.java | 2 +-
.../query/calcite/CalciteQueryProcessorTest.java | 14 +-
.../processors/query/calcite/PlannerTest.java | 70 +--
.../query/calcite/exec/AbstractExecutionTest.java | 8 +-
.../calcite/exec/ClosableIteratorsHolderTest.java | 101 ++++
.../processors/query/calcite/exec/OutboxTest.java | 6 +-
.../query/calcite/jdbc/JdbcQueryTest.java | 85 ++++
.../ignite/testsuites/IgniteCalciteTestSuite.java | 4 +-
.../internal/jdbc/thin/ConnectionProperties.java | 12 +
.../jdbc/thin/ConnectionPropertiesImpl.java | 17 +-
.../ignite/internal/jdbc/thin/JdbcThinTcpIo.java | 8 +-
.../internal/processors/cache/QueryCursorImpl.java | 2 +-
.../processors/cache/query/QueryCursorEx.java | 7 +
.../odbc/jdbc/JdbcConnectionContext.java | 11 +-
.../processors/odbc/jdbc/JdbcQueryCursor.java | 10 +-
.../processors/odbc/jdbc/JdbcRequestHandler.java | 49 +-
.../cache/query/PlatformContinuousQueryImpl.java | 4 +
.../internal/processors/query/QueryContext.java | 2 +-
73 files changed, 1967 insertions(+), 649 deletions(-)
diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml
index b546eca..3b30977 100644
--- a/modules/calcite/pom.xml
+++ b/modules/calcite/pom.xml
@@ -98,6 +98,12 @@
<version>${spring.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-clients</artifactId>
+ <version>2.9.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index fc87608..27a092a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -17,10 +17,10 @@
package org.apache.ignite.internal.processors.query.calcite;
-import java.util.Collections;
import java.util.List;
import org.apache.calcite.config.Lex;
import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCostImpl;
import org.apache.calcite.sql.fun.SqlLibrary;
import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory;
import org.apache.calcite.sql.parser.SqlParser;
@@ -66,6 +66,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
// Lexical configuration defines how identifiers are quoted, whether they are converted to upper or lower
// case when they are read, and whether identifiers are matched case-sensitively.
.setLex(Lex.ORACLE)
+// .setParserFactory(SqlDdlParserImpl.FACTORY) // Enables DDL support
.build())
// Dialects support.
.operatorTable(SqlLibraryOperatorTableFactory.INSTANCE
@@ -75,7 +76,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
// Context provides a way to store data within the planner session that can be accessed in planner rules.
.context(Contexts.empty())
// Custom cost factory to use during optimization
- .costFactory(null)
+ .costFactory(RelOptCostImpl.FACTORY)
.typeSystem(IgniteTypeSystem.INSTANCE)
.build();
@@ -251,8 +252,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
@Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext qryCtx, @Nullable String schemaName,
String query, Object... params) throws IgniteSQLException {
- // TODO multiline query.
- return Collections.singletonList(executionService.executeQuery(qryCtx, schemaName, query, params));
+ return executionService.executeQuery(qryCtx, schemaName, query, params);
}
/** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
new file mode 100644
index 0000000..d3910c9
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ClosableIteratorsHolder {
+ /** */
+ private final ReferenceQueue refQueue;
+
+ /** */
+ private final Map<Reference, Object> refMap;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private volatile boolean stopped;
+
+ /** */
+ private Thread cleanWorker;
+
+ /** */
+ public ClosableIteratorsHolder(IgniteLogger log) {
+ this.log = log;
+
+ refQueue = new ReferenceQueue<>();
+ refMap = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * @param src Closeable iterator.
+ * @return Weak closable iterator wrapper.
+ */
+ public <T> Iterator<T> iterator(final Iterator<T> src) {
+ cleanUp(false);
+
+ return new DelegatingIterator<>(src);
+ }
+
+ /** */
+ public void init() {
+ cleanWorker = new Thread(() -> cleanUp(true));
+ cleanWorker.setDaemon(true);
+ cleanWorker.start();
+ }
+
+ /** */
+ public void tearDown() {
+ stopped = true;
+ refMap.clear();
+ U.interrupt(cleanWorker);
+ }
+
+ /** */
+ private void cleanUp(boolean blocking) {
+ for (Reference<?> ref = nextRef(blocking); !stopped && ref != null; ref = nextRef(blocking))
+ Commons.close(refMap.remove(ref), log);
+ }
+
+ /** */
+ private Reference nextRef(boolean blocking) {
+ try {
+ return !blocking ? refQueue.poll() : refQueue.remove();
+ }
+ catch (InterruptedException ignored) {
+ return null;
+ }
+ }
+
+ /** */
+ private AutoCloseable closeable(Object referent, Object resource) {
+ if (!(resource instanceof AutoCloseable))
+ return null;
+
+ return new CloseableReference(referent, resource);
+ }
+
+ /** */
+ private final class DelegatingIterator<T> implements Iterator<T>, AutoCloseable {
+ /** */
+ private final Iterator<T> delegate;
+
+ /** */
+ private final AutoCloseable closeable;
+
+ /** */
+ private DelegatingIterator(Iterator<T> delegate) {
+ closeable = closeable(this, this.delegate = delegate);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T next() {
+ return delegate.next();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ delegate.remove();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void forEachRemaining(Consumer<? super T> action) {
+ delegate.forEachRemaining(action);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ Commons.close(closeable);
+ }
+ }
+
+ /** */
+ private final class CloseableReference extends WeakReference implements AutoCloseable {
+ /** */
+ private CloseableReference(Object referent, Object resource) {
+ super(referent, refQueue);
+
+ refMap.put(this, resource);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ try {
+ Commons.close(refMap.remove(this));
+ }
+ finally {
+ clear();
+ }
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
index a686866..19a474c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
@@ -134,8 +134,6 @@ public class ConsumerNode extends AbstractNode<Object[]> implements SingleNode<O
/** {@inheritDoc} */
@Override public void cancel() {
- context().setCancelled();
-
if (state != State.RUNNING)
return;
@@ -144,6 +142,7 @@ public class ConsumerNode extends AbstractNode<Object[]> implements SingleNode<O
if (state != State.RUNNING)
return;
+ context().setCancelled();
state = State.CANCELLED;
buff.clear();
cond.signalAll();
@@ -155,8 +154,11 @@ public class ConsumerNode extends AbstractNode<Object[]> implements SingleNode<O
context().execute(input()::cancel);
onClose.accept(this);
}
-
- public boolean canceled() {
+
+ /**
+ * @return Cancelled flag.
+ */
+ boolean canceled() {
return state == State.CANCELLED;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
index 24b5383..4f9a05e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
@@ -42,7 +42,7 @@ public interface ExchangeService extends Service {
* @param batchId Batch ID.
* @param rows Data rows.
*/
- void sendBatch(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows);
+ void sendBatch(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows);
/**
* Acknowledges a batch with given ID is processed.
@@ -54,7 +54,7 @@ public interface ExchangeService extends Service {
* @param exchangeId Exchange ID.
* @param batchId Batch ID.
*/
- void acknowledge(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
+ void acknowledge(Inbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
/**
* Sends cancel request.
@@ -66,5 +66,5 @@ public interface ExchangeService extends Service {
* @param exchangeId Exchange ID.
* @param batchId Batch ID.
*/
- void cancel(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
+ void cancel(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 7559f06..b94940b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.message.InboxCancelMessage;
@@ -95,18 +96,32 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
}
/** {@inheritDoc} */
- @Override public void sendBatch(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows) {
- messageService().send(nodeId, new QueryBatchMessage(queryId, fragmentId, exchangeId, batchId, rows));
+ @Override public void sendBatch(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows) {
+ try {
+ messageService().send(nodeId, new QueryBatchMessage(queryId, fragmentId, exchangeId, batchId, rows));
+ }
+ catch (IgniteCheckedException e) {
+ caller.cancel();
+ }
}
/** {@inheritDoc} */
- @Override public void acknowledge(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
- messageService().send(nodeId, new QueryBatchAcknowledgeMessage(queryId, fragmentId, exchangeId, batchId));
+ @Override public void acknowledge(Inbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
+ try {
+ messageService().send(nodeId, new QueryBatchAcknowledgeMessage(queryId, fragmentId, exchangeId, batchId));
+ }
+ catch (IgniteCheckedException e) {
+ caller.cancel();
+ }
}
/** {@inheritDoc} */
- @Override public void cancel(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
- messageService().send(nodeId, new InboxCancelMessage(queryId, fragmentId, exchangeId, batchId));
+ @Override public void cancel(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
+ try {
+ messageService().send(nodeId, new InboxCancelMessage(queryId, fragmentId, exchangeId, batchId));
+ }
+ catch (IgniteCheckedException ignored) {
+ }
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 0b4177a..e01f78b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -99,7 +99,7 @@ public class ExecutionContext implements DataContext {
* @return Keep binary flag.
*/
public boolean keepBinary() {
- return false; // TODO
+ return true; // TODO
}
/**
@@ -116,6 +116,13 @@ public class ExecutionContext implements DataContext {
return cancelled;
}
+ /**
+ * @return Originating node ID.
+ */
+ public UUID originatingNodeId() {
+ return parent().originatingNodeId();
+ }
+
/** {@inheritDoc} */
@Override public SchemaPlus getRootSchema() {
return ctx.schema();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
index 16afe41..59ba0a8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
@@ -37,7 +37,7 @@ public interface ExecutionService extends Service {
* @param params Query parameters.
* @return Query cursor.
*/
- FieldsQueryCursor<List<?>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params);
+ List<FieldsQueryCursor<List<?>>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params);
/**
* Cancels a running query.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index e4e1fd5..8116883 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
+import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -27,19 +29,25 @@ import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.ValidationException;
-import org.apache.calcite.util.Pair;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -49,6 +57,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryCancellable;
import org.apache.ignite.internal.processors.query.QueryContext;
@@ -58,15 +68,22 @@ import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import org.apache.ignite.internal.processors.query.calcite.message.QueryCancelRequest;
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import org.apache.ignite.internal.processors.query.calcite.metadata.PartitionService;
import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
+import org.apache.ignite.internal.processors.query.calcite.prepare.CalciteQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlanImpl;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerType;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
+import org.apache.ignite.internal.processors.query.calcite.prepare.ValidationResult;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
@@ -75,9 +92,6 @@ import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelGraph;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelToGraphConverter;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.SenderNode;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
-import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
@@ -133,6 +147,9 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
private ExchangeService exchangeService;
/** */
+ private ClosableIteratorsHolder iteratorsHolder;
+
+ /** */
private final Map<UUID, QueryInfo> running;
/**
@@ -313,86 +330,36 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
return exchangeManager;
}
- /** {@inheritDoc} */
- @Override public FieldsQueryCursor<List<?>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params) {
- UUID queryId = UUID.randomUUID();
-
- PlanningContext pctx = createContext(ctx, schema, query, params);
-
- QueryPlan plan = prepare(pctx);
-
- // Local execution
- Fragment local = F.first(plan.fragments());
-
- if (U.assertionsEnabled()) {
- assert local != null;
-
- NodesMapping mapping = local.mapping();
-
- assert mapping != null;
+ /**
+ * @param iteratorsHolder Iterators holder.
+ */
+ public void iteratorsHolder(ClosableIteratorsHolder iteratorsHolder) {
+ this.iteratorsHolder = iteratorsHolder;
+ }
- List<UUID> nodes = mapping.nodes();
+ /**
+ * @return Iterators holder.
+ */
+ public ClosableIteratorsHolder iteratorsHolder() {
+ return iteratorsHolder;
+ }
- assert nodes != null && nodes.size() == 1 && F.first(nodes).equals(pctx.localNodeId());
+ /** {@inheritDoc} */
+ @Override public List<FieldsQueryCursor<List<?>>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params) {
+ PlanningContext pctx = createContext(ctx, schema, query, params);
+ RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER));
+ try (IgnitePlanner ignored = pctx.planner()) {
+ return Commons.transform(prepare(pctx), p -> executeSingle(UUID.randomUUID(), pctx, p));
}
-
- ExecutionContext ectx = new ExecutionContext(
- taskExecutor(), pctx, queryId, local.fragmentId(), local.mapping().partitions(pctx.localNodeId()), Commons.parametersMap(params)
- );
-
- Node<Object[]> node = new Implementor(partitionService(), mailboxRegistry(), exchangeService(), failureProcessor(), ectx, log).go(local.root());
-
- assert !(node instanceof SenderNode);
-
- QueryInfo info = new QueryInfo(ectx, local.root().getRowType(), new ConsumerNode(ectx, node, this::onConsumerClose));
-
- if (plan.fragments().size() == 1)
- running.put(queryId, info);
- else {
- // remote execution
- RelOp<IgniteRel, RelGraph> converter = new RelToGraphConverter();
- List<Pair<UUID, QueryStartRequest>> requests = new ArrayList<>();
-
- for (Fragment remote : plan.fragments().subList(1, plan.fragments().size())) {
- long id = remote.fragmentId();
- NodesMapping mapping = remote.mapping();
- RelGraph graph = converter.go(remote.root());
-
- for (UUID nodeId : mapping.nodes()) {
- info.addFragment(nodeId, id);
-
- QueryStartRequest req = new QueryStartRequest(
- queryId,
- id,
- schema,
- graph,
- pctx.topologyVersion(),
- mapping.partitions(nodeId),
- params);
-
- requests.add(Pair.of(nodeId, req));
- }
- }
-
- running.put(queryId, info);
-
- // start remote execution
- for (Pair<UUID, QueryStartRequest> pair : requests)
- messageService().send(pair.left, pair.right);
+ finally {
+ RelMetadataQuery.THREAD_PROVIDERS.remove();
}
-
- // start local execution
- info.consumer.request();
-
- info.awaitAllReplies();
-
- // TODO weak map to stop query on cursor collected by GC.
- return new ListFieldsQueryCursor<>(info.type(), info.<Object[]>iterator(), Arrays::asList);
}
/** {@inheritDoc} */
@Override public void cancelQuery(UUID queryId) {
- mailboxRegistry().outboxes(queryId).forEach(Outbox::cancel);
+ mailboxRegistry().outboxes(queryId).forEach(this::executeCancel);
+ mailboxRegistry().inboxes(queryId).forEach(this::executeCancel);
QueryInfo info = running.get(queryId);
@@ -405,6 +372,7 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
localNodeId(ctx.localNodeId());
exchangeManager(ctx.cache().context().exchange());
eventManager(ctx.event());
+ iteratorsHolder(new ClosableIteratorsHolder(log));
CalciteQueryProcessor proc = Objects.requireNonNull(Commons.lookupComponent(ctx, CalciteQueryProcessor.class));
@@ -428,6 +396,8 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
messageService().register((n,m) -> onMessage(n, (QueryCancelRequest) m), MessageType.QUERY_CANCEL_REQUEST);
eventManager().addDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
+
+ iteratorsHolder().init();
}
/** {@inheritDoc} */
@@ -435,6 +405,8 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
eventManager().removeDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
running.clear();
+
+ iteratorsHolder().tearDown();
}
/** */
@@ -443,65 +415,6 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
}
/** */
- private QueryPlan prepare(PlanningContext ctx) {
- CacheKey cacheKey = new CacheKey(ctx.schema().getName(), ctx.query());
-
- QueryPlan plan = queryPlanCache().queryPlan(ctx, cacheKey, this::prepare0);
-
- plan.init(mappingService(), ctx);
-
- return plan;
- }
-
- /** */
- private QueryPlan prepare0(PlanningContext ctx) {
- IgnitePlanner planner = ctx.planner();
-
- try {
- String query = ctx.query();
-
- assert query != null;
-
- // Parse
- SqlNode sqlNode = planner.parse(query);
-
- // Validate
- sqlNode = planner.validate(sqlNode);
-
- // Convert to Relational operators graph
- RelNode rel = planner.convert(sqlNode);
-
- // Transformation chain
- rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
-
- RelTraitSet desired = rel.getCluster().traitSet()
- .replace(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.single())
- .simplify();
-
- IgniteRel igniteRel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
-
- // Split query plan to multi-step one.
- return new Splitter().go(igniteRel);
- }
- catch (SqlParseException e) {
- IgniteSQLException ex = new IgniteSQLException("Failed to parse query.", IgniteQueryErrorCode.PARSING, e);
- Commons.close(planner, ex);
- throw ex;
- }
- catch (ValidationException e) {
- IgniteSQLException ex = new IgniteSQLException("Failed to validate query.", IgniteQueryErrorCode.UNKNOWN, e);
- Commons.close(planner, ex);
- throw ex;
- }
- catch (Exception e) {
- IgniteSQLException ex = new IgniteSQLException("Failed to plan query.", IgniteQueryErrorCode.UNKNOWN, e);
- Commons.close(planner, ex);
- throw ex;
- }
- }
-
- /** */
private PlanningContext createContext(@Nullable QueryContext qryCtx, @Nullable String schemaName, String query, Object[] params) {
RelTraitDef<?>[] traitDefs = {
ConventionTraitDef.INSTANCE
@@ -511,7 +424,7 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
return PlanningContext.builder()
.localNodeId(localNodeId())
- .parentContext(Commons.convert(qryCtx)) // TODO Connection config on the basis of query context
+ .parentContext(Commons.convert(qryCtx))
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
.defaultSchema(schemaName != null
? schemaHolder().schema().getSubSchema(schemaName)
@@ -521,6 +434,7 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
.query(query)
.parameters(params)
.topologyVersion(topologyVersion())
+ .cancelGroup(cancelGroup(qryCtx))
.logger(log)
.build();
}
@@ -550,17 +464,239 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
}
/** */
- protected void onMessage(UUID nodeId, QueryStartRequest msg) {
+ private List<QueryPlan> prepare(PlanningContext ctx) {
+ return queryPlanCache().queryPlan(ctx, new CacheKey(ctx.schemaName(), ctx.query()), this::prepare0);
+ }
+
+ /** */
+ private List<QueryPlan> prepare0(PlanningContext ctx) {
+ try {
+ String query = ctx.query();
+
+ assert query != null;
+
+ // Parse query.
+ SqlNode sqlNode = ctx.planner().parse(query);
+
+ if (single(sqlNode))
+ return Collections.singletonList(prepareSingle(sqlNode, ctx));
+
+ List<SqlNode> nodes = ((SqlNodeList) sqlNode).getList();
+ List<QueryPlan> res = new ArrayList<>(nodes.size());
+
+ for (SqlNode node : nodes)
+ res.add(prepareSingle(node, ctx));
+
+ return res;
+ }
+ catch (IgniteSQLException e) {
+ throw e;
+ }
+ catch (SqlParseException e) {
+ throw new IgniteSQLException("Failed to parse query.", IgniteQueryErrorCode.PARSING, e);
+ }
+ catch (ValidationException e) {
+ throw new IgniteSQLException("Failed to validate query.", IgniteQueryErrorCode.PARSING, e);
+ }
+ catch (Exception e) {
+ throw new IgniteSQLException("Failed to plan query.", IgniteQueryErrorCode.UNKNOWN, e);
+ }
+ }
+
+ /** */
+ private QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx) throws ValidationException {
+ assert single(sqlNode);
+
+ ctx.planner().reset();
+
+ if (SqlKind.QUERY.contains(sqlNode.getKind()))
+ return prepareQuery(sqlNode, ctx);
+
+ throw new IgniteSQLException("Unsupported operation [querySql=\"" + ctx.query() + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+ }
+
+ /** */
+ private QueryPlan prepareQuery(SqlNode sqlNode, PlanningContext ctx) throws ValidationException {
+ IgnitePlanner planner = ctx.planner();
+
+ // Validate
+ ValidationResult validated = planner.validateAndGetTypeMetadata(sqlNode);
+
+ sqlNode = validated.sqlNode();
+
+ // Convert to Relational operators graph
+ RelNode rel = planner.convert(sqlNode);
+
+ // Transformation chain
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ IgniteRel igniteRel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+ // Split query plan to query fragments.
+ List<Fragment> fragments = new Splitter().go(igniteRel);
+
+ return new MultiStepPlanImpl(fragments, fieldsMetadata(validated, ctx));
+ }
+
+ /** */
+ private FieldsQueryCursor<List<?>> executeSingle(UUID queryId, PlanningContext pctx, QueryPlan plan) {
+ if (plan.type() == QueryPlan.Type.QUERY)
+ return executeQuery(queryId, (MultiStepPlan) plan, pctx);
+
+ throw new AssertionError("Unexpected plan type: " + plan);
+ }
+
+ /** */
+ private FieldsQueryCursor<List<?>> executeQuery(UUID queryId, MultiStepPlan plan, PlanningContext pctx) {
+ plan.init(mappingService(), pctx);
+
+ List<Fragment> fragments = plan.fragments();
+
+ // Local execution
+ Fragment local = F.first(fragments);
+
+ if (U.assertionsEnabled()) {
+ assert local != null;
+
+ NodesMapping mapping = local.mapping();
+
+ assert mapping != null;
+
+ List<UUID> nodes = mapping.nodes();
+
+ assert nodes != null && nodes.size() == 1 && F.first(nodes).equals(pctx.localNodeId());
+ }
+
+ ExecutionContext ectx = new ExecutionContext(taskExecutor(), pctx, queryId, local.fragmentId(),
+ local.mapping().partitions(pctx.localNodeId()), Commons.parametersMap(pctx.parameters()));
+
+ Node<Object[]> node = new Implementor(partitionService(), mailboxRegistry(), exchangeService(), failureProcessor(), ectx, log).go(local.root());
+
+ assert !(node instanceof SenderNode);
+
+ QueryInfo info = new QueryInfo(ectx, fragments, node);
+
+ // register query
+ register(info);
+
+ // start local execution
+ info.consumer.request();
+
+ // start remote execution
+ if (fragments.size() > 1) {
+ RelOp<IgniteRel, RelGraph> converter = new RelToGraphConverter();
+
+ for (int i = 1; i < fragments.size(); i++) {
+ Fragment fragment = fragments.get(i);
+
+ boolean error = false;
+
+ for (UUID nodeId : fragment.mapping().nodes()) {
+ if (error)
+ info.onResponse(nodeId, fragment.fragmentId(), new QueryCancelledException());
+ else {
+ try {
+ QueryStartRequest req = new QueryStartRequest(
+ queryId,
+ fragment.fragmentId(),
+ pctx.schemaName(),
+ converter.go(fragment.root()),
+ pctx.topologyVersion(),
+ fragment.mapping().partitions(nodeId),
+ pctx.parameters());
+
+ messageService().send(nodeId, req);
+ }
+ catch (Exception e) {
+ info.onResponse(nodeId, fragment.fragmentId(), e);
+ error = true;
+ }
+ }
+ }
+
+ if (error) {
+ info.awaitAllReplies();
+
+ throw new AssertionError(); // Previous call must throw an exception
+ }
+ }
+ }
+
+ return new ListFieldsQueryCursor<>(info.iterator(), Arrays::asList, plan.fieldsMetadata());
+ }
+
+ /** */
+ private void register(QueryInfo info) {
+ UUID queryId = info.ctx.queryId();
+ PlanningContext pctx = info.ctx.parent();
+
+ running.put(queryId, info);
+
+ if (pctx.cancelGroup() == null || pctx.cancelGroup().add(info))
+ return;
+
+ running.remove(queryId);
+
+ throw new IgniteSQLException(QueryCancelledException.ERR_MSG, IgniteQueryErrorCode.QUERY_CANCELED);
+ }
+
+ /** */
+ private QueryCancelGroup cancelGroup(@Nullable QueryContext qryCtx) {
+ GridQueryCancel cancel;
+
+ if (qryCtx == null || (cancel = qryCtx.unwrap(GridQueryCancel.class)) == null)
+ return null;
+
+ return new QueryCancelGroup(cancel, failureProcessor());
+ }
+
+ /** */
+ private List<GridQueryFieldMetadata> fieldsMetadata(ValidationResult validationResult, PlanningContext ctx) {
+ List<RelDataTypeField> fields = validationResult.dataType().getFieldList();
+ List<List<String>> origins = validationResult.origins();
+
+ assert fields.size() == origins.size();
+
+ ImmutableList.Builder<GridQueryFieldMetadata> b = ImmutableList.builder();
+
+ for (int i = 0; i < fields.size(); i++) {
+ RelDataTypeField field = fields.get(i);
+ List<String> origin = origins.get(i);
+
+ b.add(new CalciteQueryFieldMetadata(
+ F.isEmpty(origin) ? null : origin.get(0),
+ F.isEmpty(origin) ? null : origin.get(1),
+ field.getName(),
+ String.valueOf(ctx.typeFactory().getJavaClass(field.getType())),
+ field.getType().getPrecision(),
+ field.getType().getScale()
+ ));
+ }
+
+ return b.build();
+ }
+
+ /** */
+ private boolean single(SqlNode sqlNode) {
+ return !(sqlNode instanceof SqlNodeList);
+ }
+
+ /** */
+ private void onMessage(UUID nodeId, QueryStartRequest msg) {
assert nodeId != null && msg != null;
PlanningContext ctx = createContext(msg.schema(), nodeId, msg.topologyVersion());
-
+ RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER));
try (IgnitePlanner planner = ctx.planner()) {
IgniteRel root = planner.convert(msg.plan());
assert root instanceof IgniteSender : root;
- // TODO do we need a local optimisation phase here?
ExecutionContext execCtx = new ExecutionContext(
taskExecutor(),
ctx,
@@ -574,12 +710,27 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
assert node instanceof Outbox : node;
- node.request();
+ node.context().execute(node::request);
messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentId()));
}
catch (Exception ex) {
- messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentId(), ex));
+ cancelQuery(msg.queryId());
+
+ if (ex instanceof ClusterTopologyCheckedException)
+ return;
+
+ try {
+ messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentId(), ex));
+ }
+ catch (IgniteCheckedException e) {
+ e.addSuppressed(ex);
+
+ U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']', e);
+ }
+ }
+ finally {
+ RelMetadataQuery.THREAD_PROVIDERS.remove();
}
}
@@ -611,10 +762,28 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
/** */
private void onNodeLeft(UUID nodeId) {
running.forEach((uuid, queryInfo) -> queryInfo.onNodeLeft(nodeId));
+
+ final Predicate<Node<?>> p = new OriginatingFilter(nodeId);
+
+ mailboxRegistry().outboxes(null).stream()
+ .filter(p).forEach(this::executeCancel);
+
+ mailboxRegistry().inboxes(null).stream()
+ .filter(p).forEach(this::executeCancel);
}
/** */
- private static class RemoteFragmentKey {
+ private void executeCancel(Node<?> node) {
+ node.context().execute(node::cancel);
+ }
+
+ /** */
+ private enum QueryState {
+ RUNNING, CANCELLING, CANCELLED
+ }
+
+ /** */
+ private static final class RemoteFragmentKey {
/** */
private final UUID nodeId;
@@ -650,19 +819,11 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
}
/** */
- private enum QueryState {
- RUNNING, CANCELLING, CANCELLED
- }
-
- /** */
private final class QueryInfo implements QueryCancellable {
/** */
private final ExecutionContext ctx;
/** */
- private final RelDataType type;
-
- /** */
private final ConsumerNode consumer;
/** remote nodes */
@@ -678,41 +839,36 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
private Throwable error;
/** */
- private QueryInfo(ExecutionContext ctx, RelDataType type, ConsumerNode consumer) {
+ private QueryInfo(ExecutionContext ctx, List<Fragment> fragments, Node<Object[]> root) {
this.ctx = ctx;
- this.type = type;
- this.consumer = consumer;
+
+ consumer = new ConsumerNode(ctx, root, ExecutionServiceImpl.this::onConsumerClose);
remotes = new HashSet<>();
waiting = new HashSet<>();
- state = QueryState.RUNNING;
- }
+ for (int i = 1; i < fragments.size(); i++) {
+ Fragment fragment = fragments.get(i);
+ long id = fragment.fragmentId();
+ List<UUID> nodes = fragment.mapping().nodes();
- /** {@inheritDoc} */
- @Override public void doCancel() {
- cancel();
- }
+ remotes.addAll(nodes);
- /** */
- private ConsumerNode localNode() {
- return consumer;
- }
+ for (UUID node : nodes)
+ waiting.add(new RemoteFragmentKey(node, id));
+ }
- /** */
- private RelDataType type() {
- return type;
+ state = QueryState.RUNNING;
}
/** */
- private <T> Iterator<T> iterator() {
- return (Iterator<T>) consumer;
+ public Iterator<Object[]> iterator() {
+ return iteratorsHolder().iterator(consumer);
}
- /** */
- private void addFragment(UUID nodeId, long fragmentId) {
- remotes.add(nodeId);
- waiting.add(new RemoteFragmentKey(nodeId, fragmentId));
+ /** {@inheritDoc} */
+ @Override public void doCancel() {
+ cancel();
}
/** */
@@ -759,8 +915,21 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
if (cancelLocal)
consumer.cancel();
- if (cancelRemote)
- messageService().send(remotes, new QueryCancelRequest(ctx.queryId()));
+ if (cancelRemote) {
+ QueryCancelRequest msg = new QueryCancelRequest(ctx.queryId());
+
+ for (UUID remote : remotes) {
+ try {
+ messageService().send(remote, msg);
+ }
+ catch (ClusterTopologyCheckedException e) {
+ U.warn(log, e.getMessage(), e);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+ }
if (state0 == QueryState.CANCELLED)
running.remove(ctx.queryId());
@@ -822,4 +991,21 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
cancel();
}
}
+
+ /** */
+ private static class OriginatingFilter implements Predicate<Node<?>> {
+ /** */
+ private final UUID nodeId;
+
+ /** */
+ private OriginatingFilter(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean test(Node<?> node) {
+ // Uninitialized inbox doesn't know originating node ID.
+ return Objects.equals(node.context().originatingNodeId(), nodeId);
+ }
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java
index 4199f7f..de2bfb8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java
@@ -25,6 +25,8 @@ import org.apache.calcite.schema.ScannableTable;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.query.calcite.metadata.PartitionService;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
@@ -35,8 +37,6 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelTarget;
import org.apache.ignite.internal.processors.query.calcite.trait.Destination;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
index 7dd462f..b5b80f6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
@@ -17,9 +17,10 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
-import java.util.List;
+import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import org.jetbrains.annotations.Nullable;
/**
*
@@ -77,16 +78,16 @@ public interface MailboxRegistry extends Service {
/**
* Returns all registered inboxes for provided query ID.
*
- * @param queryId Query ID.
+ * @param queryId Query ID. {@code null} means return all registered inboxes.
* @return Registered inboxes.
*/
- List<Inbox<?>> inboxes(UUID queryId);
+ Collection<Inbox<?>> inboxes(@Nullable UUID queryId);
/**
* Returns all registered outboxes for provided query ID.
*
- * @param queryId Query ID.
+ * @param queryId Query ID. {@code null} means return all registered outboxes.
* @return Registered outboxes.
*/
- List<Outbox<?>> outboxes(UUID queryId);
+ Collection<Outbox<?>> outboxes(@Nullable UUID queryId);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
index cc6e0be..1846e3b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
@@ -17,13 +17,14 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
-import java.util.List;
+import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
+import org.jetbrains.annotations.Nullable;
/**
*
@@ -80,7 +81,10 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis
}
/** {@inheritDoc} */
- @Override public List<Inbox<?>> inboxes(UUID queryId) {
+ @Override public Collection<Inbox<?>> inboxes(@Nullable UUID queryId) {
+ if (queryId == null)
+ return remotes.values();
+
return remotes.entrySet().stream()
.filter(e -> e.getKey().queryId.equals(queryId))
.map(Map.Entry::getValue)
@@ -88,11 +92,15 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis
}
/** {@inheritDoc} */
- @Override public List<Outbox<?>> outboxes(UUID queryId) {
+ @Override public Collection<Outbox<?>> outboxes(@Nullable UUID queryId) {
+ if (queryId == null)
+ return locals.values();
+
return locals.entrySet().stream()
.filter(e -> e.getKey().queryId.equals(queryId))
.map(Map.Entry::getValue)
- .collect(Collectors.toList()); }
+ .collect(Collectors.toList());
+ }
/** */
private static class MailboxKey {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryCancelGroup.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryCancelGroup.java
new file mode 100644
index 0000000..6e2573a
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryCancelGroup.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryCancellable;
+
+/** */
+public final class QueryCancelGroup implements QueryCancellable {
+ /** */
+ private final FailureProcessor failureProcessor;
+
+ /** */
+ private final Set<QueryCancellable> queries;
+
+ /** */
+ private boolean cancelled;
+
+ /** */
+ public QueryCancelGroup(GridQueryCancel cancel, FailureProcessor failureProcessor) {
+ this.failureProcessor = failureProcessor;
+
+ queries = new HashSet<>();
+
+ register(cancel);
+ }
+
+ /**
+ * Adds a cancellable to the group.
+ *
+ * @param query Query cancellable object.
+ * @return {@code false} if query was cancelled before this call.
+ */
+ public synchronized boolean add(QueryCancellable query) {
+ if (cancelled)
+ return false;
+
+ boolean res = queries.add(query);
+
+ assert res;
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void doCancel() {
+ cancelled = true;
+
+ try {
+ for (QueryCancellable query : queries)
+ query.doCancel();
+ }
+ catch (Exception e) {
+ failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+ throw e;
+ }
+ }
+
+ /** */
+ private void register(GridQueryCancel cancel) {
+ try {
+ cancel.set(this);
+ }
+ catch (QueryCancelledException e) {
+ throw new IgniteSQLException(QueryCancelledException.ERR_MSG, IgniteQueryErrorCode.QUERY_CANCELED);
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
index 393d26d..5405534 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
@@ -49,7 +49,7 @@ public class ScanNode extends AbstractNode<Object[]> implements SingleNode<Objec
requestInternal();
}
catch (Exception e) {
- Commons.close(it, e);
+ Commons.closeQuiet(it, e);
throw e;
}
@@ -98,7 +98,7 @@ public class ScanNode extends AbstractNode<Object[]> implements SingleNode<Objec
/** {@inheritDoc} */
@Override public void close() {
- Commons.close(it);
+ Commons.closeQuiet(it);
it = null;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
index 02bbe46..3293365 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.processors.query.calcite.message;
-import java.util.Collection;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
/**
@@ -26,20 +26,12 @@ import org.apache.ignite.internal.processors.query.calcite.util.Service;
*/
public interface MessageService extends Service {
/**
- * Sends a message to given nodes.
- *
- * @param nodeIds Nodes IDs.
- * @param msg Message.
- */
- void send(Collection<UUID> nodeIds, CalciteMessage msg);
-
- /**
* Sends a message to given node.
*
* @param nodeId Node ID.
* @param msg Message.
*/
- void send(UUID nodeId, CalciteMessage msg);
+ void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException;
/**
* Registers a listener for messages of a given type.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
index c6e3c2b..219fd92 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query.calcite.message;
-import java.util.Collection;
import java.util.EnumMap;
import java.util.Objects;
import java.util.UUID;
@@ -27,7 +26,6 @@ import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -189,32 +187,16 @@ public class MessageServiceImpl extends AbstractService implements MessageServic
}
/** {@inheritDoc} */
- @Override public void send(Collection<UUID> nodeIds, CalciteMessage msg) {
- for (UUID nodeId : nodeIds)
- send(nodeId, msg);
- }
-
- /** {@inheritDoc} */
- @Override public void send(UUID nodeId, CalciteMessage msg) {
+ @Override public void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException {
if (localNodeId().equals(nodeId))
onMessage(nodeId, msg, true);
else {
byte plc = msg instanceof ExecutionContextAware ?
((ExecutionContextAware) msg).ioPolicy() : GridIoPolicy.QUERY_POOL;
- if (!prepareMarshal(msg))
- return;
-
- try {
- ioManager().sendToGridTopic(nodeId, GridTopic.TOPIC_QUERY, msg, plc);
- }
- catch (ClusterTopologyCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to send message, node failed: " + nodeId);
- }
- catch (IgniteCheckedException e) {
- failureProcessor().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
- }
+ prepareMarshal(msg);
+
+ ioManager().sendToGridTopic(nodeId, GridTopic.TOPIC_QUERY, msg, plc);
}
}
@@ -229,45 +211,41 @@ public class MessageServiceImpl extends AbstractService implements MessageServic
}
/** */
- protected void onMessage(UUID nodeId, CalciteMessage msg, boolean async) {
- if (msg instanceof ExecutionContextAware) {
- ExecutionContextAware msg0 = (ExecutionContextAware) msg;
- taskExecutor().execute(msg0.queryId(), msg0.fragmentId(), () -> onMessageInternal(nodeId, msg));
- }
- else if (async)
- taskExecutor().execute(IgniteUuid.VM_ID, ThreadLocalRandom.current().nextLong(1024), () -> onMessageInternal(nodeId, msg));
- else
- onMessageInternal(nodeId, msg);
- }
-
- /** */
- protected boolean prepareMarshal(Message msg) {
+ protected void prepareMarshal(Message msg) throws IgniteCheckedException {
try {
if (msg instanceof MarshalableMessage)
((MarshalableMessage) msg).prepareMarshal(marshaller());
-
- return true;
}
catch (IgniteCheckedException e) {
failureProcessor().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
- }
- return false;
+ throw e;
+ }
}
/** */
- protected boolean prepareUnmarshal(Message msg) {
+ protected void prepareUnmarshal(Message msg) throws IgniteCheckedException {
try {
if (msg instanceof MarshalableMessage)
((MarshalableMessage) msg).prepareUnmarshal(marshaller(), classLoader());
-
- return true;
}
catch (IgniteCheckedException e) {
failureProcessor().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+ throw e;
}
+ }
- return false;
+ /** */
+ protected void onMessage(UUID nodeId, CalciteMessage msg, boolean async) {
+ if (msg instanceof ExecutionContextAware) {
+ ExecutionContextAware msg0 = (ExecutionContextAware) msg;
+ taskExecutor().execute(msg0.queryId(), msg0.fragmentId(), () -> onMessageInternal(nodeId, msg));
+ }
+ else if (async)
+ taskExecutor().execute(IgniteUuid.VM_ID, ThreadLocalRandom.current().nextLong(1024), () -> onMessageInternal(nodeId, msg));
+ else
+ onMessageInternal(nodeId, msg);
}
/** */
@@ -278,10 +256,14 @@ public class MessageServiceImpl extends AbstractService implements MessageServic
/** */
private void onMessageInternal(UUID nodeId, CalciteMessage msg) {
- if (!prepareUnmarshal(msg))
- return;
+ try {
+ prepareUnmarshal(msg);
- MessageListener lsnr = Objects.requireNonNull(lsnrs.get(msg.type()));
- lsnr.onMessage(nodeId, msg);
+ MessageListener lsnr = Objects.requireNonNull(lsnrs.get(msg.type()));
+ lsnr.onMessage(nodeId, msg);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
index b3c5720..a24d4b1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
@@ -19,9 +19,9 @@ package org.apache.ignite.internal.processors.query.calcite.metadata;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
/**
* Collects meta information about a query fragment, mainly it is data
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
index 1c4c940..48e9800 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.query.calcite.metadata;
import java.util.List;
import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
@@ -41,6 +40,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.DistributedTable;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
@@ -106,11 +106,7 @@ public class IgniteMdDistribution implements MetadataHandler<BuiltInMetadata.Dis
* See {@link IgniteMdDistribution#distribution(RelNode, RelMetadataQuery)}
*/
public IgniteDistribution distribution(LogicalTableScan rel, RelMetadataQuery mq) {
- RelDistribution distr = rel.getTable().getDistribution();
-
- assert distr instanceof IgniteDistribution : distr;
-
- return (IgniteDistribution) distr;
+ return rel.getTable().unwrap(DistributedTable.class).distribution();
}
/**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
index 7b82c33..3fdd28d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
@@ -31,11 +31,11 @@ import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Edge;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.schema.DistributedTable;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Edge;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
@@ -63,35 +63,35 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
* @param mq Metadata query instance. Used to request appropriate metadata from node children.
* @return Fragment meta information.
*/
- public FragmentInfo getFragmentInfo(RelNode rel, RelMetadataQuery mq) {
+ public FragmentInfo fragmentInfo(RelNode rel, RelMetadataQuery mq) {
throw new AssertionError();
}
/**
- * See {@link IgniteMdFragmentInfo#getFragmentInfo(RelNode, RelMetadataQuery)}
+ * See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
*/
- public FragmentInfo getFragmentInfo(RelSubset rel, RelMetadataQuery mq) {
+ public FragmentInfo fragmentInfo(RelSubset rel, RelMetadataQuery mq) {
throw new AssertionError();
}
/**
- * See {@link IgniteMdFragmentInfo#getFragmentInfo(RelNode, RelMetadataQuery)}
+ * See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
*
* Prunes involved partitions (hence nodes, involved in query execution) if possible.
*/
- public FragmentInfo getFragmentInfo(IgniteFilter rel, RelMetadataQuery mq) {
- return fragmentInfo(rel.getInput(), mq).prune(rel);
+ public FragmentInfo fragmentInfo(IgniteFilter rel, RelMetadataQuery mq) {
+ return _fragmentInfo(rel.getInput(), mq).prune(rel);
}
/**
- * See {@link IgniteMdFragmentInfo#getFragmentInfo(RelNode, RelMetadataQuery)}
+ * See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
*/
- public FragmentInfo getFragmentInfo(SingleRel rel, RelMetadataQuery mq) {
- return fragmentInfo(rel.getInput(), mq);
+ public FragmentInfo fragmentInfo(SingleRel rel, RelMetadataQuery mq) {
+ return _fragmentInfo(rel.getInput(), mq);
}
/**
- * See {@link IgniteMdFragmentInfo#getFragmentInfo(RelNode, RelMetadataQuery)}
+ * See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
*
* {@link LocationMappingException} may be thrown on two children nodes locations merge. This means
* that the fragment (which part the parent node is) cannot be executed on any node and additional exchange
@@ -99,11 +99,11 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
* exchange. After the exchange is put into the fragment and the fragment is split into two ones, fragment meta
* information will be recalculated for all fragments.
*/
- public FragmentInfo getFragmentInfo(Join rel, RelMetadataQuery mq) {
+ public FragmentInfo fragmentInfo(Join rel, RelMetadataQuery mq) {
mq = RelMetadataQueryEx.wrap(mq);
- FragmentInfo left = fragmentInfo(rel.getLeft(), mq);
- FragmentInfo right = fragmentInfo(rel.getRight(), mq);
+ FragmentInfo left = _fragmentInfo(rel.getLeft(), mq);
+ FragmentInfo right = _fragmentInfo(rel.getRight(), mq);
try {
return left.merge(right);
@@ -126,16 +126,16 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
}
/**
- * See {@link IgniteMdFragmentInfo#getFragmentInfo(RelNode, RelMetadataQuery)}
+ * See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
*/
- public FragmentInfo getFragmentInfo(IgniteReceiver rel, RelMetadataQuery mq) {
+ public FragmentInfo fragmentInfo(IgniteReceiver rel, RelMetadataQuery mq) {
return new FragmentInfo(Pair.of(rel, rel.source()));
}
/**
- * See {@link IgniteMdFragmentInfo#getFragmentInfo(RelNode, RelMetadataQuery)}
+ * See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
*/
- public FragmentInfo getFragmentInfo(IgniteTableScan rel, RelMetadataQuery mq) {
+ public FragmentInfo fragmentInfo(IgniteTableScan rel, RelMetadataQuery mq) {
return new FragmentInfo(rel.getTable().unwrap(DistributedTable.class).mapping(Commons.context(rel)));
}
@@ -145,7 +145,7 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
* @param mq Metadata query instance.
* @return Fragment meta information.
*/
- public static FragmentInfo fragmentInfo(RelNode rel, RelMetadataQuery mq) {
+ public static FragmentInfo _fragmentInfo(RelNode rel, RelMetadataQuery mq) {
return RelMetadataQueryEx.wrap(mq).getFragmentInfo(rel);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index 23628f6..8437f0b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -47,11 +47,11 @@ public class IgniteMetadata {
FragmentMetadata.Handler.class, IgniteMethod.FRAGMENT_INFO.method());
/** Determines how the rows are distributed. */
- FragmentInfo getFragmentInfo();
+ FragmentInfo fragmentInfo();
/** Handler API. */
interface Handler extends MetadataHandler<FragmentMetadata> {
- FragmentInfo getFragmentInfo(RelNode r, RelMetadataQuery mq);
+ FragmentInfo fragmentInfo(RelNode r, RelMetadataQuery mq);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
index fd4c33b..8857b45 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Edge;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Edge;
/**
*
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index 28be9ef..7725d83 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -121,7 +121,7 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
public FragmentInfo getFragmentInfo(RelNode rel) {
for (;;) {
try {
- return sourceDistributionHandler.getFragmentInfo(rel, this);
+ return sourceDistributionHandler.fragmentInfo(rel, this);
} catch (JaninoRelMetadataProvider.NoHandler e) {
sourceDistributionHandler = revise(e.relClass, IgniteMetadata.FragmentMetadata.DEF);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/CalciteQueryFieldMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/CalciteQueryFieldMetadata.java
new file mode 100644
index 0000000..88345cd
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/CalciteQueryFieldMetadata.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+
+/**
+ *
+ */
+public class CalciteQueryFieldMetadata implements GridQueryFieldMetadata {
+ /** */
+ private String schemaName;
+
+ /** */
+ private String typeName;
+
+ /** */
+ private String fieldName;
+
+ /** */
+ private String fieldTypeName;
+
+ /** */
+ private int precision;
+
+ /** */
+ private int scale;
+
+ /** Blank constructor for external serialization. */
+ public CalciteQueryFieldMetadata() {
+ }
+
+ /**
+ * @param schemaName Schema name.
+ * @param typeName Type name.
+ * @param fieldName Field name.
+ * @param fieldTypeName Field type name.
+ * @param precision Precision.
+ * @param scale Scale.
+ */
+ public CalciteQueryFieldMetadata(String schemaName, String typeName, String fieldName, String fieldTypeName, int precision, int scale) {
+ this.schemaName = schemaName;
+ this.typeName = typeName;
+ this.fieldName = fieldName;
+ this.fieldTypeName = fieldTypeName;
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String schemaName() {
+ return schemaName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String typeName() {
+ return typeName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String fieldName() {
+ return fieldName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String fieldTypeName() {
+ return fieldTypeName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int precision() {
+ return precision;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int scale() {
+ return scale;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(schemaName);
+ out.writeUTF(typeName);
+ out.writeUTF(fieldName);
+ out.writeUTF(fieldTypeName);
+ out.writeInt(precision);
+ out.writeInt(scale);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ schemaName = in.readUTF();
+ typeName = in.readUTF();
+ fieldName = in.readUTF();
+ fieldTypeName = in.readUTF();
+ precision = in.readInt();
+ scale = in.readInt();
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Cloner.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index b3ac66e..2c9d0a7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.ArrayList;
import java.util.Collections;
@@ -54,7 +54,7 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
* @param plan Plan to clone.
* @return New plan.
*/
- QueryPlan go(QueryPlan plan) {
+ MultiStepPlan go(MultiStepPlan plan) {
List<Fragment> planFragments = plan.fragments();
assert !F.isEmpty(planFragments);
@@ -69,7 +69,7 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
Collections.reverse(fragments);
- return new QueryPlan(fragments);
+ return new MultiStepPlanImpl(fragments, plan.fieldsMetadata());
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Edge.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Edge.java
similarity index 95%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Edge.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Edge.java
index 2863288..fe0f87b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Edge.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Edge.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
import org.apache.calcite.rel.RelNode;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index 3d9a39b..b9ef7c3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
import com.google.common.collect.ImmutableList;
import java.util.Collections;
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.LocationMapp
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
@@ -79,7 +78,7 @@ public class Fragment implements RelSource {
* @param mq Metadata query used for data location calculation.
*/
public void init(MappingService mappingService, PlanningContext ctx, RelMetadataQuery mq) {
- FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(root, mq);
+ FragmentInfo info = IgniteMdFragmentInfo._fragmentInfo(root, mq);
mapping = fragmentMapping(mappingService, ctx, info, mq);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index 40dcf0e..1a89a17 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -24,15 +24,13 @@ import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCostImpl;
+import org.apache.calcite.plan.RelOptLattice;
+import org.apache.calcite.plan.RelOptMaterialization;
import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.hep.HepPlanner;
-import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.plan.volcano.VolcanoUtils;
import org.apache.calcite.prepare.CalciteCatalogReader;
@@ -47,28 +45,28 @@ import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexExecutor;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.calcite.sql2rel.RelDecorrelator;
import org.apache.calcite.sql2rel.SqlRexConvertletTable;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.Program;
-import org.apache.calcite.tools.Programs;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.GraphToRelConverter;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelGraph;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelToGraphConverter;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
* Query planer.
@@ -112,15 +110,9 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
private final JavaTypeFactory typeFactory;
/** */
- private boolean open;
-
- /** */
private RelOptPlanner planner;
/** */
- private RelMetadataProvider metadataProvider;
-
- /** */
private SqlValidator validator;
/**
@@ -143,11 +135,9 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
traitDefs = frameworkConfig.getTraitDefs();
}
-
-
/** {@inheritDoc} */
@Override public RelTraitSet getEmptyTraitSet() {
- return planner.emptyTraitSet();
+ return planner().emptyTraitSet();
}
/** {@inheritDoc} */
@@ -158,12 +148,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
/** {@inheritDoc} */
@Override public void reset() {
planner = null;
- metadataProvider = null;
validator = null;
-
- RelMetadataQuery.THREAD_PROVIDERS.remove();
-
- open = false;
}
/**
@@ -173,35 +158,17 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
return ctx;
}
- /** */
- private void ready() {
- if (!open) {
- planner = VolcanoUtils.impatient(new VolcanoPlanner(frameworkConfig.getCostFactory(), ctx));
- planner.setExecutor(rexExecutor);
- metadataProvider = new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER, planner);
-
- validator = new IgniteSqlValidator(operatorTable(), createCatalogReader(), typeFactory, conformance());
- validator.setIdentifierExpansion(true);
-
- for (RelTraitDef<?> def : traitDefs) {
- planner.addRelTraitDef(def);
- }
-
- open = true;
- }
- }
-
/** {@inheritDoc} */
@Override public SqlNode parse(Reader reader) throws SqlParseException {
- return SqlParser.create(reader, parserConfig).parseStmt();
+ SqlNodeList sqlNodes = SqlParser.create(reader, parserConfig).parseStmtList();
+
+ return sqlNodes.size() == 1 ? sqlNodes.get(0) : sqlNodes;
}
/** {@inheritDoc} */
@Override public SqlNode validate(SqlNode sqlNode) throws ValidationException {
- ready();
-
try {
- return validator.validate(sqlNode);
+ return validator().validate(sqlNode);
}
catch (RuntimeException e) {
throw new ValidationException(e);
@@ -210,13 +177,26 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
/** {@inheritDoc} */
@Override public Pair<SqlNode, RelDataType> validateAndGetType(SqlNode sqlNode) throws ValidationException {
- ready();
-
- SqlNode validatedNode = validate(sqlNode);
- RelDataType type = validator.getValidatedNodeType(validatedNode);
+ SqlNode validatedNode = validator().validate(sqlNode);
+ RelDataType type = validator().getValidatedNodeType(validatedNode);
return Pair.of(validatedNode, type);
}
+ /**
+ * Validates a SQL statement.
+ *
+ * @param sqlNode Root node of the SQL parse tree.
+ * @return Validated node, its validated type and type's origins.
+ * @throws ValidationException if not valid
+ */
+ public ValidationResult validateAndGetTypeMetadata(SqlNode sqlNode) throws ValidationException {
+ SqlNode validatedNode = validator().validate(sqlNode);
+ RelDataType type = validator().getValidatedNodeType(validatedNode);
+ List<List<String>> origins = validator().getFieldOrigins(validatedNode);
+
+ return new ValidationResult(validatedNode, type, origins);
+ }
+
/** {@inheritDoc} */
@Override public RelNode convert(SqlNode sql) {
return rel(sql).rel;
@@ -229,8 +209,6 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
* @return Root node of relational tree.
*/
public IgniteRel convert(RelGraph graph) {
- ready();
-
RelOptCluster cluster = createCluster();
RelBuilder relBuilder = createRelBuilder(cluster, createCatalogReader());
@@ -239,15 +217,13 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
/** Creates a cluster. */
RelOptCluster createCluster() {
- ready();
-
- return createCluster(createRexBuilder());
+ RelOptCluster cluster = RelOptCluster.create(planner(), createRexBuilder());
+ cluster.setMetadataProvider(IgniteMetadata.METADATA_PROVIDER);
+ return cluster;
}
/** {@inheritDoc} */
@Override public RelRoot rel(SqlNode sql) {
- ready();
-
RelOptCluster cluster = createCluster();
SqlToRelConverter.Config config = SqlToRelConverter.configBuilder()
.withConfig(sqlToRelConverterConfig)
@@ -256,11 +232,8 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
.build();
SqlToRelConverter sqlToRelConverter =
new SqlToRelConverter(this, validator, createCatalogReader(), cluster, convertletTable, config);
- RelRoot root = sqlToRelConverter.convertQuery(sql, false, true);
- root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
- RelBuilder relBuilder = createRelBuilder(cluster, null);
- root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel, relBuilder));
- return root;
+
+ return sqlToRelConverter.convertQuery(sql, false, true);
}
/**
@@ -270,8 +243,6 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
* @return Relational nodes tree representation.
*/
public RelGraph graph(RelNode rel) {
- ready();
-
if (rel.getConvention() != IgniteConvention.INSTANCE)
throw new IllegalArgumentException("Physical node is required.");
@@ -280,15 +251,13 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
/** {@inheritDoc} */
@Override public RelRoot expandView(RelDataType rowType, String queryString, List<String> schemaPath, List<String> viewPath) {
- ready();
-
SqlParser parser = SqlParser.create(queryString, parserConfig);
SqlNode sqlNode;
try {
sqlNode = parser.parseQuery();
}
catch (SqlParseException e) {
- throw new RuntimeException("parse failed", e);
+ throw new IgniteSQLException("parse failed", IgniteQueryErrorCode.PARSING, e);
}
SqlConformance conformance = conformance();
@@ -305,86 +274,74 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
.withConvertTableAccess(false)
.build();
SqlToRelConverter sqlToRelConverter =
- new SqlToRelConverter(this, validator,
- catalogReader, cluster, convertletTable, config);
+ new SqlToRelConverter(this, validator, catalogReader, cluster, convertletTable, config);
- RelRoot root = sqlToRelConverter.convertQuery(sqlNode, true, false);
- RelRoot root2 = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
- RelBuilder relBuilder = createRelBuilder(cluster, null);
- return root2.withRel(RelDecorrelator.decorrelateQuery(root.rel, relBuilder));
- }
-
- /** */
- private RelOptCluster createCluster(RexBuilder rexBuilder) {
- RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
-
- cluster.setMetadataProvider(metadataProvider);
- RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(metadataProvider));
-
- return cluster;
+ return sqlToRelConverter.convertQuery(sqlNode, true, false);
}
/** {@inheritDoc} */
@Override public RelNode transform(int programIdx, RelTraitSet targetTraits, RelNode rel) {
- ready();
-
- RelTraitSet toTraits = targetTraits.simplify();
+ RelMetadataProvider clusterProvider = rel.getCluster().getMetadataProvider();
+ JaninoRelMetadataProvider threadProvider = RelMetadataQuery.THREAD_PROVIDERS.get();
+ try {
+ rel.getCluster().setMetadataProvider(new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER, planner()));
+ RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(rel.getCluster().getMetadataProvider()));
- return programs.get(programIdx).run(planner, rel, toTraits, ImmutableList.of(), ImmutableList.of());
+ return programs.get(programIdx).run(planner(), rel, targetTraits.simplify(), materializations(), latices());
+ }
+ finally {
+ rel.getCluster().setMetadataProvider(clusterProvider);
+ RelMetadataQuery.THREAD_PROVIDERS.set(threadProvider);
+ }
}
/**
* Converts one relational nodes tree into another relational nodes tree
* based on a particular planner type, planning phase and required set of traits.
- * @param plannerType Planner type.
- * @param plannerPhase Planner phase.
- * @param input Root node of relational tree.
+ * @param phase Planner phase.
* @param targetTraits Target traits.
+ * @param rel Root node of relational tree.
* @return The root of the new RelNode tree.
*/
- public <T extends RelNode> T transform(PlannerType plannerType, PlannerPhase plannerPhase, RelNode input, RelTraitSet targetTraits) {
- ready();
-
- RelTraitSet toTraits = targetTraits.simplify();
-
- RelNode output;
-
- switch (plannerType) {
- case HEP:
- final HepProgramBuilder programBuilder = new HepProgramBuilder();
-
- for (RelOptRule rule : plannerPhase.getRules(Commons.context(ctx))) {
- programBuilder.addRuleInstance(rule);
- }
-
- final HepPlanner hepPlanner =
- new HepPlanner(programBuilder.build(), ctx, true, null, RelOptCostImpl.FACTORY);
-
- hepPlanner.setRoot(input);
-
- if (!input.getTraitSet().equals(targetTraits))
- hepPlanner.changeTraits(input, toTraits);
+ public <T extends RelNode> T transform(PlannerPhase phase, RelTraitSet targetTraits, RelNode rel) {
+ RelMetadataProvider clusterProvider = rel.getCluster().getMetadataProvider();
+ JaninoRelMetadataProvider threadProvider = RelMetadataQuery.THREAD_PROVIDERS.get();
+ try {
+ rel.getCluster().setMetadataProvider(new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER, planner()));
+ RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(rel.getCluster().getMetadataProvider()));
- output = hepPlanner.findBestExp();
+ return (T) phase.getProgram(ctx).run(planner(), rel, targetTraits.simplify(), materializations(), latices());
+ }
+ finally {
+ rel.getCluster().setMetadataProvider(clusterProvider);
+ RelMetadataQuery.THREAD_PROVIDERS.set(threadProvider);
+ }
+ }
- break;
- case VOLCANO:
- Program program = Programs.of(plannerPhase.getRules(Commons.context(ctx)));
+ /** {@inheritDoc} */
+ @Override public JavaTypeFactory getTypeFactory() {
+ return typeFactory;
+ }
- output = program.run(planner, input, toTraits,
- ImmutableList.of(), ImmutableList.of());
+ /** */
+ private RelOptPlanner planner() {
+ if (planner == null) {
+ planner = VolcanoUtils.impatient(new VolcanoPlanner(frameworkConfig.getCostFactory(), ctx));
+ planner.setExecutor(rexExecutor);
- break;
- default:
- throw new AssertionError("Unknown planner type: " + plannerType);
+ for (RelTraitDef<?> def : traitDefs)
+ planner.addRelTraitDef(def);
}
- return (T) output;
+ return planner;
}
- /** {@inheritDoc} */
- @Override public JavaTypeFactory getTypeFactory() {
- return typeFactory;
+ /** */
+ private SqlValidator validator() {
+ if (validator == null)
+ validator = new IgniteSqlValidator(operatorTable(), createCatalogReader(), typeFactory, conformance());
+
+ return validator;
}
/** */
@@ -409,10 +366,8 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
/** */
private CalciteCatalogReader createCatalogReader() {
- SchemaPlus rootSchema = rootSchema(defaultSchema);
-
return new CalciteCatalogReader(
- CalciteSchema.from(rootSchema),
+ CalciteSchema.from(rootSchema(defaultSchema)),
CalciteSchema.from(defaultSchema).path(null),
typeFactory, connectionConfig);
}
@@ -426,4 +381,14 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
schema = schema.getParentSchema();
}
}
+
+ /** */
+ private List<RelOptLattice> latices() {
+ return ImmutableList.of(); // TODO
+ }
+
+ /** */
+ private List<RelOptMaterialization> materializations() {
+ return ImmutableList.of(); // TODO
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePrograms.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePrograms.java
new file mode 100644
index 0000000..6bbc86b
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePrograms.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import org.apache.calcite.plan.RelOptLattice;
+import org.apache.calcite.plan.RelOptMaterialization;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.sql2rel.RelDecorrelator;
+import org.apache.calcite.tools.Program;
+import org.apache.calcite.tools.Programs;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ *
+ */
+public class IgnitePrograms {
+ /**
+ * Returns heuristic planer based program with given rules.
+ *
+ * @param rules Rules.
+ * @return New program.
+ */
+ public static Program hep(RuleSet rules) {
+ return (planner, rel, traits, materializations, lattices) -> {
+ final HepProgramBuilder builder = new HepProgramBuilder();
+
+ for (RelOptRule rule : rules)
+ builder.addRuleInstance(rule);
+
+ final HepPlanner hepPlanner = new HepPlanner(builder.build(), Commons.context(rel), true,
+ null, Commons.context(rel).frameworkConfig().getCostFactory());
+
+ for (RelOptMaterialization materialization : materializations)
+ hepPlanner.addMaterialization(materialization);
+
+ for (RelOptLattice lattice : lattices)
+ hepPlanner.addLattice(lattice);
+
+ hepPlanner.setRoot(rel);
+
+ return hepPlanner.findBestExp();
+ };
+ }
+
+ /**
+ * Returns cost based planer based program with given rules.
+ *
+ * @param rules Rules.
+ * @return New program.
+ */
+ public static Program cbo(RuleSet rules) {
+ return Programs.of(rules);
+ }
+
+ /**
+ * @return Query decorrelate program.
+ */
+ public static Program decorrelate() {
+ return (planner, rel, traits, materializations, lattices) -> {
+ if (Commons.context(rel).connectionConfig().forceDecorrelate())
+ return RelDecorrelator.decorrelateQuery(rel, RelFactories.LOGICAL_BUILDER.create(rel.getCluster(), null));
+
+ return rel;
+ };
+ }
+
+ /**
+ * @return A program that executes a sequence of programs.
+ */
+ public static Program sequence(Program... programs) {
+ return Programs.sequence(programs);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
new file mode 100644
index 0000000..753c511
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
+
+/**
+ * Regular query or DML
+ */
+public interface MultiStepPlan extends QueryPlan {
+ /**
+ * @return Query fragments.
+ */
+ List<Fragment> fragments();
+
+ /**
+ * @return Row metadata.
+ */
+ List<GridQueryFieldMetadata> fieldsMetadata();
+
+ /**
+ * Inits query fragments.
+ *
+ * @param mappingService Mapping service.
+ * @param ctx Planner context.
+ */
+ void init(MappingService mappingService, PlanningContext ctx);
+
+ /** {@inheritDoc} */
+ @Override default Type type() {
+ return Type.QUERY;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlanImpl.java
similarity index 75%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlanImpl.java
index be17511..282878e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlanImpl.java
@@ -15,17 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.util.typedef.F;
@@ -33,31 +34,41 @@ import org.apache.ignite.internal.util.typedef.F;
/**
* Distributed query plan.
*/
-public class QueryPlan {
+public class MultiStepPlanImpl implements MultiStepPlan {
/** */
private final List<Fragment> fragments;
+ /** */
+ private final List<GridQueryFieldMetadata> fieldsMeta;
+
/**
* @param fragments Query fragments.
*/
- public QueryPlan(List<Fragment> fragments) {
- this.fragments = fragments;
+ public MultiStepPlanImpl(List<Fragment> fragments) {
+ this(fragments, ImmutableList.of());
}
/**
- * @return Query fragments.
+ * @param fragments Query fragments.
+ * @param fieldsMeta Fields metadata.
*/
- public List<Fragment> fragments() {
+ public MultiStepPlanImpl(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) {
+ this.fieldsMeta = fieldsMeta;
+ this.fragments = fragments;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Fragment> fragments() {
return fragments;
}
- /**
- * Inits query fragments.
- *
- * @param mappingService Mapping service.
- * @param ctx Planner context.
- */
- public void init(MappingService mappingService, PlanningContext ctx) {
+ /** {@inheritDoc} */
+ @Override public List<GridQueryFieldMetadata> fieldsMetadata() {
+ return fieldsMeta;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void init(MappingService mappingService, PlanningContext ctx) {
int i = 0;
RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
@@ -103,10 +114,8 @@ public class QueryPlan {
}
}
- /**
- * Clones this plan with a new cluster.
- */
- public QueryPlan clone(RelOptCluster cluster) {
+ /** {@inheritDoc} */
+ @Override public MultiStepPlan clone(RelOptCluster cluster) {
return new Cloner(cluster).go(this);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 9417af2..975b2d2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -17,26 +17,40 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
+import org.apache.calcite.plan.volcano.AbstractConverter;
import org.apache.calcite.rel.rules.SubQueryRemoveRule;
+import org.apache.calcite.rel.rules.TableScanRule;
+import org.apache.calcite.tools.Program;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;
import org.apache.ignite.internal.processors.query.calcite.rule.FilterConverter;
import org.apache.ignite.internal.processors.query.calcite.rule.JoinConverter;
import org.apache.ignite.internal.processors.query.calcite.rule.ProjectConverter;
+import static org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePrograms.cbo;
+import static org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePrograms.decorrelate;
+import static org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePrograms.hep;
+import static org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePrograms.sequence;
+
/**
* Represents a planner phase with its description and a used rule set.
*/
public enum PlannerPhase {
/** */
- SUBQUERY_REWRITE("Sub-queries rewrites") {
+ HEURISTIC_OPTIMIZATION("Heuristic optimization phase") {
/** {@inheritDoc} */
@Override public RuleSet getRules(PlanningContext ctx) {
return RuleSets.ofList(
+ TableScanRule.INSTANCE,
SubQueryRemoveRule.FILTER,
SubQueryRemoveRule.PROJECT,
SubQueryRemoveRule.JOIN);
}
+
+ /** {@inheritDoc} */
+ @Override public Program getProgram(PlanningContext ctx) {
+ return sequence(hep(getRules(ctx)), decorrelate());
+ }
},
/** */
@@ -44,10 +58,16 @@ public enum PlannerPhase {
/** {@inheritDoc} */
@Override public RuleSet getRules(PlanningContext ctx) {
return RuleSets.ofList(
+ AbstractConverter.ExpandConversionRule.INSTANCE,
JoinConverter.INSTANCE,
ProjectConverter.INSTANCE,
FilterConverter.INSTANCE);
}
+
+ /** {@inheritDoc} */
+ @Override public Program getProgram(PlanningContext ctx) {
+ return cbo(getRules(ctx));
+ }
};
/** */
@@ -66,4 +86,11 @@ public enum PlannerPhase {
* @return Rule set.
*/
public abstract RuleSet getRules(PlanningContext ctx);
+
+ /**
+ * Returns a program, calculated on the basis of query, planner context planner phase and rules set.
+ * @param ctx Planner context.
+ * @return Rule set.
+ */
+ public abstract Program getProgram(PlanningContext ctx);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerType.java
deleted file mode 100644
index cddc4d5..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerType.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.prepare;
-
-/**
- *
- */
-public enum PlannerType {
- HEP,
- VOLCANO;
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
index fabe1d6..fdfd779 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
@@ -31,6 +31,7 @@ import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.calcite.exec.QueryCancelGroup;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
/**
@@ -59,6 +60,9 @@ public final class PlanningContext implements Context {
private final AffinityTopologyVersion topologyVersion;
/** */
+ private final QueryCancelGroup cancelGroup;
+
+ /** */
private final IgniteLogger logger;
/** */
@@ -74,10 +78,11 @@ public final class PlanningContext implements Context {
* Private constructor, used by a builder.
*/
private PlanningContext(FrameworkConfig config, Context parentContext, UUID localNodeId, UUID originatingNodeId,
- String query, Object[] parameters, AffinityTopologyVersion topologyVersion, IgniteLogger logger) {
+ String query, Object[] parameters, AffinityTopologyVersion topologyVersion, QueryCancelGroup cancelGroup, IgniteLogger logger) {
this.parentContext = parentContext;
this.localNodeId = localNodeId;
this.parameters = parameters;
+ this.cancelGroup = cancelGroup;
this.originatingNodeId = originatingNodeId == null ? localNodeId : originatingNodeId;
this.query = query;
this.topologyVersion = topologyVersion;
@@ -133,6 +138,13 @@ public final class PlanningContext implements Context {
}
/**
+ * @return Query cancel group.
+ */
+ public QueryCancelGroup cancelGroup() {
+ return cancelGroup;
+ }
+
+ /**
* @return Logger.
*/
public IgniteLogger logger() {
@@ -152,6 +164,13 @@ public final class PlanningContext implements Context {
}
/**
+ * @return Schema name.
+ */
+ public String schemaName() {
+ return schema().getName();
+ }
+
+ /**
* @return Schema.
*/
public SchemaPlus schema() {
@@ -215,21 +234,6 @@ public final class PlanningContext implements Context {
}
/**
- * @return Context builder.
- */
- public static Builder builder(PlanningContext template) {
- return new Builder()
- .logger(template.logger)
- .topologyVersion(template.topologyVersion)
- .query(template.query)
- .parameters(template.parameters)
- .parentContext(template.parentContext)
- .frameworkConfig(template.frameworkConfig)
- .originatingNodeId(template.originatingNodeId)
- .localNodeId(template.localNodeId);
- }
-
- /**
* Planner context builder.
*/
public static class Builder {
@@ -255,6 +259,9 @@ public final class PlanningContext implements Context {
private AffinityTopologyVersion topologyVersion;
/** */
+ private QueryCancelGroup cancelGroup;
+
+ /** */
private IgniteLogger logger;
/**
@@ -321,6 +328,15 @@ public final class PlanningContext implements Context {
}
/**
+ * @param cancelGroup Query cancel group.
+ * @return Builder for chaining.
+ */
+ public Builder cancelGroup(QueryCancelGroup cancelGroup) {
+ this.cancelGroup = cancelGroup;
+ return this;
+ }
+
+ /**
* @param logger Logger.
* @return Builder for chaining.
*/
@@ -336,7 +352,7 @@ public final class PlanningContext implements Context {
*/
public PlanningContext build() {
return new PlanningContext(frameworkConfig, parentContext, localNodeId, originatingNodeId, query,
- parameters, topologyVersion, logger);
+ parameters, topologyVersion, cancelGroup, logger);
}
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
new file mode 100644
index 0000000..cab5a88
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import org.apache.calcite.plan.RelOptCluster;
+
+/**
+ *
+ */
+public interface QueryPlan {
+ /** Query type */
+ enum Type { QUERY, DML, DDL }
+
+ /**
+ * @return Query type.
+ */
+ Type type();
+
+ /**
+ * Clones this plan with a new cluster.
+ */
+ QueryPlan clone(RelOptCluster cluster);
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
index 0f2586e..fe5f628 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
-import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
+import java.util.List;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
/**
@@ -30,5 +30,5 @@ public interface QueryPlanCache extends Service {
* @param factory Factory method to generate a plan on cache miss.
* @return Query plan.
*/
- QueryPlan queryPlan(PlanningContext ctx, CacheKey key, QueryPlanFactory factory);
+ List<QueryPlan> queryPlan(PlanningContext ctx, CacheKey key, QueryPlanFactory factory);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
index 9605be4..df371e2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
@@ -17,11 +17,11 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
+import java.util.List;
import java.util.Map;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
-import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.schema.SchemaChangeListener;
@@ -39,7 +39,7 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach
private GridInternalSubscriptionProcessor subscriptionProcessor;
/** */
- private volatile Map<CacheKey, QueryPlan> cache;
+ private volatile Map<CacheKey, List<QueryPlan>> cache;
/**
* @param ctx Kernal context.
@@ -70,20 +70,23 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach
// No-op.
}
- /** {@inheritDoc} */
- @Override public QueryPlan queryPlan(PlanningContext ctx, CacheKey key, QueryPlanFactory factory) {
- Map<CacheKey, QueryPlan> cache = this.cache;
+ /** {@inheritDoc}
+ * @return*/
+ @Override public List<QueryPlan> queryPlan(PlanningContext ctx, CacheKey key, QueryPlanFactory factory) {
+ Map<CacheKey, List<QueryPlan>> cache = this.cache;
- QueryPlan template = cache.get(key);
+ List<QueryPlan> template = cache.get(key);
if (template != null)
- return template.clone(ctx.createCluster());
-
- QueryPlan plan = factory.create(ctx);
+ return Commons.transform(template, t-> t.clone(ctx.createCluster()));
+ else {
+ List<QueryPlan> prepared = factory.create(ctx);
- cache.putIfAbsent(key, plan.clone(Commons.EMPTY_CLUSTER));
+ if (prepared.size() == 1) // do not cache multiline queries.
+ cache.putIfAbsent(key, Commons.transform(prepared, p -> p.clone(Commons.EMPTY_CLUSTER)));
- return plan;
+ return prepared;
+ }
}
/**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanFactory.java
index da9ad4d..f3391d3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanFactory.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
-import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
+import java.util.List;
/**
*
@@ -27,5 +27,5 @@ public interface QueryPlanFactory {
* @param ctx Planning context.
* @return Query plan.
*/
- QueryPlan create(PlanningContext ctx);
+ List<QueryPlan> create(PlanningContext ctx);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSource.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSource.java
similarity index 92%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSource.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSource.java
index 30574c4..1f1cbf0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSource.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSource.java
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
/**
*
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSourceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSourceImpl.java
similarity index 95%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSourceImpl.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSourceImpl.java
index 9c8a735..8afbb01 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSourceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSourceImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.io.Serializable;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTarget.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTarget.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java
index 385bae3..1dcb6b4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTarget.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTargetImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTargetImpl.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java
index a8fe214..570b81f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTargetImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.io.Serializable;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RowMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RowMetadata.java
new file mode 100644
index 0000000..824d815
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RowMetadata.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ *
+ */
+public class RowMetadata {
+ /** */
+ private final List<GridQueryFieldMetadata> fieldsMeta;
+
+ /** */
+ public RowMetadata(@NotNull List<GridQueryFieldMetadata> fieldsMeta) {
+ this.fieldsMeta = fieldsMeta;
+ }
+
+ /**
+ * @return Query metadata.
+ */
+ public List<GridQueryFieldMetadata> fieldsMetadata() {
+ return fieldsMeta;
+ }
+ /**
+ * Gets number of columns in a row.
+ *
+ * @return row size.
+ */
+ public int fieldsCount() {
+ return fieldsMeta.size();
+ }
+
+ /**
+ * Gets field name.
+ *
+ * @param idx field index.
+ * @return Field name.
+ */
+ public String fieldName(int idx){
+ return fieldsMeta.get(idx).fieldName();
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
similarity index 91%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
index adda757..74d093a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.ArrayList;
import java.util.Collections;
@@ -37,19 +37,24 @@ import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
/**
* Splits a query into a list of query fragments.
*/
-public class Splitter implements IgniteRelVisitor<IgniteRel>, RelOp<IgniteRel, QueryPlan> {
+public class Splitter implements IgniteRelVisitor<IgniteRel>, RelOp<IgniteRel, List<Fragment>> {
/** */
private List<Fragment> fragments;
/** {@inheritDoc} */
- @Override public QueryPlan go(IgniteRel root) {
+ @Override public List<Fragment> go(IgniteRel root) {
fragments = new ArrayList<>();
- fragments.add(new Fragment(visit(root)));
+ try {
+ fragments.add(new Fragment(visit(root)));
- Collections.reverse(fragments);
+ Collections.reverse(fragments);
- return new QueryPlan(fragments);
+ return fragments;
+ }
+ finally {
+ fragments = null;
+ }
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ValidationResult.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ValidationResult.java
new file mode 100644
index 0000000..bb7d8e7
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ValidationResult.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlNode;
+
+/**
+ *
+ */
+public class ValidationResult {
+ /** */
+ private final SqlNode sqlNode;
+
+ /** */
+ private final RelDataType dataType;
+
+ /** */
+ private final List<List<String>> origins;
+
+ /**
+ *
+ * @param sqlNode Validated SQL node.
+ * @param dataType Validated type.
+ * @param origins Type fields provenance.
+ */
+ public ValidationResult(SqlNode sqlNode, RelDataType dataType, List<List<String>> origins) {
+ this.sqlNode = sqlNode;
+ this.dataType = dataType;
+ this.origins = origins;
+ }
+
+ /**
+ * @return Validated SQL node.
+ */
+ public SqlNode sqlNode() {
+ return sqlNode;
+ }
+
+ /**
+ * @return Validated type.
+ */
+ public RelDataType dataType() {
+ return dataType;
+ }
+
+ /**
+ * @return Type fields provenance.
+ */
+ public List<List<String>> origins() {
+ return origins;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java
index 6f66ab8..34c4e5e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java
@@ -19,9 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.rel;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.volcano.AbstractConverter;
/**
* Ignite convention trait.
@@ -40,12 +38,6 @@ public class IgniteConvention extends Convention.Impl {
}
/** {@inheritDoc} */
- @Override public void register(RelOptPlanner planner) {
- // This convention relies on AbstractConverter logic, so, ExpandConversionRule need to be registered.
- planner.addRule(AbstractConverter.ExpandConversionRule.INSTANCE);
- }
-
- /** {@inheritDoc} */
@Override public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
// use converters for physical nodes only.
return toTraits.contains(INSTANCE) && fromTraits.contains(INSTANCE);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
index 5289c90..78083a1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
index 4a4e4b5..e89bece 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
@@ -22,7 +22,7 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelTarget;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
/**
* Relational expression that iterates over its input
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
index 83f2453..3247458 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
@@ -125,6 +125,7 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
/** */
private void rebuild() {
SchemaPlus schema = Frameworks.createRootSchema(false);
+ schema.add("PUBLIC", new IgniteSchema("PUBLIC"));
schemas.forEach(schema::add);
schema(schema);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
index 31c607f..ed43db0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
@@ -18,14 +18,16 @@
package org.apache.ignite.internal.processors.query.calcite.schema;
import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
+import java.util.Objects;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
@@ -47,6 +49,9 @@ public class TableDescriptorImpl implements TableDescriptor {
private final Object affinityIdentity;
/** */
+ private final ColumnDescriptor[] descriptors;
+
+ /** */
private final int affinityFieldIdx;
/** */
@@ -55,7 +60,31 @@ public class TableDescriptorImpl implements TableDescriptor {
this.typeDesc = typeDesc;
this.affinityIdentity = affinityIdentity;
- affinityFieldIdx = lookupAffinityIndex(typeDesc);
+ String affinityField = typeDesc.affinityKey(); int affinityFieldIdx = 0;
+
+ List<ColumnDescriptor> descriptors = new ArrayList<>();
+
+ descriptors.add(new KeyValDescriptor(QueryUtils.KEY_FIELD_NAME, typeDesc.keyClass(), true, true));
+ descriptors.add(new KeyValDescriptor(QueryUtils.VAL_FIELD_NAME, typeDesc.keyClass(), true, false));
+
+ for (String field : this.typeDesc.fields().keySet()) {
+ if (Objects.equals(affinityField, field)) {
+ assert affinityFieldIdx == 0;
+
+ affinityFieldIdx = descriptors.size();
+ }
+
+ if (Objects.equals(field, typeDesc.keyFieldAlias()))
+ descriptors.add(new KeyValDescriptor(typeDesc.keyFieldAlias(), typeDesc.keyClass(), false, true));
+ else if (Objects.equals(field, typeDesc.valueFieldAlias()))
+ descriptors.add(new KeyValDescriptor(typeDesc.valueFieldAlias(), typeDesc.valueClass(), false, false));
+ else
+ descriptors.add(new FieldDescriptor(typeDesc.property(field)));
+ }
+
+ this.descriptors = descriptors.toArray(new ColumnDescriptor[0]);
+
+ this.affinityFieldIdx = affinityFieldIdx;
}
/** {@inheritDoc} */
@@ -64,11 +93,8 @@ public class TableDescriptorImpl implements TableDescriptor {
RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(f);
- b.add(QueryUtils.KEY_FIELD_NAME, f.createSystemType(typeDesc.keyClass()));
- b.add(QueryUtils.VAL_FIELD_NAME, f.createSystemType(typeDesc.valueClass()));
-
- for (Map.Entry<String, Class<?>> field : typeDesc.fields().entrySet())
- b.add(field.getKey(), f.createJavaType(field.getValue()));
+ for (ColumnDescriptor desc : descriptors)
+ b.add(desc.name(), desc.type(f));
return b.build();
}
@@ -98,34 +124,87 @@ public class TableDescriptorImpl implements TableDescriptor {
/** {@inheritDoc} */
@Override public <T> T toRow(ExecutionContext ectx, CacheDataRow row) throws IgniteCheckedException {
- Object[] res = new Object[typeDesc.fields().size() + 2];
+ Object[] res = new Object[descriptors.length];
- int i = 0;
+ for (int i = 0; i < descriptors.length; i++)
+ res[i] = descriptors[i].value(ectx, cctx, row);
- res[i++] = cctx.unwrapBinaryIfNeeded(row.key(), ectx.keepBinary());
- res[i++] = cctx.unwrapBinaryIfNeeded(row.value(), ectx.keepBinary());
+ return (T) res;
+ }
- for (String field : typeDesc.fields().keySet())
- res[i++] = cctx.unwrapBinaryIfNeeded(typeDesc.value(field, row.key(), row.value()), ectx.keepBinary());
+ /** */
+ private interface ColumnDescriptor {
+ /** */
+ String name();
- return (T) res;
+ /** */
+ RelDataType type(IgniteTypeFactory f);
+
+ /** */
+ Object value(ExecutionContext ectx, GridCacheContext<?,?> cctx, CacheDataRow src) throws IgniteCheckedException;
}
/** */
- private static int lookupAffinityIndex(GridQueryTypeDescriptor queryTypeDesc) {
- if (queryTypeDesc.affinityKey() != null) {
- int idx = 2;
+ private static class KeyValDescriptor implements ColumnDescriptor {
+ /** */
+ private final String name;
- String affField = queryTypeDesc.affinityKey();
+ /** */
+ private final Class<?> type;
- for (String s : queryTypeDesc.fields().keySet()) {
- if (affField.equals(s))
- return idx;
+ /** */
+ private final boolean isSystem;
- idx++;
- }
+ /** */
+ private final boolean isKey;
+
+ /** */
+ private KeyValDescriptor(String name, Class<?> type, boolean isSystem, boolean isKey) {
+ this.name = name;
+ this.type = type;
+ this.isSystem = isSystem;
+ this.isKey = isKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelDataType type(IgniteTypeFactory f) {
+ return isSystem ? f.createSystemType(type) : f.createJavaType(type);
}
- return 0;
+ /** {@inheritDoc} */
+ @Override public Object value(ExecutionContext ectx, GridCacheContext<?,?> cctx, CacheDataRow src) throws IgniteCheckedException {
+ return cctx.unwrapBinaryIfNeeded(isKey ? src.key() : src.value(), ectx.keepBinary());
+ }
+ }
+
+ /** */
+ private static class FieldDescriptor implements ColumnDescriptor {
+ /** */
+ private final GridQueryProperty desc;
+
+ /** */
+ private FieldDescriptor(GridQueryProperty desc) {
+ this.desc = desc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return desc.name();
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelDataType type(IgniteTypeFactory f) {
+ return f.createJavaType(desc.type());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object value(ExecutionContext ectx, GridCacheContext<?,?> cctx, CacheDataRow src) throws IgniteCheckedException {
+ return cctx.unwrapBinaryIfNeeded(desc.value(src.key(), src.value()), ectx.keepBinary());
+ }
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
index 586ef09..24ff49c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
@@ -19,12 +19,12 @@ package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
import java.util.List;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelSourceImpl;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.serialize.type.DataType;
import org.apache.ignite.internal.processors.query.calcite.serialize.type.Types;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelSourceImpl;
/**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
index b81b555..e8c161f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
@@ -21,9 +21,9 @@ import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelTarget;
import org.apache.ignite.internal.util.typedef.F;
/**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index a586a46..ba363a6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -30,6 +30,7 @@ import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexBuilder;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.QueryContext;
@@ -165,7 +166,23 @@ public final class Commons {
/**
* @param o Object to close.
*/
- public static void close(Object o) {
+ public static void close(Object o) throws Exception {
+ if (o instanceof AutoCloseable)
+ ((AutoCloseable) o).close();
+ }
+
+ /**
+ * @param o Object to close.
+ */
+ public static void close(Object o, IgniteLogger log) {
+ if (o instanceof AutoCloseable)
+ U.close((AutoCloseable) o, log);
+ }
+
+ /**
+ * @param o Object to close.
+ */
+ public static void closeQuiet(Object o) {
if (o instanceof AutoCloseable)
U.closeQuiet((AutoCloseable) o);
}
@@ -174,7 +191,7 @@ public final class Commons {
* @param o Object to close.
* @param e Exception, what causes close.
*/
- public static void close(Object o, @Nullable Exception e) {
+ public static void closeQuiet(Object o, @Nullable Exception e) {
if (!(o instanceof AutoCloseable))
return;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
new file mode 100644
index 0000000..10331ea
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.util;
+
+import java.util.Iterator;
+import java.util.function.Function;
+
+/**
+ *
+ */
+class ConvertingClosableIterator<T, R> implements Iterator<R>, AutoCloseable {
+ /** */
+ private final Iterator<T> it;
+
+ /** */
+ private final Function<T, R> converter;
+
+ /** */
+ public ConvertingClosableIterator(Iterator<T> it, Function<T, R> converter) {
+ this.it = it;
+ this.converter = converter;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public R next() {
+ return converter.apply(it.next());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public void close() throws Exception {
+ if (it instanceof AutoCloseable)
+ ((AutoCloseable) it).close();
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
index c2a11f3..c71f6a0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetada
*/
public enum IgniteMethod {
DERIVED_DISTRIBUTIONS(DerivedDistribution.class, "deriveDistributions"),
- FRAGMENT_INFO(FragmentMetadata.class, "getFragmentInfo");
+ FRAGMENT_INFO(FragmentMetadata.class, "fragmentInfo");
/** */
private final Method method;
@@ -37,7 +37,7 @@ public enum IgniteMethod {
* @param methodName Method name.
* @param argumentTypes Method parameters types.
*/
- IgniteMethod(Class clazz, String methodName, Class... argumentTypes) {
+ IgniteMethod(Class<?> clazz, String methodName, Class<?>... argumentTypes) {
method = Types.lookupMethod(clazz, methodName, argumentTypes);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
index 1b952f9..ba921c4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
@@ -17,57 +17,61 @@
package org.apache.ignite.internal.processors.query.calcite.util;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Spliterators;
import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
/**
*
*/
-public class ListFieldsQueryCursor<T> implements FieldsQueryCursor<List<?>> {
+public class ListFieldsQueryCursor<T> implements FieldsQueryCursor<List<?>>, QueryCursorEx<List<?>> {
/** */
- private final RelDataType rowType;
+ private final Iterator<List<?>> it;
/** */
- private final Iterator<T> it;
-
- /** */
- private final Function<T, List<?>> converter;
+ private final List<GridQueryFieldMetadata> fieldsMeta;
/**
- * @param rowType Row data type description.
* @param it Iterator.
* @param converter Row converter.
+ * @param fieldsMeta Fields metadata.
*/
- public ListFieldsQueryCursor(RelDataType rowType, Iterator<T> it, Function<T, List<?>> converter) {
- this.rowType = rowType;
- this.it = it;
- this.converter = converter;
+ public ListFieldsQueryCursor(Iterator<T> it, Function<T, List<?>> converter, List<GridQueryFieldMetadata> fieldsMeta) {
+ this.it = new ConvertingClosableIterator<>(it, converter);
+ this.fieldsMeta = fieldsMeta;
}
/** {@inheritDoc} */
- @Override public String getFieldName(int idx) {
- return rowType.getFieldList().get(idx).getName();
+ @NotNull @Override public Iterator<List<?>> iterator() {
+ return it;
}
/** {@inheritDoc} */
- @Override public int getColumnsCount() {
- return rowType.getFieldCount();
+ @Override public List<List<?>> getAll() {
+ ArrayList<List<?>> res = new ArrayList<>();
+
+ try {
+ getAll(res::add);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+
+ return res;
}
/** {@inheritDoc} */
- @Override public List<List<?>> getAll() {
+ @Override public void getAll(Consumer<List<?>> c) throws IgniteCheckedException {
try {
- return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false)
- .map(converter)
- .collect(Collectors.toList());
+ while (it.hasNext())
+ c.consume(it.next());
}
finally {
close();
@@ -75,12 +79,27 @@ public class ListFieldsQueryCursor<T> implements FieldsQueryCursor<List<?>> {
}
/** {@inheritDoc} */
- @Override public void close() {
- Commons.close(it);
+ @Override public List<GridQueryFieldMetadata> fieldsMeta() {
+ return fieldsMeta;
}
/** {@inheritDoc} */
- @NotNull @Override public Iterator<List<?>> iterator() {
- return F.iterator(it, converter::apply, true);
+ @Override public String getFieldName(int idx) {
+ return fieldsMeta.get(idx).fieldName();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getColumnsCount() {
+ return fieldsMeta.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isQuery() {
+ return true; // TODO pass and check QueryPlan.Type
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ Commons.closeQuiet(it);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java
index 479a8ef..118b5c0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java
@@ -182,7 +182,7 @@ public class TableScan implements Iterable<Object[]> {
return this;
}
catch (Exception e) {
- Commons.close(this, e);
+ Commons.closeQuiet(this, e);
throw e;
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index eb09e95..4f8cf09 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -81,13 +81,17 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
- List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", "select * from DEVELOPER d, PROJECT p where d.projectId = p._key and d._key = ?", 1);
+ List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
+ "" +
+ "select * from DEVELOPER d, PROJECT p where d.projectId = p._key and d._key = ?;" +
+ "select * from DEVELOPER d, PROJECT p where d.projectId = p._key and d._key = ?"
+ , 0,1);
- List<List<?>> all = F.first(query).getAll();
- assertTrue(!F.isEmpty(all));
- assertEquals(1, all.size());
- assertEqualsCollections(Arrays.asList("Roman", 0, "Ignite"), F.first(all));
+ assertEquals(2, query.size());
+
+ assertEqualsCollections(Arrays.asList("Igor", 1, "Calcite"), F.first(query.get(0).getAll()));
+ assertEqualsCollections(Arrays.asList("Roman", 0, "Ignite"), F.first(query.get(1).getAll()));
}
/** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
index bca71ad..0857e7f5 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
@@ -60,10 +60,13 @@ import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessag
import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlanImpl;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerType;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
@@ -72,9 +75,6 @@ import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import org.apache.ignite.internal.processors.query.calcite.schema.SortedTable;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelGraph;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelToGraphConverter;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
-import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
@@ -480,7 +480,7 @@ public class PlannerTest extends GridCommonAbstractTest {
RelNode rel = relRoot.rel;
// Transformation chain
- rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
@@ -571,13 +571,15 @@ public class PlannerTest extends GridCommonAbstractTest {
RelNode rel = relRoot.rel;
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
// Transformation chain
RelTraitSet desired = rel.getCluster().traitSet()
.replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
@@ -687,18 +689,18 @@ public class PlannerTest extends GridCommonAbstractTest {
RelNode rel = planner.convert(sqlNode);
// Transformation chain
- rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
RelTraitSet desired = rel.getCluster().traitSet()
.replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
assertNotNull(rel);
- QueryPlan plan = new Splitter().go((IgniteRel) rel);
+ MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) rel));
assertNotNull(plan);
@@ -832,21 +834,21 @@ public class PlannerTest extends GridCommonAbstractTest {
RelNode rel = relRoot.rel;
// Transformation chain
- rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
RelTraitSet desired = rel.getCluster().traitSet()
.replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
assertNotNull(relRoot);
- QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+ MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) relRoot.rel));
assertNotNull(plan);
@@ -963,17 +965,17 @@ public class PlannerTest extends GridCommonAbstractTest {
RelNode rel = relRoot.rel;
// Transformation chain
- rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
RelTraitSet desired = rel.getCluster()
.traitSetOf(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single());
- RelNode phys = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+ RelNode phys = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
assertNotNull(phys);
- QueryPlan plan = new Splitter().go((IgniteRel) phys);
+ MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) phys));
assertNotNull(plan);
@@ -1198,21 +1200,21 @@ public class PlannerTest extends GridCommonAbstractTest {
RelNode rel = relRoot.rel;
// Transformation chain
- rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
RelTraitSet desired = rel.getCluster().traitSet()
.replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
assertNotNull(relRoot);
- QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+ MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) relRoot.rel));
assertNotNull(plan);
@@ -1320,21 +1322,21 @@ public class PlannerTest extends GridCommonAbstractTest {
RelNode rel = relRoot.rel;
// Transformation chain
- rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
RelTraitSet desired = rel.getCluster().traitSet()
.replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
assertNotNull(relRoot);
- QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+ MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) relRoot.rel));
assertNotNull(plan);
@@ -1441,21 +1443,21 @@ public class PlannerTest extends GridCommonAbstractTest {
RelNode rel = relRoot.rel;
// Transformation chain
- rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
RelTraitSet desired = rel.getCluster().traitSet()
.replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
assertNotNull(relRoot);
- QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+ MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) relRoot.rel));
assertNotNull(plan);
@@ -1562,21 +1564,21 @@ public class PlannerTest extends GridCommonAbstractTest {
RelNode rel = relRoot.rel;
// Transformation chain
- rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
RelTraitSet desired = rel.getCluster().traitSet()
.replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
assertNotNull(relRoot);
- QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+ MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) relRoot.rel));
assertNotNull(plan);
@@ -1680,21 +1682,21 @@ public class PlannerTest extends GridCommonAbstractTest {
RelNode rel = relRoot.rel;
// Transformation chain
- rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
RelTraitSet desired = rel.getCluster().traitSet()
.replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
assertNotNull(relRoot);
- QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+ MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) relRoot.rel));
assertNotNull(plan);
@@ -1785,13 +1787,13 @@ public class PlannerTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected boolean prepareMarshal(Message msg) {
- return true;
+ @Override protected void prepareMarshal(Message msg) {
+ // No-op;
}
/** {@inheritDoc} */
- @Override protected boolean prepareUnmarshal(Message msg) {
- return true;
+ @Override protected void prepareUnmarshal(Message msg) {
+ // No-op;
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractExecutionTest.java
index bb430954..96cf859 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractExecutionTest.java
@@ -163,13 +163,13 @@ public class AbstractExecutionTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected boolean prepareMarshal(Message msg) {
- return true;
+ @Override protected void prepareMarshal(Message msg) {
+ // No-op;
}
/** {@inheritDoc} */
- @Override protected boolean prepareUnmarshal(Message msg) {
- return true;
+ @Override protected void prepareUnmarshal(Message msg) {
+ // No-op;
}
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolderTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolderTest.java
new file mode 100644
index 0000000..7e99f4b
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolderTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class ClosableIteratorsHolderTest extends GridCommonAbstractTest {
+ /** */
+ private static final int GENERATED = 10000;
+
+ /** */
+ private Set<Iterator<?>> iterators;
+
+ /** */
+ private ClosableIteratorsHolder holder;
+
+ @Before
+ public void setup() throws Exception {
+ iterators = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ holder = new ClosableIteratorsHolder(log());
+ holder.init();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ holder.tearDown();
+
+ holder = null;
+ iterators = null;
+ }
+
+ @Test
+ public void iterator() throws Exception {
+ for (int i = 0; i < GENERATED; i++)
+ holder.iterator(newIterator());
+
+ System.gc();
+
+ assertTrue(iterators.size() < GENERATED);
+ }
+
+ /** */
+ private Iterator<?> newIterator() {
+ final ClosableIterator iterator = new ClosableIterator();
+ iterators.add(iterator);
+ return iterator;
+ }
+
+ /** */
+ private class ClosableIterator implements Iterator<Object>, AutoCloseable {
+ /** */
+ public final byte[] data;
+
+ private ClosableIterator() {
+ data = new byte[4096];
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object next() {
+ throw new NoSuchElementException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ Optional.ofNullable(iterators)
+ .ifPresent(set -> set.remove(this));
+ }
+ }
+}
\ No newline at end of file
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/OutboxTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/OutboxTest.java
index 96cd789..aa5dbe6 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/OutboxTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/OutboxTest.java
@@ -129,19 +129,19 @@ public class OutboxTest extends GridCommonAbstractTest {
private List<?> lastBatch;
/** {@inheritDoc} */
- @Override public void sendBatch(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows) {
+ @Override public void sendBatch(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows) {
ids.add(batchId);
lastBatch = rows;
}
/** {@inheritDoc} */
- @Override public void acknowledge(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
+ @Override public void acknowledge(Inbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
throw new AssertionError();
}
/** {@inheritDoc} */
- @Override public void cancel(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
+ @Override public void cancel(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
throw new AssertionError();
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcQueryTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcQueryTest.java
new file mode 100644
index 0000000..1540e14
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcQueryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class JdbcQueryTest extends GridCommonAbstractTest {
+ /** URL. */
+ private final String url = "jdbc:ignite:thin://127.0.0.1?useExperimentalQueryEngine=true";
+
+ /** Nodes count. */
+ private final int nodesCnt = 3;
+
+ /** Connection. */
+ private Connection conn;
+
+ /** Statement. */
+ private Statement stmt;
+
+ /**
+ * @throws SQLException If failed.
+ */
+ @Test
+ public void testSimpleQuery() throws SQLException {
+ stmt.execute("CREATE TABLE Person(\"id\" INT, PRIMARY KEY(\"id\"), \"name\" VARCHAR)");
+
+ doSleep(1000);
+
+ stmt.executeUpdate("INSERT INTO Person VALUES (10, 'Name')");
+ try (ResultSet rs = stmt.executeQuery("select p.*, (1+1) as synthetic from Person p")) {
+ assertTrue(rs.next());
+ assertEquals(10, rs.getInt(1));
+ assertEquals("Name", rs.getString(2));
+ assertEquals(2, rs.getInt(3));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(nodesCnt);
+ conn = DriverManager.getConnection(url);
+ conn.setSchema("PUBLIC");
+ stmt = conn.createStatement();
+
+ assert stmt != null;
+ assert !stmt.isClosed();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ if (stmt != null && !stmt.isClosed()) {
+ stmt.close();
+
+ assert stmt.isClosed();
+ }
+
+ conn.close();
+
+ assert stmt.isClosed();
+ assert conn.isClosed();
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index ebcae81..705cf05 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.query.calcite.PlannerTest;
import org.apache.ignite.internal.processors.query.calcite.exec.ContinuousExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.OutboxTest;
+import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcQueryTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -34,7 +35,8 @@ import org.junit.runners.Suite;
OutboxTest.class,
ExecutionTest.class,
ContinuousExecutionTest.class,
- CalciteQueryProcessorTest.class
+ CalciteQueryProcessorTest.class,
+ JdbcQueryTest.class
})
public class IgniteCalciteTestSuite {
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
index 31d9519..442cda6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
@@ -503,4 +503,16 @@ public interface ConnectionProperties {
* @param connTimeout Connection timeout in milliseconds.
*/
public void setConnectionTimeout(@Nullable Integer connTimeout) throws SQLException;
+
+ /**
+ * @return {@code True} if experimental query engine is enabled for a connection.
+ */
+ public boolean isUseExperimentalQueryEngine();
+
+ /**
+ * Sets use experimental query engine flag.
+ *
+ * @param useExperimentalQueryEngine {@code True} if experimental query engine is enabled for a connection.
+ */
+ public void setUseExperimentalQueryEngine(boolean useExperimentalQueryEngine);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
index 7b14aec..db5d4b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
@@ -229,6 +229,10 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
" Zero means there is no limits.",
0, false, 0, Integer.MAX_VALUE);
+ /** Whether an experimental SQL engine enabled for a connection. */
+ private BooleanProperty useExperimentalQueryEngine = new BooleanProperty("useExperimentalQueryEngine",
+ "Enables experimental query engine.", false, false);
+
/** Properties array. */
private final ConnectionProperty [] propsArray = {
distributedJoins, enforceJoinOrder, collocated, replicatedOnly, autoCloseServerCursor,
@@ -244,7 +248,8 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
partitionAwarenessSQLCacheSize,
partitionAwarenessPartDistributionsCacheSize,
qryTimeout,
- connTimeout
+ connTimeout,
+ useExperimentalQueryEngine
};
/** {@inheritDoc} */
@@ -623,6 +628,16 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
connTimeout.setValue(timeout);
}
+ /** {@inheritDoc} */
+ @Override public boolean isUseExperimentalQueryEngine() {
+ return useExperimentalQueryEngine.value();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setUseExperimentalQueryEngine(boolean useExperimentalQueryEngine) {
+ this.useExperimentalQueryEngine.setValue(useExperimentalQueryEngine);
+ }
+
/**
* @param url URL connection.
* @param props Environment properties.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index 8711182..680d7a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -82,8 +82,11 @@ public class JdbcThinTcpIo {
/** Version 2.8.0. */
private static final ClientListenerProtocolVersion VER_2_8_0 = ClientListenerProtocolVersion.create(2, 8, 0);
+ /** Version 2.9.0. */
+ private static final ClientListenerProtocolVersion VER_2_9_0 = ClientListenerProtocolVersion.create(2, 9, 0);
+
/** Current version. */
- private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_8_0;
+ private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_9_0;
/** Initial output stream capacity for handshake. */
private static final int HANDSHAKE_MSG_SIZE = 13;
@@ -254,6 +257,9 @@ public class JdbcThinTcpIo {
JdbcUtils.writeNullableInteger(writer, connProps.getUpdateBatchSize());
}
+ if (ver.compareTo(VER_2_9_0) >= 0)
+ writer.writeBoolean(connProps.isUseExperimentalQueryEngine());
+
if (!F.isEmpty(connProps.getUsername())) {
assert ver.compareTo(VER_2_5_0) >= 0 : "Authentication is supported since 2.5";
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
index 02cb107..b5908cb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
@@ -186,7 +186,7 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T>, FieldsQueryCursor<T
* @return {@code true} if this cursor corresponds to a {@link ResultSet} as a result of query,
* {@code false} if query was modifying operation like INSERT, UPDATE, or DELETE.
*/
- public boolean isQuery() {
+ @Override public boolean isQuery() {
return isQry;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
index 828c650..bda2f8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.query;
+import java.sql.ResultSet;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.QueryCursor;
@@ -39,6 +40,12 @@ public interface QueryCursorEx<T> extends QueryCursor<T> {
public List<GridQueryFieldMetadata> fieldsMeta();
/**
+ * @return {@code true} if this cursor corresponds to a {@link ResultSet} as a result of query,
+ * {@code false} if query was modifying operation like INSERT, UPDATE, or DELETE.
+ */
+ public boolean isQuery();
+
+ /**
* Query value consumer.
*/
public static interface Consumer<T> {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 7ce21ba..db8fb4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -64,8 +64,11 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
/** Version 2.8.0: adds query id in order to implement cancel feature, partition awareness support: IEP-23.*/
static final ClientListenerProtocolVersion VER_2_8_0 = ClientListenerProtocolVersion.create(2, 8, 0);
+ /** Version 2.8.0: adds experimental query engine support */
+ static final ClientListenerProtocolVersion VER_2_9_0 = ClientListenerProtocolVersion.create(2, 9, 0);
+
/** Current version. */
- public static final ClientListenerProtocolVersion CURRENT_VER = VER_2_8_0;
+ public static final ClientListenerProtocolVersion CURRENT_VER = VER_2_9_0;
/** Supported versions. */
private static final Set<ClientListenerProtocolVersion> SUPPORTED_VERS = new HashSet<>();
@@ -144,6 +147,7 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
boolean lazyExec = false;
boolean skipReducerOnUpdate = false;
+ boolean useExperimentalQueryEngine = false;
NestedTxMode nestedTxMode = NestedTxMode.DEFAULT;
AuthorizationContext actx = null;
@@ -176,6 +180,9 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
updateBatchSize = JdbcUtils.readNullableInteger(reader);
}
+ if (ver.compareTo(VER_2_9_0) >= 0)
+ useExperimentalQueryEngine = reader.readBoolean();
+
if (ver.compareTo(VER_2_5_0) >= 0) {
String user = null;
String passwd = null;
@@ -209,7 +216,7 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
};
handler = new JdbcRequestHandler(busyLock, sender, maxCursors, distributedJoins, enforceJoinOrder,
- collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, nestedTxMode,
+ collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, useExperimentalQueryEngine, nestedTxMode,
dataPageScanEnabled, updateBatchSize, actx, ver, this);
handler.start();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java
index cd566ed..ef6c8ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java
@@ -21,7 +21,7 @@ import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
/**
@@ -38,10 +38,10 @@ class JdbcQueryCursor extends JdbcCursor {
private long fetched;
/** Query result rows. */
- private final QueryCursorImpl<List<Object>> cur;
+ private final QueryCursorEx<List<?>> cur;
/** Query results iterator. */
- private Iterator<List<Object>> iter;
+ private Iterator<List<?>> iter;
/**
* @param pageSize Fetch size.
@@ -49,7 +49,7 @@ class JdbcQueryCursor extends JdbcCursor {
* @param cur Query cursor.
* @param reqId Id of the request that created given cursor.
*/
- JdbcQueryCursor(int pageSize, int maxRows, QueryCursorImpl<List<Object>> cur, long reqId) {
+ JdbcQueryCursor(int pageSize, int maxRows, QueryCursorEx<List<?>> cur, long reqId) {
super(reqId);
this.pageSize = pageSize;
@@ -73,7 +73,7 @@ class JdbcQueryCursor extends JdbcCursor {
List<List<Object>> items = new ArrayList<>(fetchSize);
for (int i = 0; i < fetchSize && iter.hasNext(); i++) {
- items.add(iter.next());
+ items.add((List<Object>)iter.next());
fetched++;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 1510210..fea59b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -38,6 +38,7 @@ import org.apache.ignite.cache.query.BulkLoadContextCursor;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteVersionUtils;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
@@ -52,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
@@ -61,6 +63,8 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerResponseSender;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.NestedTxMode;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryEngine;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.SqlClientContext;
import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
@@ -161,6 +165,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
/** Register that keeps non-cancelled requests. */
private Map<Long, JdbcQueryDescriptor> reqRegister = new HashMap<>();
+ /** Experimental query engine. */
+ private QueryEngine experimentalQueryEngine;
+
/**
* Constructor.
* @param busyLock Shutdown latch.
@@ -173,6 +180,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
* @param autoCloseCursors Flag to automatically close server cursors.
* @param lazy Lazy query execution flag.
* @param skipReducerOnUpdate Skip reducer on update flag.
+ * @param useExperimentalQueryEngine Enable experimental query engine.
* @param dataPageScanEnabled Enable scan data page mode.
* @param updateBatchSize Size of internal batch for DML queries.
* @param actx Authentication context.
@@ -190,6 +198,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
boolean autoCloseCursors,
boolean lazy,
boolean skipReducerOnUpdate,
+ boolean useExperimentalQueryEngine,
NestedTxMode nestedTxMode,
@Nullable Boolean dataPageScanEnabled,
@Nullable Integer updateBatchSize,
@@ -228,6 +237,17 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
this.protocolVer = protocolVer;
this.actx = actx;
+ if (useExperimentalQueryEngine) {
+ for (GridComponent cmp : connCtx.kernalContext().components()) {
+ if (!(cmp instanceof QueryEngine))
+ continue;
+
+ experimentalQueryEngine = (QueryEngine) cmp;
+
+ break;
+ }
+ }
+
log = connCtx.kernalContext().log(getClass());
// TODO IGNITE-9484 Do not create worker if there is a possibility to unbind TX from threads.
@@ -612,8 +632,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
qry.setSchema(schemaName);
- List<FieldsQueryCursor<List<?>>> results = connCtx.kernalContext().query().querySqlFields(null, qry,
- cliCtx, true, protocolVer.compareTo(VER_2_3_0) < 0, cancel);
+ List<FieldsQueryCursor<List<?>>> results = querySqlFields(qry, cancel);
FieldsQueryCursor<List<?>> fieldsCur = results.get(0);
@@ -633,7 +652,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
if (results.size() == 1) {
JdbcQueryCursor cur = new JdbcQueryCursor(req.pageSize(), req.maxRows(),
- (QueryCursorImpl)fieldsCur, req.requestId());
+ (QueryCursorEx<List<?>>)fieldsCur, req.requestId());
jdbcCursors.put(cur.cursorId(), cur);
@@ -641,7 +660,10 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
JdbcQueryExecuteResult res;
- PartitionResult partRes = ((QueryCursorImpl<List<?>>)fieldsCur).partitionResult();
+ PartitionResult partRes = null;
+
+ if (fieldsCur instanceof QueryCursorImpl)
+ partRes = ((QueryCursorImpl<List<?>>)fieldsCur).partitionResult();
if (cur.isQuery())
res = new JdbcQueryExecuteResult(cur.cursorId(), cur.fetchRows(), !cur.hasNext(),
@@ -678,7 +700,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
boolean last = true;
for (FieldsQueryCursor<List<?>> c : results) {
- QueryCursorImpl qryCur = (QueryCursorImpl)c;
+ QueryCursorEx<List<?>> qryCur = (QueryCursorEx<List<?>>)c;
JdbcResultInfo jdbcRes;
@@ -723,6 +745,21 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
}
}
+ /** */
+ private List<FieldsQueryCursor<List<?>>> querySqlFields(SqlFieldsQueryEx qry, GridQueryCancel cancel) {
+ if (experimentalQueryEngine != null) {
+ try {
+ return experimentalQueryEngine.query(QueryContext.of(qry, cancel), qry.getSchema(), qry.getSql(), qry.getArgs());
+ }
+ catch (IgniteSQLException e) {
+ U.warn(log, "Failed to execute SQL query using experimental engine. [qry=" + qry + ']', e);
+ }
+ }
+
+ return connCtx.kernalContext().query().querySqlFields(null, qry,
+ cliCtx, true, protocolVer.compareTo(VER_2_3_0) < 0, cancel);
+ }
+
/**
* {@link JdbcQueryCloseRequest} command handler.
*
@@ -1011,7 +1048,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
if (cur instanceof BulkLoadContextCursor)
throw new IgniteSQLException("COPY command cannot be executed in batch mode.");
- assert !((QueryCursorImpl)cur).isQuery();
+ assert !((QueryCursorEx)cur).isQuery();
Iterator<List<?>> it = cur.iterator();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
index 4e8cd75..67dedc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
@@ -168,6 +168,10 @@ public class PlatformContinuousQueryImpl implements PlatformContinuousQuery {
@Override public List<GridQueryFieldMetadata> fieldsMeta() {
return null;
}
+
+ @Override public boolean isQuery() {
+ return false;
+ }
}, initialQry.getPageSize() > 0 ? initialQry.getPageSize() : Query.DFLT_PAGE_SIZE);
}
catch (Exception e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java
index 1abf7d6..a9dbe02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java
@@ -50,7 +50,7 @@ public final class QueryContext {
* @param params Context parameters.
* @return Query context.
*/
- public static QueryContext of(String schemaName, Object... params) {
+ public static QueryContext of(Object... params) {
return !F.isEmpty(params) ? new QueryContext(build(null, params).toArray()) : new QueryContext(EMPTY);
}