You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by zs...@apache.org on 2022/03/04 12:52:13 UTC
[ignite-3] branch main updated: IGNITE-16645 Adoption of a bunch of calcite related tickets from Ignite-2 part2 - Fixes #702.
This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6659e4a IGNITE-16645 Adoption of a bunch of calcite related tickets from Ignite-2 part2 - Fixes #702.
6659e4a is described below
commit 6659e4a3deb55f200f2575d7ad40b8fbc50bdc81
Author: zstan <st...@gmail.com>
AuthorDate: Fri Mar 4 12:18:44 2022 +0300
IGNITE-16645 Adoption of a bunch of calcite related tickets from Ignite-2 part2 - Fixes #702.
IGNITE-16228 SetOp execution node cannot be rewinded.
IGNITE-16191 Unexpected result of COUNT with multiple parameters.
IGNITE-16354 Memory leak in RelOptCluster during mapping.
Signed-off-by: zstan <st...@gmail.com>
---
.../ignite/internal/sql/engine/ItSetOpTest.java | 14 +++++
.../engine/exec/exp/agg/AccumulatorsFactory.java | 5 ++
.../sql/engine/exec/rel/AbstractSetOpNode.java | 1 +
.../internal/sql/engine/prepare/Fragment.java | 4 +-
.../sql/engine/prepare/IgniteSqlValidator.java | 5 ++
.../sql/engine/prepare/MappingQueryContext.java | 20 +++++++
.../internal/sql/engine/prepare/QueryTemplate.java | 12 ++---
.../internal/sql/engine/util/BaseQueryContext.java | 61 ++++++++++++++--------
.../ignite/internal/sql/engine/util/Commons.java | 4 --
9 files changed, 93 insertions(+), 33 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
index 43715cf..3ab9a49 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
@@ -220,6 +220,20 @@ public class ItSetOpTest extends AbstractBasicIntegrationTest {
assertEquals(2, countIf(rows, r -> r.get(0).equals("Igor1")));
}
+ /**
+ * Test that set op node can be rewinded.
+ */
+ @Test
+ public void testSetOpRewindability() {
+ sql("CREATE TABLE test(id int PRIMARY KEY, i INTEGER)");
+ sql("INSERT INTO test VALUES (1, 1), (2, 2)");
+
+ assertQuery("SELECT (SELECT i FROM test EXCEPT SELECT test.i) FROM test")
+ .returns(1)
+ .returns(2)
+ .check();
+ }
+
@Test
public void testUnionAll() {
List<List<?>> rows = sql("SELECT name, salary FROM emp1 "
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
index d88de66..2a08145 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
@@ -205,6 +205,11 @@ public class AccumulatorsFactory<RowT> implements Supplier<List<AccumulatorWrapp
List<RelDataType> inTypes = SqlTypeUtil.projectTypes(inputRowType, call.getArgList());
List<RelDataType> outTypes = accumulator.argumentTypes(ctx.getTypeFactory());
+ if (call.getArgList().size() > outTypes.size()) {
+ throw new AssertionError("Unexpected number of arguments: "
+ + "expected=" + outTypes.size() + ", actual=" + inTypes.size());
+ }
+
if (call.ignoreNulls()) {
inTypes = Commons.transform(inTypes, this::nonNull);
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
index 1bc76b0..0ccd890 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
@@ -127,6 +127,7 @@ public abstract class AbstractSetOpNode<RowT> extends AbstractNode<RowT> {
protected void rewindInternal() {
requested = 0;
waiting = 0;
+ curSrcIdx = 0;
grouping.groups.clear();
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
index 2df6073..2a3d6f7 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
@@ -141,8 +141,8 @@ public class Fragment {
return !(root instanceof IgniteSender);
}
- public Fragment copy(RelOptCluster cluster) {
- return new Cloner(cluster).go(this);
+ public Fragment attach(RelOptCluster cluster) {
+ return root.getCluster() == cluster ? this : new Cloner(cluster).go(this);
}
/**
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteSqlValidator.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteSqlValidator.java
index 9f0b910..8d73afe 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteSqlValidator.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteSqlValidator.java
@@ -260,6 +260,11 @@ public class IgniteSqlValidator extends SqlValidatorImpl {
switch (aggFunction.kind) {
case COUNT:
+ if (call.operandCount() > 1) {
+ throw newValidationError(call, RESOURCE.invalidArgCount(aggFunction.getName(), 1));
+ }
+
+ return;
case SUM:
case AVG:
case MIN:
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MappingQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MappingQueryContext.java
index cc190ea..cb98009 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MappingQueryContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MappingQueryContext.java
@@ -17,6 +17,12 @@
package org.apache.ignite.internal.sql.engine.prepare;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
+import org.apache.ignite.internal.sql.engine.metadata.IgniteMetadata;
+import org.apache.ignite.internal.sql.engine.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+
/**
* Query mapping context.
*/
@@ -25,6 +31,8 @@ public class MappingQueryContext {
private final long topVer;
+ private RelOptCluster cluster;
+
/**
* Constructor.
*
@@ -36,6 +44,18 @@ public class MappingQueryContext {
this.topVer = topVer;
}
+ /** Creates a cluster. */
+ RelOptCluster cluster() {
+ if (cluster == null) {
+ cluster = RelOptCluster.create(Commons.cluster().getPlanner(), Commons.cluster().getRexBuilder());
+ cluster.setMetadataProvider(new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER,
+ Commons.cluster().getPlanner()));
+ cluster.setMetadataQuerySupplier(RelMetadataQueryEx::create);
+ }
+
+ return cluster;
+ }
+
public String localNodeId() {
return locNodeId;
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryTemplate.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryTemplate.java
index 775bd67..e747990 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryTemplate.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryTemplate.java
@@ -51,10 +51,10 @@ public class QueryTemplate {
public QueryTemplate(List<Fragment> fragments) {
List<Fragment> frgs = new ArrayList<>(fragments.size());
- RelOptCluster cluster = Commons.createCluster();
+ RelOptCluster cluster = Commons.cluster();
for (Fragment fragment : fragments) {
- frgs.add(fragment.copy(cluster));
+ frgs.add(fragment.attach(cluster));
}
this.fragments = List.copyOf(frgs);
@@ -70,9 +70,7 @@ public class QueryTemplate {
return executionPlan;
}
- RelOptCluster cluster = Commons.createCluster();
-
- List<Fragment> fragments = Commons.transform(this.fragments, fragment -> fragment.copy(cluster));
+ List<Fragment> fragments = Commons.transform(this.fragments, fragment -> fragment.attach(ctx.cluster()));
Exception ex = null;
RelMetadataQuery mq = first(fragments).root().getCluster().getMetadataQuery();
@@ -103,8 +101,10 @@ public class QueryTemplate {
private List<Fragment> map(MappingService mappingService, List<Fragment> fragments, MappingQueryContext ctx, RelMetadataQuery mq) {
List<Fragment> frgs = new ArrayList<>();
+ RelOptCluster cluster = Commons.cluster();
+
for (Fragment fragment : fragments) {
- frgs.add(fragment.map(mappingService, ctx, mq));
+ frgs.add(fragment.map(mappingService, ctx, mq).attach(cluster));
}
return List.copyOf(frgs);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
index 013b6be..bcfbb99 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
@@ -20,6 +20,9 @@ package org.apache.ignite.internal.sql.engine.util;
import static org.apache.calcite.tools.Frameworks.createRootSchema;
import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
+import com.google.common.collect.Multimap;
+import java.lang.reflect.Method;
+import java.util.List;
import java.util.Properties;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
@@ -30,15 +33,18 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.Metadata;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.UnboundMetadata;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.sql.engine.QueryCancel;
-import org.apache.ignite.internal.sql.engine.metadata.IgniteMetadata;
-import org.apache.ignite.internal.sql.engine.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.lang.IgniteLogger;
@@ -80,7 +86,9 @@ public final class BaseQueryContext extends AbstractQueryContext {
DUMMY_PLANNER = new VolcanoPlanner(COST_FACTORY, EMPTY_CONTEXT) {
@Override
public void registerSchema(RelOptSchema schema) {
- throw new UnsupportedOperationException("Dummy planer. Please use a specific instance.");
+ // This method in VolcanoPlanner stores schema in hash map. It can be invoked during relational
+ // operators cloning, so, can be executed even with empty context. Override it for empty context to
+ // prevent memory leaks.
}
};
@@ -91,27 +99,38 @@ public final class BaseQueryContext extends AbstractQueryContext {
RelOptCluster cluster = RelOptCluster.create(DUMMY_PLANNER, DFLT_REX_BUILDER);
- cluster.setMetadataProvider(IgniteMetadata.METADATA_PROVIDER);
- cluster.setMetadataQuerySupplier(RelMetadataQueryEx::create);
+ // Forbid using the empty cluster in any planning or mapping procedures to prevent memory leaks.
+ String cantBeUsedMsg = "Empty cluster can't be used for planning or mapping";
+
+ cluster.setMetadataProvider(
+ new RelMetadataProvider() {
+ @Override
+ public <M extends Metadata> UnboundMetadata<M> apply(
+ Class<? extends RelNode> relCls,
+ Class<? extends M> metadataCls
+ ) {
+ throw new AssertionError(cantBeUsedMsg);
+ }
+
+ @Override
+ public <M extends Metadata> Multimap<Method, MetadataHandler<M>> handlers(MetadataDef<M> def) {
+ throw new AssertionError(cantBeUsedMsg);
+ }
+
+ @Override
+ public List<MetadataHandler<?>> handlers(Class<? extends MetadataHandler<?>> hndCls) {
+ throw new AssertionError(cantBeUsedMsg);
+ }
+ }
+ );
+
+ cluster.setMetadataQuerySupplier(() -> {
+ throw new AssertionError(cantBeUsedMsg);
+ });
CLUSTER = cluster;
}
- /**
- * Creates a new cluster.
- *
- * @return New cluster.
- */
- public static RelOptCluster createCluster() {
- RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(COST_FACTORY, EMPTY_CONTEXT), DFLT_REX_BUILDER);
-
- cluster.setMetadataProvider(new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER,
- cluster.getPlanner()));
- cluster.setMetadataQuerySupplier(RelMetadataQueryEx::create);
-
- return cluster;
- }
-
private final FrameworkConfig cfg;
private final IgniteLogger log;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index 7715808..070d924 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -834,8 +834,4 @@ public final class Commons {
public static RelOptCluster cluster() {
return CLUSTER;
}
-
- public static RelOptCluster createCluster() {
- return BaseQueryContext.createCluster();
- }
}