You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by zs...@apache.org on 2022/03/04 12:52:13 UTC

[ignite-3] branch main updated: IGNITE-16645 Adoption of a bunch of calcite related tickets from Ignite-2 part2 - Fixes #702.

This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6659e4a  IGNITE-16645 Adoption of a bunch of calcite related tickets from Ignite-2 part2 - Fixes #702.
6659e4a is described below

commit 6659e4a3deb55f200f2575d7ad40b8fbc50bdc81
Author: zstan <st...@gmail.com>
AuthorDate: Fri Mar 4 12:18:44 2022 +0300

    IGNITE-16645 Adoption of a bunch of calcite related tickets from Ignite-2 part2 - Fixes #702.
    
    IGNITE-16228 SetOp execution node cannot be rewinded.
    IGNITE-16191 Unexpected result of COUNT with multiple parameters.
    IGNITE-16354 Memory leak in RelOptCluster during mapping.
    
    Signed-off-by: zstan <st...@gmail.com>
---
 .../ignite/internal/sql/engine/ItSetOpTest.java    | 14 +++++
 .../engine/exec/exp/agg/AccumulatorsFactory.java   |  5 ++
 .../sql/engine/exec/rel/AbstractSetOpNode.java     |  1 +
 .../internal/sql/engine/prepare/Fragment.java      |  4 +-
 .../sql/engine/prepare/IgniteSqlValidator.java     |  5 ++
 .../sql/engine/prepare/MappingQueryContext.java    | 20 +++++++
 .../internal/sql/engine/prepare/QueryTemplate.java | 12 ++---
 .../internal/sql/engine/util/BaseQueryContext.java | 61 ++++++++++++++--------
 .../ignite/internal/sql/engine/util/Commons.java   |  4 --
 9 files changed, 93 insertions(+), 33 deletions(-)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
index 43715cf..3ab9a49 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
@@ -220,6 +220,20 @@ public class ItSetOpTest extends AbstractBasicIntegrationTest {
         assertEquals(2, countIf(rows, r -> r.get(0).equals("Igor1")));
     }
 
+    /**
+     * Test that set op node can be rewinded.
+     */
+    @Test
+    public void testSetOpRewindability() {
+        sql("CREATE TABLE test(id int PRIMARY KEY, i INTEGER)");
+        sql("INSERT INTO test VALUES (1, 1), (2, 2)");
+
+        assertQuery("SELECT (SELECT i FROM test EXCEPT SELECT test.i) FROM test")
+                .returns(1)
+                .returns(2)
+                .check();
+    }
+
     @Test
     public void testUnionAll() {
         List<List<?>> rows = sql("SELECT name, salary FROM emp1 "
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
index d88de66..2a08145 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
@@ -205,6 +205,11 @@ public class AccumulatorsFactory<RowT> implements Supplier<List<AccumulatorWrapp
             List<RelDataType> inTypes = SqlTypeUtil.projectTypes(inputRowType, call.getArgList());
             List<RelDataType> outTypes = accumulator.argumentTypes(ctx.getTypeFactory());
 
+            if (call.getArgList().size() > outTypes.size()) {
+                throw new AssertionError("Unexpected number of arguments: "
+                        + "expected=" + outTypes.size() + ", actual=" + inTypes.size());
+            }
+
             if (call.ignoreNulls()) {
                 inTypes = Commons.transform(inTypes, this::nonNull);
             }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
index 1bc76b0..0ccd890 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
@@ -127,6 +127,7 @@ public abstract class AbstractSetOpNode<RowT> extends AbstractNode<RowT> {
     protected void rewindInternal() {
         requested = 0;
         waiting = 0;
+        curSrcIdx = 0;
         grouping.groups.clear();
     }
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
index 2df6073..2a3d6f7 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
@@ -141,8 +141,8 @@ public class Fragment {
         return !(root instanceof IgniteSender);
     }
 
-    public Fragment copy(RelOptCluster cluster) {
-        return new Cloner(cluster).go(this);
+    public Fragment attach(RelOptCluster cluster) {
+        return root.getCluster() == cluster ? this : new Cloner(cluster).go(this);
     }
 
     /**
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteSqlValidator.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteSqlValidator.java
index 9f0b910..8d73afe 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteSqlValidator.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteSqlValidator.java
@@ -260,6 +260,11 @@ public class IgniteSqlValidator extends SqlValidatorImpl {
 
         switch (aggFunction.kind) {
             case COUNT:
+                if (call.operandCount() > 1) {
+                    throw newValidationError(call, RESOURCE.invalidArgCount(aggFunction.getName(), 1));
+                }
+
+                return;
             case SUM:
             case AVG:
             case MIN:
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MappingQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MappingQueryContext.java
index cc190ea..cb98009 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MappingQueryContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MappingQueryContext.java
@@ -17,6 +17,12 @@
 
 package org.apache.ignite.internal.sql.engine.prepare;
 
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
+import org.apache.ignite.internal.sql.engine.metadata.IgniteMetadata;
+import org.apache.ignite.internal.sql.engine.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+
 /**
  * Query mapping context.
  */
@@ -25,6 +31,8 @@ public class MappingQueryContext {
 
     private final long topVer;
 
+    private RelOptCluster cluster;
+
     /**
      * Constructor.
      *
@@ -36,6 +44,18 @@ public class MappingQueryContext {
         this.topVer = topVer;
     }
 
+    /** Creates a cluster. */
+    RelOptCluster cluster() {
+        if (cluster == null) {
+            cluster = RelOptCluster.create(Commons.cluster().getPlanner(), Commons.cluster().getRexBuilder());
+            cluster.setMetadataProvider(new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER,
+                    Commons.cluster().getPlanner()));
+            cluster.setMetadataQuerySupplier(RelMetadataQueryEx::create);
+        }
+
+        return cluster;
+    }
+
     public String localNodeId() {
         return locNodeId;
     }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryTemplate.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryTemplate.java
index 775bd67..e747990 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryTemplate.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryTemplate.java
@@ -51,10 +51,10 @@ public class QueryTemplate {
     public QueryTemplate(List<Fragment> fragments) {
         List<Fragment> frgs = new ArrayList<>(fragments.size());
 
-        RelOptCluster cluster = Commons.createCluster();
+        RelOptCluster cluster = Commons.cluster();
 
         for (Fragment fragment : fragments) {
-            frgs.add(fragment.copy(cluster));
+            frgs.add(fragment.attach(cluster));
         }
 
         this.fragments = List.copyOf(frgs);
@@ -70,9 +70,7 @@ public class QueryTemplate {
             return executionPlan;
         }
 
-        RelOptCluster cluster = Commons.createCluster();
-
-        List<Fragment> fragments = Commons.transform(this.fragments, fragment -> fragment.copy(cluster));
+        List<Fragment> fragments = Commons.transform(this.fragments, fragment -> fragment.attach(ctx.cluster()));
 
         Exception ex = null;
         RelMetadataQuery mq = first(fragments).root().getCluster().getMetadataQuery();
@@ -103,8 +101,10 @@ public class QueryTemplate {
     private List<Fragment> map(MappingService mappingService, List<Fragment> fragments, MappingQueryContext ctx, RelMetadataQuery mq) {
         List<Fragment> frgs = new ArrayList<>();
 
+        RelOptCluster cluster = Commons.cluster();
+
         for (Fragment fragment : fragments) {
-            frgs.add(fragment.map(mappingService, ctx, mq));
+            frgs.add(fragment.map(mappingService, ctx, mq).attach(cluster));
         }
 
         return List.copyOf(frgs);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
index 013b6be..bcfbb99 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
@@ -20,6 +20,9 @@ package org.apache.ignite.internal.sql.engine.util;
 import static org.apache.calcite.tools.Frameworks.createRootSchema;
 import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 
+import com.google.common.collect.Multimap;
+import java.lang.reflect.Method;
+import java.util.List;
 import java.util.Properties;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.config.CalciteConnectionConfigImpl;
@@ -30,15 +33,18 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.Metadata;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.UnboundMetadata;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
-import org.apache.ignite.internal.sql.engine.metadata.IgniteMetadata;
-import org.apache.ignite.internal.sql.engine.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.lang.IgniteLogger;
@@ -80,7 +86,9 @@ public final class BaseQueryContext extends AbstractQueryContext {
         DUMMY_PLANNER = new VolcanoPlanner(COST_FACTORY, EMPTY_CONTEXT) {
             @Override
             public void registerSchema(RelOptSchema schema) {
-                throw new UnsupportedOperationException("Dummy planer. Please use a specific instance.");
+                // This method in VolcanoPlanner stores schema in hash map. It can be invoked during relational
+                // operators cloning, so, can be executed even with empty context. Override it for empty context to
+                // prevent memory leaks.
             }
         };
 
@@ -91,27 +99,38 @@ public final class BaseQueryContext extends AbstractQueryContext {
 
         RelOptCluster cluster = RelOptCluster.create(DUMMY_PLANNER, DFLT_REX_BUILDER);
 
-        cluster.setMetadataProvider(IgniteMetadata.METADATA_PROVIDER);
-        cluster.setMetadataQuerySupplier(RelMetadataQueryEx::create);
+        // Forbid using the empty cluster in any planning or mapping procedures to prevent memory leaks.
+        String cantBeUsedMsg = "Empty cluster can't be used for planning or mapping";
+
+        cluster.setMetadataProvider(
+                new RelMetadataProvider() {
+                    @Override
+                    public <M extends Metadata> UnboundMetadata<M> apply(
+                            Class<? extends RelNode> relCls,
+                            Class<? extends M> metadataCls
+                    ) {
+                        throw new AssertionError(cantBeUsedMsg);
+                    }
+
+                    @Override
+                    public <M extends Metadata> Multimap<Method, MetadataHandler<M>> handlers(MetadataDef<M> def) {
+                        throw new AssertionError(cantBeUsedMsg);
+                    }
+
+                    @Override
+                    public List<MetadataHandler<?>> handlers(Class<? extends MetadataHandler<?>> hndCls) {
+                        throw new AssertionError(cantBeUsedMsg);
+                    }
+                }
+        );
+
+        cluster.setMetadataQuerySupplier(() -> {
+            throw new AssertionError(cantBeUsedMsg);
+        });
 
         CLUSTER = cluster;
     }
 
-    /**
-     * Creates a new cluster.
-     *
-     * @return New cluster.
-     */
-    public static RelOptCluster createCluster() {
-        RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(COST_FACTORY, EMPTY_CONTEXT), DFLT_REX_BUILDER);
-
-        cluster.setMetadataProvider(new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER,
-                cluster.getPlanner()));
-        cluster.setMetadataQuerySupplier(RelMetadataQueryEx::create);
-
-        return cluster;
-    }
-
     private final FrameworkConfig cfg;
 
     private final IgniteLogger log;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index 7715808..070d924 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -834,8 +834,4 @@ public final class Commons {
     public static RelOptCluster cluster() {
         return CLUSTER;
     }
-
-    public static RelOptCluster createCluster() {
-        return BaseQueryContext.createCluster();
-    }
 }