You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2022/02/03 06:31:16 UTC

[ignite] branch sql-calcite updated: IGNITE-16354 Fix memory leak in RelOptCluster - Fixes #9758.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new e0d91b4  IGNITE-16354 Fix memory leak in RelOptCluster - Fixes #9758.
e0d91b4 is described below

commit e0d91b42b3025be5df52a9cfb2806ef2a43a5ae5
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Thu Feb 3 09:23:35 2022 +0300

    IGNITE-16354 Fix memory leak in RelOptCluster - Fixes #9758.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../query/calcite/externalize/RelJson.java         |  2 +-
 .../query/calcite/externalize/RelJsonReader.java   | 10 ++---
 .../query/calcite/prepare/BaseQueryContext.java    | 46 +++++++++++++++++++---
 .../processors/query/calcite/prepare/Fragment.java |  6 +--
 .../query/calcite/prepare/FragmentPlan.java        |  2 +-
 .../query/calcite/prepare/MappingQueryContext.java | 20 ++++++++++
 .../query/calcite/prepare/QueryTemplate.java       |  6 +--
 .../processors/query/calcite/util/Commons.java     |  2 +-
 8 files changed, 74 insertions(+), 20 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
index 52255e4..9f03dc4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
@@ -743,7 +743,7 @@ class RelJson {
     /** */
     private Object toJson(RexNode node) {
         // removes calls to SEARCH and the included Sarg and converts them to comparisons
-        node = RexUtil.expandSearch(Commons.cluster().getRexBuilder(), null, node);
+        node = RexUtil.expandSearch(Commons.emptyCluster().getRexBuilder(), null, node);
 
         Map<String, Object> map;
         switch (node.getKind()) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java
index 7d2b4c1..a3a5000 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java
@@ -127,12 +127,12 @@ public class RelJsonReader {
 
         /** {@inheritDoc} */
         @Override public RelOptCluster getCluster() {
-            return Commons.cluster();
+            return Commons.emptyCluster();
         }
 
         /** {@inheritDoc} */
         @Override public RelTraitSet getTraitSet() {
-            return Commons.cluster().traitSet();
+            return Commons.emptyCluster().traitSet();
         }
 
         /** {@inheritDoc} */
@@ -252,7 +252,7 @@ public class RelJsonReader {
         /** {@inheritDoc} */
         @Override public RelDataType getRowType(String tag) {
             Object o = jsonRel.get(tag);
-            return relJson.toType(Commons.typeFactory(Commons.cluster()), o);
+            return relJson.toType(Commons.typeFactory(Commons.emptyCluster()), o);
         }
 
         /** {@inheritDoc} */
@@ -260,7 +260,7 @@ public class RelJsonReader {
             List<RexNode> expressionList = getExpressionList(expressionsTag);
             List<String> names =
                 (List<String>)get(fieldsTag);
-            return Commons.typeFactory(Commons.cluster()).createStructType(
+            return Commons.typeFactory(Commons.emptyCluster()).createStructType(
                 new AbstractList<Map.Entry<String, RelDataType>>() {
                     @Override public Map.Entry<String, RelDataType> get(int index) {
                         return Pair.of(names.get(index),
@@ -323,7 +323,7 @@ public class RelJsonReader {
             Boolean distinct = (Boolean)jsonAggCall.get("distinct");
             List<Integer> operands = (List<Integer>)jsonAggCall.get("operands");
             Integer filterOperand = (Integer)jsonAggCall.get("filter");
-            RelDataType type = relJson.toType(Commons.typeFactory(Commons.cluster()), jsonAggCall.get("type"));
+            RelDataType type = relJson.toType(Commons.typeFactory(Commons.emptyCluster()), jsonAggCall.get("type"));
             String name = (String)jsonAggCall.get("name");
             return AggregateCall.create(aggregation, distinct, false, false, operands,
                 filterOperand == null ? -1 : filterOperand,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
index 9b90758..a920c95 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import java.lang.reflect.Method;
+import java.util.List;
 import java.util.Properties;
+import com.google.common.collect.Multimap;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.config.CalciteConnectionConfigImpl;
 import org.apache.calcite.config.CalciteConnectionProperty;
@@ -25,9 +28,15 @@ import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.Metadata;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.UnboundMetadata;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.schema.SchemaPlus;
@@ -37,8 +46,6 @@ import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
-import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.logger.NullLogger;
@@ -86,7 +93,13 @@ public final class BaseQueryContext extends AbstractQueryContext {
 
         EMPTY_CONTEXT = builder().build();
 
-        EMPTY_PLANNER = new VolcanoPlanner(COST_FACTORY, EMPTY_CONTEXT);
+        EMPTY_PLANNER = new VolcanoPlanner(COST_FACTORY, EMPTY_CONTEXT) {
+            @Override public void registerSchema(RelOptSchema schema) {
+                // VolcanoPlanner.registerSchema() stores the schema in a hash map. It can be invoked while cloning
+                // relational operators, so it may run even with the empty context. Make it a no-op for the empty
+                // context to prevent memory leaks.
+            }
+        };
 
         RelDataTypeSystem typeSys = CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class, FRAMEWORK_CONFIG.getTypeSystem());
         TYPE_FACTORY = new IgniteTypeFactory(typeSys);
@@ -95,8 +108,29 @@ public final class BaseQueryContext extends AbstractQueryContext {
 
         CLUSTER = RelOptCluster.create(EMPTY_PLANNER, DFLT_REX_BUILDER);
 
-        CLUSTER.setMetadataProvider(new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER, EMPTY_PLANNER));
-        CLUSTER.setMetadataQuerySupplier(RelMetadataQueryEx::create);
+        // Forbid using the empty cluster in any planning or mapping procedures to prevent memory leaks.
+        String cantBeUsedMsg = "Empty cluster can't be used for planning or mapping";
+
+        CLUSTER.setMetadataProvider(
+            new RelMetadataProvider() {
+                @Override public <M extends Metadata> UnboundMetadata<M> apply(
+                    Class<? extends RelNode> relCls,
+                    Class<? extends M> metadataCls)
+                {
+                    throw new AssertionError(cantBeUsedMsg);
+                }
+
+                @Override public <M extends Metadata> Multimap<Method, MetadataHandler<M>> handlers(MetadataDef<M> def) {
+                    throw new AssertionError(cantBeUsedMsg);
+                }
+
+                @Override public List<MetadataHandler<?>> handlers(Class<? extends MetadataHandler<?>> hndCls) {
+                    throw new AssertionError(cantBeUsedMsg);
+                }
+            }
+        );
+
+        CLUSTER.setMetadataQuerySupplier(() -> { throw new AssertionError(cantBeUsedMsg); });
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index ff1032b..5ff0676 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -22,6 +22,7 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Supplier;
 import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException;
@@ -34,7 +35,6 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.NotNull;
@@ -121,8 +121,8 @@ public class Fragment {
     }
 
     /** */
-    public Fragment copy() {
-        return new Cloner(Commons.cluster()).go(this);
+    public Fragment attach(RelOptCluster cluster) {
+        return root.getCluster() == cluster ? this : new Cloner(cluster).go(this);
     }
 
     /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java
index 62284d2..635ed7c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java
@@ -28,7 +28,7 @@ public class FragmentPlan implements QueryPlan {
 
     /** */
     public FragmentPlan(IgniteRel root) {
-        RelOptCluster cluster = Commons.cluster();
+        RelOptCluster cluster = Commons.emptyCluster();
 
         this.root = new Cloner(cluster).visit(root);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
index 105f415..73f0e91 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
@@ -18,7 +18,12 @@
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.UUID;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
+import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  * Query mapping context.
@@ -31,6 +36,9 @@ public class MappingQueryContext {
     private final AffinityTopologyVersion topVer;
 
     /** */
+    private RelOptCluster cluster;
+
+    /** */
     public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer) {
         this.locNodeId = locNodeId;
         this.topVer = topVer;
@@ -45,4 +53,16 @@ public class MappingQueryContext {
     public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
+
+    /** Returns the cluster of this mapping context, creating it on the first call. */
+    RelOptCluster cluster() {
+        if (cluster == null) {
+            cluster = RelOptCluster.create(Commons.emptyCluster().getPlanner(), Commons.emptyCluster().getRexBuilder());
+            cluster.setMetadataProvider(new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER,
+                Commons.emptyCluster().getPlanner()));
+            cluster.setMetadataQuerySupplier(RelMetadataQueryEx::create);
+        }
+
+        return cluster;
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
index ac8bbc4..b1c121d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
@@ -47,7 +47,7 @@ public class QueryTemplate {
     public QueryTemplate(List<Fragment> fragments) {
         ImmutableList.Builder<Fragment> b = ImmutableList.builder();
         for (Fragment fragment : fragments)
-            b.add(fragment.copy());
+            b.add(fragment.attach(Commons.emptyCluster()));
 
         this.fragments = b.build();
     }
@@ -59,7 +59,7 @@ public class QueryTemplate {
         if (executionPlan != null && Objects.equals(executionPlan.topologyVersion(), ctx.topologyVersion()))
             return executionPlan;
 
-        List<Fragment> fragments = Commons.transform(this.fragments, Fragment::copy);
+        List<Fragment> fragments = Commons.transform(this.fragments, f -> f.attach(ctx.cluster()));
 
         Exception ex = null;
         RelMetadataQuery mq = F.first(fragments).root().getCluster().getMetadataQuery();
@@ -96,7 +96,7 @@ public class QueryTemplate {
         ImmutableList.Builder<Fragment> b = ImmutableList.builder();
 
         for (Fragment fragment : fragments)
-            b.add(fragment.map(mappingService, ctx, mq));
+            b.add(fragment.map(mappingService, ctx, mq).attach(Commons.emptyCluster()));
 
         return b.build();
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index f25fa87..c94245d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -431,7 +431,7 @@ public final class Commons {
     }
 
     /** */
-    public static RelOptCluster cluster() {
+    public static RelOptCluster emptyCluster() {
         return BaseQueryContext.CLUSTER;
     }