You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/11/23 12:41:27 UTC

[ignite] branch sql-calcite updated: IGNITE-14827 User defined functions support - Fixes #9272.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new dd2c1ad  IGNITE-14827 User defined functions support - Fixes #9272.
dd2c1ad is described below

commit dd2c1ade35ad041c54b37e4203eb476be51871cc
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Tue Nov 23 15:21:30 2021 +0300

    IGNITE-14827 User defined functions support - Fixes #9272.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../query/calcite/CalciteQueryProcessor.java       |   1 -
 .../calcite/exec/exp/IgniteScalarFunction.java     |  67 ++++++++
 .../query/calcite/externalize/RelJson.java         |  13 +-
 .../query/calcite/externalize/RelJsonReader.java   |   8 +-
 .../query/calcite/externalize/RelJsonWriter.java   |   3 +-
 .../query/calcite/prepare/BaseQueryContext.java    |  15 ++
 .../query/calcite/prepare/PlanningContext.java     |  14 +-
 .../query/calcite/prepare/QueryPlanCacheImpl.java  |   6 +
 .../query/calcite/schema/IgniteSchema.java         |  20 +++
 .../query/calcite/schema/SchemaHolderImpl.java     |  11 ++
 .../UserDefinedFunctionsIntegrationTest.java       | 169 +++++++++++++++++++++
 .../query/calcite/planner/AbstractPlannerTest.java |   2 +-
 .../ignite/testsuites/IntegrationTestSuite.java    |   2 +
 .../query/schema/SchemaChangeListener.java         |  10 ++
 .../processors/query/h2/SchemaManager.java         |  12 +-
 15 files changed, 335 insertions(+), 18 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 508d50e..98042d4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-
 import org.apache.calcite.DataContexts;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.config.NullCollation;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteScalarFunction.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteScalarFunction.java
new file mode 100644
index 0000000..0e44c88
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteScalarFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.exec.exp;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import org.apache.calcite.adapter.enumerable.NullPolicy;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScalarFunction;
+import org.apache.calcite.schema.impl.ReflectiveFunctionBase;
+
+/**
+ * Implementation of {@link ScalarFunction} for Ignite user defined functions.
+ */
+public class IgniteScalarFunction extends ReflectiveFunctionBase implements ScalarFunction, ImplementableFunction {
+    /** Call implementor, built once in {@link #create(Method)} and exposed by {@link #getImplementor()}. */
+    private final CallImplementor implementor;
+
+    /**
+     * Private constructor, instances are obtained through {@link #create(Method)}.
+     */
+    private IgniteScalarFunction(Method method, CallImplementor implementor) {
+        super(method);
+
+        this.implementor = implementor;
+    }
+
+    /**
+     * Creates {@link ScalarFunction} from given method.
+     *
+     * @param method Static method that is used to implement the function (checked by an assertion only).
+     * @return Created {@link ScalarFunction}.
+     */
+    public static ScalarFunction create(Method method) {
+        assert Modifier.isStatic(method.getModifiers());
+
+        CallImplementor implementor = RexImpTable.createImplementor(
+            new ReflectiveCallNotNullImplementor(method), NullPolicy.NONE, false);
+
+        return new IgniteScalarFunction(method, implementor);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
+        return typeFactory.createJavaType(method.getReturnType());
+    }
+
+    /** {@inheritDoc} */
+    @Override public CallImplementor getImplementor() {
+        return implementor;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
index 3a969f036..45e55da 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -97,7 +96,7 @@ import org.apache.calcite.sql.validate.SqlNameMatchers;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
@@ -222,6 +221,14 @@ class RelJson {
             "org.apache.calcite.adapter.jdbc.",
             "org.apache.calcite.adapter.jdbc.JdbcRules$");
 
+    /** Query context. */
+    private final BaseQueryContext qctx;
+
+    /** @param qctx Query context whose operator table is used to look up operator overloads. */
+    RelJson(BaseQueryContext qctx) {
+        this.qctx = qctx;
+    }
+
     /** */
     Function<RelInput, RelNode> factory(String type) {
         return FACTORIES_CACHE.getUnchecked(type);
@@ -526,7 +533,7 @@ class RelJson {
         SqlSyntax sqlSyntax = toEnum(map.get("syntax"));
         List<SqlOperator> operators = new ArrayList<>();
 
-        CalciteQueryProcessor.FRAMEWORK_CONFIG.getOperatorTable().lookupOperatorOverloads(
+        qctx.opTable().lookupOperatorOverloads(
             new SqlIdentifier(name, new SqlParserPos(0, 0)),
             null,
             sqlSyntax,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java
index 573fdf3..cdcb892 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java
@@ -73,16 +73,16 @@ public class RelJsonReader {
 
     /** */
     public static <T extends RelNode> T fromJson(BaseQueryContext ctx, String json) {
-        RelJsonReader reader = new RelJsonReader(ctx.catalogReader());
+        RelJsonReader reader = new RelJsonReader(ctx);
 
         return (T)reader.read(json);
     }
 
     /** */
-    public RelJsonReader(RelOptSchema relOptSchema) {
-        this.relOptSchema = relOptSchema;
+    public RelJsonReader(BaseQueryContext qctx) {
+        relOptSchema = qctx.catalogReader();
 
-        relJson = new RelJson();
+        relJson = new RelJson(qctx);
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java
index 8a178b4..e628313 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java
@@ -33,6 +33,7 @@ import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
 
 /**
  * Callback for a relational expression to dump itself as JSON.
@@ -73,7 +74,7 @@ public class RelJsonWriter implements RelWriter {
     public RelJsonWriter(RelOptCluster cluster, boolean pretty) {
         this.pretty = pretty;
 
-        relJson = new RelJson();
+        relJson = new RelJson(cluster.getPlanner().getContext().unwrap(BaseQueryContext.class));
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
index 490e555..9b90758 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
@@ -31,6 +31,8 @@ import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.util.SqlOperatorTables;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.IgniteLogger;
@@ -113,6 +115,9 @@ public final class BaseQueryContext extends AbstractQueryContext {
     private CalciteCatalogReader catalogReader;
 
     /** */
+    private SqlOperatorTable opTable;
+
+    /** */
     private final GridQueryCancel qryCancel;
 
     /**
@@ -180,6 +185,16 @@ public final class BaseQueryContext extends AbstractQueryContext {
     }
 
     /**
+     * @return Sql operators table: the configured table chained with the catalog reader, created on first call.
+     */
+    public SqlOperatorTable opTable() {
+        if (opTable == null)
+            opTable = SqlOperatorTables.chain(config().getOperatorTable(), catalogReader());
+
+        return opTable;
+    }
+
+    /**
      * @return New catalog reader.
      */
     public CalciteCatalogReader catalogReader() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
index 36c762d..3754ce7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
@@ -86,13 +86,6 @@ public final class PlanningContext implements Context {
 
     // Helper methods
     /**
-     * @return Sql operators table.
-     */
-    public SqlOperatorTable opTable() {
-        return config().getOperatorTable();
-    }
-
-    /**
      * @return Sql conformance.
      */
     public SqlConformance conformance() {
@@ -131,6 +124,13 @@ public final class PlanningContext implements Context {
     }
 
     /**
+     * @return Sql operators table, taken from the unwrapped {@link BaseQueryContext}.
+     */
+    public SqlOperatorTable opTable() {
+        return unwrap(BaseQueryContext.class).opTable();
+    }
+
+    /**
      * @return New catalog reader.
      */
     public CalciteCatalogReader catalogReader() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
index 2552898..cdd6cf0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.GridKernalContext;
@@ -132,4 +133,9 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach
         GridCacheContextInfo<?, ?> cacheInfo) {
         clear();
     }
+
+    /** {@inheritDoc} */
+    @Override public void onFunctionCreated(String schemaName, String name, Method method) {
+        // No-op.
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
index ee2422f..09b5682 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
@@ -21,6 +21,10 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import org.apache.calcite.schema.Function;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AbstractSchema;
 
@@ -34,6 +38,9 @@ public class IgniteSchema extends AbstractSchema {
     /** */
     private final Map<String, IgniteTable> tblMap = new ConcurrentHashMap<>();
 
+    /** */
+    private final Multimap<String, Function> funcMap = Multimaps.synchronizedMultimap(HashMultimap.create());
+
     /**
      * Creates a Schema.
      *
@@ -55,6 +62,11 @@ public class IgniteSchema extends AbstractSchema {
         return Collections.unmodifiableMap(tblMap);
     }
 
+    /** {@inheritDoc} */
+    @Override protected Multimap<String, Function> getFunctionMultimap() {
+        return Multimaps.unmodifiableMultimap(funcMap);
+    }
+
     /**
      * @param tbl Table.
      */
@@ -68,4 +80,12 @@ public class IgniteSchema extends AbstractSchema {
     public void removeTable(String tblName) {
         tblMap.remove(tblName);
     }
+
+    /**
+     * @param name Function name; the backing multimap allows several functions (overloads) under one name.
+     * @param func SQL function.
+     */
+    public void addFunction(String name, Function func) {
+        funcMap.put(name, func);
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
index 1e618ba..037a0bb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.schema;
 
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.ignite.internal.cache.query.index.Index;
 import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
 import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.IgniteScalarFunction;
 import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.apache.ignite.internal.processors.query.schema.SchemaChangeListener;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
@@ -265,6 +267,15 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
     }
 
     /** {@inheritDoc} */
+    @Override public void onFunctionCreated(String schemaName, String name, Method method) {
+        IgniteSchema schema = igniteSchemas.computeIfAbsent(schemaName, IgniteSchema::new);
+        // Locale.ROOT keeps the registered name independent of the JVM default locale (e.g. Turkish 'i').
+        schema.addFunction(name.toUpperCase(java.util.Locale.ROOT), IgniteScalarFunction.create(method));
+
+        rebuild();
+    }
+
+    /** {@inheritDoc} */
     @Override public SchemaPlus schema(@Nullable String schema) {
         return schema != null ? calciteSchema.getSubSchema(schema) : calciteSchema;
     }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedFunctionsIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedFunctionsIntegrationTest.java
new file mode 100644
index 0000000..17cbd3f
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedFunctionsIntegrationTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Integration test for user defined functions.
+ */
+public class UserDefinedFunctionsIntegrationTest extends AbstractBasicIntegrationTest {
+    /** Checks user defined functions visibility per schema, before and after cache destroy. */
+    @Test
+    public void testFunctions() throws Exception {
+        // Cache with implicit schema.
+        IgniteCache<Integer, Employer> emp1 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>("emp1")
+            .setSqlFunctionClasses(AddFunctionsLibrary.class, MulFunctionsLibrary.class)
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("emp1")))
+        );
+
+        // Cache with explicit custom schema.
+        IgniteCache<Integer, Employer> emp2 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>("emp2")
+            .setSqlFunctionClasses(AddFunctionsLibrary.class)
+            .setSqlSchema("emp2_schema")
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("emp2")))
+        );
+
+        // Cache with PUBLIC schema.
+        IgniteCache<Integer, Employer> emp3 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>("emp3")
+            .setSqlFunctionClasses(OtherFunctionsLibrary.class)
+            .setSqlSchema("PUBLIC")
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("emp3")))
+        );
+
+        emp1.put(1, new Employer("Igor1", 1d));
+        emp1.put(2, new Employer("Roman1", 2d));
+
+        emp2.put(1, new Employer("Igor2", 10d));
+        emp2.put(2, new Employer("Roman2", 20d));
+
+        emp3.put(1, new Employer("Igor3", 100d));
+        emp3.put(2, new Employer("Roman3", 200d));
+
+        awaitPartitionMapExchange();
+
+        assertQuery("SELECT \"emp1\".add(1, 2)").returns(3.0d).check();
+        assertQuery("SELECT \"emp1\".add(1, 2, 3)").returns(6).check();
+        assertQuery("SELECT \"emp1\".add(1, 2, 3, 4)").returns(10).check();
+        assertQuery("SELECT \"emp1\".mul(1, 2)").returns(2).check();
+        assertQuery("SELECT \"emp1\".mul(1, 2, 3)").returns(6).check();
+        assertQuery("SELECT \"emp1\".test(1, 2, 2)").returns(true).check();
+        assertQuery("SELECT \"emp1\".test(1, 2, 3)").returns(false).check();
+        assertQuery("SELECT EMP2_SCHEMA.add(1, 2)").returns(3d).check();
+        assertQuery("SELECT EMP2_SCHEMA.add(1, 2, 3)").returns(6).check();
+        assertQuery("SELECT EMP2_SCHEMA.add(1, 2, 3, 4)").returns(10).check();
+        assertQuery("SELECT sq(4)").returns(16d).check();
+        assertQuery("SELECT echo('test')").returns("test").check();
+        assertQuery("SELECT sq(salary) FROM emp3").returns(10_000d).returns(40_000d).check();
+        assertQuery("SELECT echo(name) FROM emp3").returns("Igor3").returns("Roman3").check();
+        assertQuery("SELECT sq(salary) FROM EMP2_SCHEMA.emp2").returns(100d).returns(400d).check();
+        assertQuery("SELECT sq(salary) FROM \"emp1\".emp1").returns(1d).returns(4d).check();
+        assertThrows("SELECT add(1, 2)");
+        assertThrows("SELECT mul(1, 2)");
+        assertThrows("SELECT EMP2_SCHEMA.mul(1, 2)");
+        assertThrows("SELECT EMP2_SCHEMA.sq(1)");
+
+        client.cache("emp1").destroy();
+        awaitPartitionMapExchange();
+
+        assertThrows("SELECT \"emp1\".add(1, 2)");
+        assertQuery("SELECT EMP2_SCHEMA.add(1, 2)").returns(3d).check();
+
+        client.cache("emp2").destroy();
+        awaitPartitionMapExchange();
+
+        assertThrows("SELECT EMP2_SCHEMA.add(1, 2)");
+        assertQuery("SELECT sq(4)").returns(16d).check();
+
+        client.cache("emp3").destroy();
+        awaitPartitionMapExchange();
+
+        // PUBLIC schema is predefined and not dropped on cache destroy.
+        assertQuery("SELECT sq(4)").returns(16d).check();
+    }
+
+    /** Asserts that checking the given query fails with {@link IgniteSQLException} as a cause. */
+    @SuppressWarnings("ThrowableNotThrown")
+    private void assertThrows(String sql) {
+        GridTestUtils.assertThrowsWithCause(() -> assertQuery(sql).check(), IgniteSQLException.class);
+    }
+
+    /** Library with overloaded {@code add} functions. */
+    public static class AddFunctionsLibrary {
+        /** */
+        @QuerySqlFunction
+        public static double add(double a, double b) {
+            return a + b;
+        }
+
+        /** */
+        @QuerySqlFunction
+        public static int add(int a, int b, int c) {
+            return a + b + c;
+        }
+
+        /** Registered as {@code add} through the annotation alias. */
+        @QuerySqlFunction(alias = "add")
+        public static int addFour(int a, int b, int c, int d) {
+            return a + b + c + d;
+        }
+    }
+
+    /** Library with overloaded {@code mul} functions and a {@code test} predicate. */
+    public static class MulFunctionsLibrary {
+        /** */
+        @QuerySqlFunction
+        public static int mul(int a, int b) {
+            return a * b;
+        }
+
+        /** */
+        @QuerySqlFunction
+        public static int mul(int a, int b, int c) {
+            return a * b * c;
+        }
+
+        /** */
+        @QuerySqlFunction
+        public static boolean test(int a, int b, int res) {
+            return a * b == res;
+        }
+    }
+
+    /** Library that the test registers with the cache using the PUBLIC schema. */
+    public static class OtherFunctionsLibrary {
+        /** */
+        @QuerySqlFunction
+        public static double sq(double a) {
+            return a * a;
+        }
+
+        /** */
+        @QuerySqlFunction
+        public static String echo(String s) {
+            return s;
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 8ffaa06..c9abe3b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -345,7 +345,7 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
         List<RelNode> deserializedNodes = new ArrayList<>();
 
         for (String s : serialized) {
-            RelJsonReader reader = new RelJsonReader(ctx.catalogReader());
+            RelJsonReader reader = new RelJsonReader(ctx);
 
             deserializedNodes.add(reader.read(s));
         }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index 0be1471..96cccfc 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.SortAggre
 import org.apache.ignite.internal.processors.query.calcite.integration.TableDdlIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.TableDmlIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.UserDdlIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedFunctionsIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcQueryTest;
 import org.apache.ignite.internal.processors.query.calcite.rules.JoinCommuteRulesTest;
 import org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest;
@@ -81,6 +82,7 @@ import org.junit.runners.Suite;
     ServerStatisticsIntegrationTest.class,
     JoinIntegrationTest.class,
     IntervalTest.class,
+    UserDefinedFunctionsIntegrationTest.class,
 })
 public class IntegrationTestSuite {
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
index 27e5ff2..fa374b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.schema;
 
+import java.lang.reflect.Method;
 import org.apache.ignite.internal.cache.query.index.Index;
 import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
 import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
@@ -89,4 +90,13 @@ public interface SchemaChangeListener {
      * @param idxName Index name.
      */
     public void onIndexDropped(String schemaName, String tblName, String idxName);
+
+    /**
+     * Callback on function creation.
+     *
+     * @param schemaName Schema name.
+     * @param name Function name.
+     * @param method Public static method, implementing this function.
+     */
+    public void onFunctionCreated(String schemaName, String name, Method method);
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
index d43c6c2..e7fc198 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
@@ -475,6 +475,8 @@ public class SchemaManager implements GridQuerySchemaManager {
                         cls.getName() + '.' + m.getName() + '"';
 
                     connMgr.executeStatement(schema, clause);
+
+                    lsnr.onFunctionCreated(schema, alias, m);
                 }
             }
         }
@@ -938,6 +940,9 @@ public class SchemaManager implements GridQuerySchemaManager {
 
         /** {@inheritDoc} */
         @Override public void onSqlTypeDropped(String schemaName, GridQueryTypeDescriptor typeDescriptor) {}
+
+        /** {@inheritDoc} */
+        @Override public void onFunctionCreated(String schemaName, String name, Method method) {}
     }
 
     /** */
@@ -965,7 +970,7 @@ public class SchemaManager implements GridQuerySchemaManager {
          * {@inheritDoc}
          */
         @Override public void onSchemaDropped(String schemaName) {
-            lsnrs.forEach(lsnr -> lsnr.onSchemaCreated(schemaName));
+            lsnrs.forEach(lsnr -> lsnr.onSchemaDropped(schemaName));
         }
 
         /**
@@ -1011,6 +1016,11 @@ public class SchemaManager implements GridQuerySchemaManager {
         @Override public void onIndexDropped(String schemaName, String tblName, String idxName) {
             lsnrs.forEach(lsnr -> lsnr.onIndexDropped(schemaName, tblName, idxName));
         }
+
+        /** {@inheritDoc} */
+        @Override public void onFunctionCreated(String schemaName, String name, Method method) {
+            lsnrs.forEach(lsnr -> lsnr.onFunctionCreated(schemaName, name, method));
+        }
     }
 
     /**