You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2021/04/19 08:25:51 UTC

[calcite] 01/01: [CALCITE-4564] Initialization context for non-static user-defined functions (UDFs)

This is an automated email from the ASF dual-hosted git repository.

jhyde pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git

commit b2e9e6cba1e2ce28368d1281f527a9e53f4628ca
Author: Julian Hyde <jh...@apache.org>
AuthorDate: Mon Mar 29 10:42:19 2021 -0700

    [CALCITE-4564] Initialization context for non-static user-defined functions (UDFs)
    
    When Interpreter or code generated by RexToLixTranslator calls
    a UDF, invoke its constructor only once. There will therefore
    be one instance of the UDF for the whole execution, and that
    instance can be used to store state. Typical state will be
    preparation work (e.g. compiling a regular expression) to make
    each invocation faster.
    
    Add 'interface FunctionContext' (marked as experimental). If
    a UDF has a public constructor with FunctionContext as a
    parameter, this will be called in preference to the default
    constructor.
    
    In InterpreterTest, allow more than one query to be executed
    per test method.
    
    Close apache/calcite#2395
---
 .../calcite/adapter/enumerable/EnumerableCalc.java |   1 +
 .../calcite/adapter/enumerable/RexImpTable.java    |  32 +++++-
 .../adapter/enumerable/RexToLixTranslator.java     | 102 ++++++++++++++---
 .../apache/calcite/interpreter/AggregateNode.java  |  68 ++++++++----
 .../apache/calcite/interpreter/Interpreter.java    | 121 +--------------------
 .../calcite/interpreter/JaninoRexCompiler.java     |  85 +++++++++++----
 .../org/apache/calcite/interpreter/Scalar.java     |  10 ++
 .../calcite/interpreter/TableFunctionScanNode.java |  24 ++--
 .../rules/AggregateExpandWithinDistinctRule.java   |   2 +-
 .../org/apache/calcite/rex/RexCallBinding.java     |  20 +++-
 .../org/apache/calcite/rex/RexExecutorImpl.java    |   2 +-
 .../java/org/apache/calcite/rex/RexProgram.java    |   5 +
 .../org/apache/calcite/runtime/Enumerables.java    |   1 -
 .../apache/calcite/runtime/FunctionContexts.java   |  85 +++++++++++++++
 .../org/apache/calcite/schema/FunctionContext.java | 110 +++++++++++++++++++
 .../schema/impl/ReflectiveFunctionBase.java        |  27 ++++-
 .../calcite/schema/impl/ScalarFunctionImpl.java    |   5 +-
 .../org/apache/calcite/util/BuiltInMethod.java     |   5 +
 .../main/java/org/apache/calcite/util/Util.java    |   4 +
 .../org/apache/calcite/test/InterpreterTest.java   | 114 ++++++++++++++-----
 .../test/java/org/apache/calcite/test/UdfTest.java |  82 ++++++++------
 .../java/org/apache/calcite/tools/PlannerTest.java |   2 +-
 .../test/java/org/apache/calcite/util/Smalls.java  |  56 +++++++++-
 .../apache/calcite/linq4j/tree/BlockBuilder.java   |  18 ++-
 .../apache/calcite/linq4j/tree/Expressions.java    |  37 ++++++-
 .../apache/calcite/adapter/spark/SparkRules.java   |   1 +
 26 files changed, 750 insertions(+), 269 deletions(-)

diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCalc.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCalc.java
index 7f07f9e..3c872a1 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCalc.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCalc.java
@@ -194,6 +194,7 @@ public class EnumerableCalc extends Calc implements EnumerableRel {
             typeFactory,
             conformance,
             builder3,
+            null,
             physType,
             DataContext.ROOT,
             new RexToLixTranslator.InputGetterImpl(input, result.physType),
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
index 8f9af95..bb89f1f 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.adapter.enumerable;
 
+import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.avatica.util.ByteString;
 import org.apache.calcite.avatica.util.DateTimeUtils;
@@ -30,6 +31,7 @@ import org.apache.calcite.linq4j.tree.ExpressionType;
 import org.apache.calcite.linq4j.tree.Expressions;
 import org.apache.calcite.linq4j.tree.MemberExpression;
 import org.apache.calcite.linq4j.tree.MethodCallExpression;
+import org.apache.calcite.linq4j.tree.NewExpression;
 import org.apache.calcite.linq4j.tree.OptimizeShuttle;
 import org.apache.calcite.linq4j.tree.ParameterExpression;
 import org.apache.calcite.linq4j.tree.Primitive;
@@ -42,6 +44,7 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexPatternFieldRef;
 import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.schema.FunctionContext;
 import org.apache.calcite.schema.ImplementableAggFunction;
 import org.apache.calcite.schema.ImplementableFunction;
 import org.apache.calcite.schema.impl.AggregateFunctionImpl;
@@ -87,6 +90,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -1375,8 +1379,7 @@ public class RexImpTable {
       if (!afi.isStatic) {
         reset.currentBlock().add(
             Expressions.statement(
-                Expressions.assign(acc.get(1),
-                    Expressions.new_(afi.declaringClass))));
+                Expressions.assign(acc.get(1), makeNew(afi))));
       }
       reset.currentBlock().add(
           Expressions.statement(
@@ -1386,6 +1389,31 @@ public class RexImpTable {
                       : acc.get(1), afi.initMethod))));
     }
 
+    private static NewExpression makeNew(AggregateFunctionImpl afi) {
+      try {
+        Constructor<?> constructor = afi.declaringClass.getConstructor();
+        Objects.requireNonNull(constructor, "constructor");
+        return Expressions.new_(afi.declaringClass);
+      } catch (NoSuchMethodException e) {
+        // ignore, and try next constructor
+      }
+      try {
+        Constructor<?> constructor =
+            afi.declaringClass.getConstructor(FunctionContext.class);
+        Objects.requireNonNull(constructor, "constructor");
+        return Expressions.new_(afi.declaringClass,
+            Expressions.call(BuiltInMethod.FUNCTION_CONTEXTS_OF.method,
+                DataContext.ROOT,
+                // TODO: pass in the values of arguments that are literals
+                Expressions.newArrayBounds(Object.class, 1,
+                    Expressions.constant(afi.getParameters().size()))));
+      } catch (NoSuchMethodException e) {
+        // This should never happen: validator should have made sure that the
+        // class had an appropriate constructor.
+        throw new AssertionError("no valid constructor for " + afi);
+      }
+    }
+
     @Override protected void implementNotNullAdd(AggContext info,
         AggAddContext add) {
       List<Expression> acc = add.accumulator();
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
index 652fe21..0e3b467 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
@@ -20,6 +20,7 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.avatica.util.ByteString;
 import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.linq4j.function.Function1;
 import org.apache.calcite.linq4j.tree.BlockBuilder;
 import org.apache.calcite.linq4j.tree.BlockStatement;
@@ -33,6 +34,7 @@ import org.apache.calcite.linq4j.tree.Statement;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCallBinding;
 import org.apache.calcite.rex.RexCorrelVariable;
 import org.apache.calcite.rex.RexDynamicParam;
 import org.apache.calcite.rex.RexFieldAccess;
@@ -50,6 +52,7 @@ import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.rex.RexVisitor;
 import org.apache.calcite.runtime.GeoFunctions;
 import org.apache.calcite.runtime.Geometries;
+import org.apache.calcite.schema.FunctionContext;
 import org.apache.calcite.sql.SqlIntervalQualifier;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlWindowTableFunction;
@@ -61,11 +64,13 @@ import org.apache.calcite.util.ControlFlowException;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 
+import com.google.common.base.CaseFormat;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.lang.reflect.Type;
@@ -175,6 +180,7 @@ public class RexToLixTranslator implements RexVisitor<RexToLixTranslator.Result>
    * @param typeFactory Type factory
    * @param conformance SQL conformance
    * @param list List of statements, populated with declarations
+   * @param staticList List of member declarations
    * @param outputPhysType Output type, or null
    * @param root Root expression
    * @param inputGetter Generates expressions for inputs
@@ -184,7 +190,8 @@ public class RexToLixTranslator implements RexVisitor<RexToLixTranslator.Result>
    */
   public static List<Expression> translateProjects(RexProgram program,
       JavaTypeFactory typeFactory, SqlConformance conformance,
-      BlockBuilder list, @Nullable PhysType outputPhysType, Expression root,
+      BlockBuilder list, @Nullable BlockBuilder staticList,
+      @Nullable PhysType outputPhysType, Expression root,
       InputGetter inputGetter, @Nullable Function1<String, InputGetter> correlates) {
     List<Type> storageTypes = null;
     if (outputPhysType != null) {
@@ -195,11 +202,20 @@ public class RexToLixTranslator implements RexVisitor<RexToLixTranslator.Result>
       }
     }
     return new RexToLixTranslator(program, typeFactory, root, inputGetter,
-        list, null, new RexBuilder(typeFactory), conformance,  null)
+        list, staticList, new RexBuilder(typeFactory), conformance,  null)
         .setCorrelates(correlates)
         .translateList(program.getProjectList(), storageTypes);
   }
 
+  @Deprecated // to be removed before 2.0
+  public static List<Expression> translateProjects(RexProgram program,
+      JavaTypeFactory typeFactory, SqlConformance conformance,
+      BlockBuilder list, @Nullable PhysType outputPhysType, Expression root,
+      InputGetter inputGetter, @Nullable Function1<String, InputGetter> correlates) {
+    return translateProjects(program, typeFactory, conformance, list, null,
+        outputPhysType, root, inputGetter, correlates);
+  }
+
   public static Expression translateTableFunction(JavaTypeFactory typeFactory,
       SqlConformance conformance, BlockBuilder list,
       Expression root, RexCall rexCall, Expression inputEnumerable,
@@ -1049,10 +1065,18 @@ public class RexToLixTranslator implements RexVisitor<RexToLixTranslator.Result>
         ? getTypedNullLiteral(literal)
         : translateLiteral(literal, literal.getType(),
             typeFactory, RexImpTable.NullAs.NOT_POSSIBLE);
-    final ParameterExpression valueVariable =
-        Expressions.parameter(valueExpression.getType(),
-            list.newName("literal_value"));
-    list.add(Expressions.declare(Modifier.FINAL, valueVariable, valueExpression));
+    final ParameterExpression valueVariable;
+    final Expression literalValue =
+        appendConstant("literal_value", valueExpression);
+    if (literalValue instanceof ParameterExpression) {
+      valueVariable = (ParameterExpression) literalValue;
+    } else {
+      valueVariable =
+          Expressions.parameter(valueExpression.getType(),
+              list.newName("literal_value"));
+      list.add(
+          Expressions.declare(Modifier.FINAL, valueVariable, valueExpression));
+    }
 
     // Generate one line of code to check whether RexLiteral is null, e.g.,
     // "final boolean literal_isNull = false;"
@@ -1444,17 +1468,69 @@ public class RexToLixTranslator implements RexVisitor<RexToLixTranslator.Result>
    * <p>It might be 'new MyFunction()', but it also might be a reference
    * to a static field 'F', defined by
    * 'static final MyFunction F = new MyFunction()'.
-   * In future, we might look for and call a constructor that takes extra
-   * context, such as the values of arguments that are literals, which might
-   * allow the function to do some computation at class-load time. */
+   *
+   * <p>If there is a constructor that takes a {@link FunctionContext}
+   * argument, we call that, passing in the values of arguments that are
+   * literals; this allows the function to do some computation at load time.
+   *
+   * <p>If the call is "f(1, 2 + 3, 'foo')" and "f" is implemented by method
+   * "eval(int, int, String)" in "class MyFunction", the expression might be
+   * "new MyFunction(FunctionContexts.of(new Object[] {1, null, "foo"}))".
+   *
+   * @param method Method that implements the UDF
+   * @param call Call to the UDF
+   * @return New expression
+   */
   Expression functionInstance(RexCall call, Method method) {
-    final Class<?> declaringClass = method.getDeclaringClass();
+    final RexCallBinding callBinding =
+        RexCallBinding.create(typeFactory, call, program, ImmutableList.of());
+    final Expression target = getInstantiationExpression(method, callBinding);
+    return appendConstant("f", target);
+  }
 
+  /** Helper for {@link #functionInstance}. */
+  private Expression getInstantiationExpression(Method method,
+      RexCallBinding callBinding) {
+    final Class<?> declaringClass = method.getDeclaringClass();
+    // If the UDF class has a constructor that takes a FunctionContext
+    // argument, use that.
+    try {
+      final Constructor<?> constructor =
+          declaringClass.getConstructor(FunctionContext.class);
+      final List<Expression> constantArgs = new ArrayList<>();
+      //noinspection unchecked
+      Ord.forEach(method.getParameterTypes(),
+          (parameterType, i) ->
+              constantArgs.add(
+                  callBinding.isOperandLiteral(i, true)
+                      ? appendConstant("_arg",
+                      Expressions.constant(
+                          callBinding.getOperandLiteralValue(i,
+                              Primitive.box(parameterType))))
+                      : Expressions.constant(null)));
+      final Expression context =
+          Expressions.call(BuiltInMethod.FUNCTION_CONTEXTS_OF.method,
+              DataContext.ROOT,
+              Expressions.newArrayInit(Object.class, constantArgs));
+      return Expressions.new_(constructor, context);
+    } catch (NoSuchMethodException e) {
+      // no FunctionContext constructor; fall back to zero-args constructor
+    }
     // The UDF class must have a public zero-args constructor.
     // Assume that the validator checked already.
-    final Expression target =
-        Expressions.new_(declaringClass);
-    return target;
+    return Expressions.new_(declaringClass);
+  }
+
+  /** Stores a constant expression in a variable. */
+  private Expression appendConstant(String name, Expression e) {
+    if (staticList != null) {
+      // If name is "camelCase", upperName is "CAMEL_CASE".
+      final String upperName =
+          CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_UNDERSCORE, name);
+      return staticList.append(upperName, e);
+    } else {
+      return list.append(name, e);
+    }
   }
 
   /** Translates a field of an input to an expression. */
diff --git a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
index 712dade..f5b2cf9 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
@@ -35,11 +35,14 @@ import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.runtime.FunctionContexts;
+import org.apache.calcite.schema.FunctionContext;
 import org.apache.calcite.schema.impl.AggregateFunctionImpl;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.validate.SqlConformance;
 import org.apache.calcite.sql.validate.SqlConformanceEnum;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
 
@@ -139,11 +142,11 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
         break;
       }
       if (call.getAggregation() == SqlStdOperatorTable.SUM) {
-        return new UdaAccumulatorFactory(
-            getAggFunction(clazz), call, true);
+        return new UdaAccumulatorFactory(getAggFunction(clazz), call, true,
+            dataContext);
       } else {
-        return new UdaAccumulatorFactory(
-            getAggFunction(clazz), call, false);
+        return new UdaAccumulatorFactory(getAggFunction(clazz), call, false,
+            dataContext);
       }
     } else if (call.getAggregation() == SqlStdOperatorTable.MIN) {
       final Class<?> clazz;
@@ -168,8 +171,8 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
         clazz = MinLong.class;
         break;
       }
-      return new UdaAccumulatorFactory(
-          getAggFunction(clazz), call, true);
+      return new UdaAccumulatorFactory(getAggFunction(clazz), call, true,
+          dataContext);
     } else if (call.getAggregation() == SqlStdOperatorTable.MAX) {
       final Class<?> clazz;
       switch (call.getType().getSqlTypeName()) {
@@ -190,8 +193,8 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
         clazz = MaxLong.class;
         break;
       }
-      return new UdaAccumulatorFactory(
-          getAggFunction(clazz), call, true);
+      return new UdaAccumulatorFactory(getAggFunction(clazz), call, true,
+          dataContext);
     } else {
       final JavaTypeFactory typeFactory =
           (JavaTypeFactory) rel.getCluster().getTypeFactory();
@@ -255,9 +258,13 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
           Expressions.parameter(Context.class, "context");
       final ParameterExpression outputValues_ =
           Expressions.parameter(Object[].class, "outputValues");
-      Scalar addScalar =
-          JaninoRexCompiler.baz(context_, outputValues_, builder2.toBlock());
-      return new ScalarAccumulatorDef(castNonNull(null), addScalar, castNonNull(null),
+      final Scalar.Producer addScalarProducer =
+          JaninoRexCompiler.baz(context_, outputValues_, builder2.toBlock(),
+              ImmutableList.of());
+      final Scalar initScalar = castNonNull(null);
+      final Scalar addScalar = addScalarProducer.apply(dataContext);
+      final Scalar endScalar = castNonNull(null);
+      return new ScalarAccumulatorDef(initScalar, addScalar, endScalar,
           rel.getInput().getRowType().getFieldCount(), stateSize, dataContext);
     }
   }
@@ -677,26 +684,41 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
     public final boolean nullIfEmpty;
 
     UdaAccumulatorFactory(AggregateFunctionImpl aggFunction,
-        AggregateCall call, boolean nullIfEmpty) {
+        AggregateCall call, boolean nullIfEmpty, DataContext dataContext) {
       this.aggFunction = aggFunction;
       if (call.getArgList().size() != 1) {
         throw new UnsupportedOperationException("in current implementation, "
             + "aggregate must have precisely one argument");
       }
       argOrdinal = call.getArgList().get(0);
+      instance = createInstance(aggFunction, dataContext);
+      this.nullIfEmpty = nullIfEmpty;
+    }
+
+    static @Nullable Object createInstance(AggregateFunctionImpl aggFunction,
+        DataContext dataContext) {
       if (aggFunction.isStatic) {
-        instance = null;
-      } else {
-        try {
-          final Constructor<?> constructor =
-              aggFunction.declaringClass.getConstructor();
-          instance = constructor.newInstance();
-        } catch (InstantiationException | IllegalAccessException
-            | NoSuchMethodException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
+        return null;
+      }
+      try {
+        final Constructor<?> constructor =
+            aggFunction.declaringClass.getConstructor();
+        return constructor.newInstance();
+      } catch (InstantiationException | IllegalAccessException
+          | NoSuchMethodException | InvocationTargetException e) {
+        // ignore, and try next constructor
+      }
+      try {
+        final Constructor<?> constructor =
+            aggFunction.declaringClass.getConstructor(FunctionContext.class);
+        final Object[] args = new Object[aggFunction.getParameters().size()];
+        final FunctionContext functionContext =
+            FunctionContexts.of(dataContext, args);
+        return constructor.newInstance(functionContext);
+      } catch (InstantiationException | IllegalAccessException
+          | NoSuchMethodException | InvocationTargetException e) {
+        throw Util.toUnchecked(e);
       }
-      this.nullIfEmpty = nullIfEmpty;
     }
 
     @Override public Accumulator get() {
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
index 58b0427..6dd41c2 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
@@ -35,11 +35,7 @@ import org.apache.calcite.rel.RelVisitor;
 import org.apache.calcite.rel.rules.CoreRules;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.ReflectUtil;
 import org.apache.calcite.util.ReflectiveVisitDispatcher;
@@ -57,7 +53,6 @@ import org.checkerframework.checker.initialization.qual.NotOnlyInitialized;
 import org.checkerframework.checker.initialization.qual.UnknownInitialization;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
-import java.math.BigDecimal;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -65,7 +60,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
@@ -149,116 +143,6 @@ public class Interpreter extends AbstractEnumerable<@Nullable Object[]>
     nodes.values().forEach(NodeInfo::close);
   }
 
-  /** Not used. */
-  @SuppressWarnings("unused")
-  private static class FooCompiler implements ScalarCompiler {
-    @Override public Scalar compile(List<RexNode> nodes, RelDataType inputRowType) {
-      final RexNode node = nodes.get(0);
-      if (node instanceof RexCall) {
-        final RexCall call = (RexCall) node;
-        final Scalar argScalar = compile(call.getOperands(), inputRowType);
-        return new Scalar() {
-          final Object[] args = new Object[call.getOperands().size()];
-
-          @Override public void execute(final Context context, @Nullable Object[] results) {
-            results[0] = execute(context);
-          }
-
-          @Override public @Nullable Object execute(Context context) {
-            Comparable o0;
-            Comparable o1;
-            switch (call.getKind()) {
-            case LESS_THAN:
-            case LESS_THAN_OR_EQUAL:
-            case GREATER_THAN:
-            case GREATER_THAN_OR_EQUAL:
-            case EQUALS:
-            case NOT_EQUALS:
-              argScalar.execute(context, args);
-              o0 = (Comparable) args[0];
-              if (o0 == null) {
-                return null;
-              }
-              o1 = (Comparable) args[1];
-              if (o1 == null) {
-                return null;
-              }
-              if (o0 instanceof BigDecimal) {
-                if (o1 instanceof Double || o1 instanceof Float) {
-                  o1 = new BigDecimal(((Number) o1).doubleValue());
-                } else {
-                  o1 = new BigDecimal(((Number) o1).longValue());
-                }
-              }
-              if (o1 instanceof BigDecimal) {
-                if (o0 instanceof Double || o0 instanceof Float) {
-                  o0 = new BigDecimal(((Number) o0).doubleValue());
-                } else {
-                  o0 = new BigDecimal(((Number) o0).longValue());
-                }
-              }
-              final int c = o0.compareTo(o1);
-              switch (call.getKind()) {
-              case LESS_THAN:
-                return c < 0;
-              case LESS_THAN_OR_EQUAL:
-                return c <= 0;
-              case GREATER_THAN:
-                return c > 0;
-              case GREATER_THAN_OR_EQUAL:
-                return c >= 0;
-              case EQUALS:
-                return c == 0;
-              case NOT_EQUALS:
-                return c != 0;
-              default:
-                throw new AssertionError("unknown expression " + call);
-              }
-            default:
-              if (call.getOperator() == SqlStdOperatorTable.UPPER) {
-                argScalar.execute(context, args);
-                String s0 = (String) args[0];
-                if (s0 == null) {
-                  return null;
-                }
-                return s0.toUpperCase(Locale.ROOT);
-              }
-              if (call.getOperator() == SqlStdOperatorTable.SUBSTRING) {
-                argScalar.execute(context, args);
-                String s0 = (String) args[0];
-                Number i1 = (Number) args[1];
-                Number i2 = (Number) args[2];
-                if (s0 == null || i1 == null || i2 == null) {
-                  return null;
-                }
-                return s0.substring(i1.intValue() - 1,
-                    i1.intValue() - 1 + i2.intValue());
-              }
-              throw new AssertionError("unknown expression " + call);
-            }
-          }
-        };
-      }
-      return new Scalar() {
-        @Override public void execute(Context context, @Nullable Object[] results) {
-          results[0] = execute(context);
-        }
-
-        @Override public @Nullable Object execute(Context context) {
-          switch (node.getKind()) {
-          case LITERAL:
-            return ((RexLiteral) node).getValueAs(Comparable.class);
-          case INPUT_REF:
-            @Nullable Object[] values = requireNonNull(context.values, "context.values");
-            return values[((RexInputRef) node).getIndex()];
-          default:
-            throw new RuntimeException("unknown expression type " + node);
-          }
-        }
-      };
-    }
-  }
-
   /** Information about a node registered in the data flow graph. */
   private static class NodeInfo {
     final RelNode rel;
@@ -508,7 +392,8 @@ public class Interpreter extends AbstractEnumerable<@Nullable Object[]>
         inputRowType = getTypeFactory().builder()
             .build();
       }
-      return scalarCompiler.compile(nodes, inputRowType);
+      return scalarCompiler.compile(nodes, inputRowType)
+          .apply(interpreter.dataContext);
     }
 
     private JavaTypeFactory getTypeFactory() {
@@ -607,6 +492,6 @@ public class Interpreter extends AbstractEnumerable<@Nullable Object[]>
   /** Converts a list of expressions to a scalar that can compute their
    * values. */
   interface ScalarCompiler {
-    Scalar compile(List<RexNode> nodes, RelDataType inputRowType);
+    Scalar.Producer compile(List<RexNode> nodes, RelDataType inputRowType);
   }
 }
diff --git a/core/src/main/java/org/apache/calcite/interpreter/JaninoRexCompiler.java b/core/src/main/java/org/apache/calcite/interpreter/JaninoRexCompiler.java
index e7d02c1..8f24445 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/JaninoRexCompiler.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/JaninoRexCompiler.java
@@ -16,11 +16,13 @@
  */
 package org.apache.calcite.interpreter;
 
+import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.enumerable.JavaRowFormat;
 import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
 import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
 import org.apache.calcite.config.CalciteSystemProperty;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.linq4j.function.Function1;
 import org.apache.calcite.linq4j.tree.BlockBuilder;
 import org.apache.calcite.linq4j.tree.BlockStatement;
@@ -29,6 +31,7 @@ import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.linq4j.tree.Expressions;
 import org.apache.calcite.linq4j.tree.MemberDeclaration;
 import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.linq4j.tree.Statement;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
@@ -64,7 +67,8 @@ public class JaninoRexCompiler implements Interpreter.ScalarCompiler {
     this.rexBuilder = rexBuilder;
   }
 
-  @Override public Scalar compile(List<RexNode> nodes, RelDataType inputRowType) {
+  @Override public Scalar.Producer compile(List<RexNode> nodes,
+      RelDataType inputRowType) {
     final RexProgramBuilder programBuilder =
         new RexProgramBuilder(inputRowType, rexBuilder);
     for (RexNode node : nodes) {
@@ -72,7 +76,8 @@ public class JaninoRexCompiler implements Interpreter.ScalarCompiler {
     }
     final RexProgram program = programBuilder.getProgram();
 
-    final BlockBuilder builder = new BlockBuilder();
+    final BlockBuilder list = new BlockBuilder();
+    final BlockBuilder staticList = new BlockBuilder().withRemoveUnused(false);
     final ParameterExpression context_ =
         Expressions.parameter(Context.class, "context");
     final ParameterExpression outputValues_ =
@@ -94,29 +99,67 @@ public class JaninoRexCompiler implements Interpreter.ScalarCompiler {
         Expressions.field(context_, BuiltInMethod.CONTEXT_ROOT.field);
     final SqlConformance conformance =
         SqlConformanceEnum.DEFAULT; // TODO: get this from implementor
-    final List<Expression> list =
+    final List<Expression> expressionList =
         RexToLixTranslator.translateProjects(program, javaTypeFactory,
-            conformance, builder, null, root, inputGetter, correlates);
-    for (int i = 0; i < list.size(); i++) {
-      builder.add(
-          Expressions.statement(
-              Expressions.assign(
-                  Expressions.arrayIndex(outputValues_,
-                      Expressions.constant(i)),
-                  list.get(i))));
-    }
-    return baz(context_, outputValues_, builder.toBlock());
+            conformance, list, staticList, null, root, inputGetter, correlates);
+    Ord.forEach(expressionList, (expression, i) ->
+        list.add(
+            Expressions.statement(
+                Expressions.assign(
+                    Expressions.arrayIndex(outputValues_,
+                        Expressions.constant(i)),
+                    expression))));
+    return baz(context_, outputValues_, list.toBlock(),
+        staticList.toBlock().statements);
   }
 
   /** Given a method that implements {@link Scalar#execute(Context, Object[])},
    * adds a bridge method that implements {@link Scalar#execute(Context)}, and
    * compiles. */
-  static Scalar baz(ParameterExpression context_,
-      ParameterExpression outputValues_, BlockStatement block) {
+  static Scalar.Producer baz(ParameterExpression context_,
+      ParameterExpression outputValues_, BlockStatement block,
+      List<Statement> declList) {
     final List<MemberDeclaration> declarations = new ArrayList<>();
+    final List<MemberDeclaration> innerDeclarations = new ArrayList<>();
 
-    // public void execute(Context, Object[] outputValues)
+    // public Scalar apply(DataContext root) {
+    //   <<staticList>>
+    //   return new Scalar() {
+    //     <<inner declarations>>
+    //   };
+    // }
+    final List<Statement> statements = new ArrayList<>(declList);
+    statements.add(
+        Expressions.return_(null,
+            Expressions.new_(Scalar.class, ImmutableList.of(),
+                innerDeclarations)));
+    declarations.add(
+        Expressions.methodDecl(Modifier.PUBLIC, Scalar.class,
+            BuiltInMethod.FUNCTION_APPLY.method.getName(),
+            ImmutableList.of(DataContext.ROOT),
+            Expressions.block(statements)));
+
+    // (bridge method)
+    // public Object apply(Object root) {
+    //   return this.apply((DataContext) root);
+    // }
+    final ParameterExpression objectRoot =
+        Expressions.parameter(Object.class, "root");
     declarations.add(
+        Expressions.methodDecl(Modifier.PUBLIC, Object.class,
+            BuiltInMethod.FUNCTION_APPLY.method.getName(),
+            ImmutableList.of(
+                objectRoot),
+            Expressions.block(
+                Expressions.return_(null,
+                    Expressions.call(
+                        Expressions.parameter(Scalar.Producer.class, "this"),
+                        BuiltInMethod.FUNCTION_APPLY.method,
+                        Expressions.convert_(objectRoot,
+                            DataContext.class))))));
+
+    // public void execute(Context, Object[] outputValues)
+    innerDeclarations.add(
         Expressions.methodDecl(Modifier.PUBLIC, void.class,
             BuiltInMethod.SCALAR_EXECUTE2.method.getName(),
             ImmutableList.of(context_, outputValues_), block));
@@ -134,14 +177,14 @@ public class JaninoRexCompiler implements Interpreter.ScalarCompiler {
     builder.add(
         Expressions.return_(null,
             Expressions.arrayIndex(values_, Expressions.constant(0))));
-    declarations.add(
+    innerDeclarations.add(
         Expressions.methodDecl(Modifier.PUBLIC, Object.class,
             BuiltInMethod.SCALAR_EXECUTE1.method.getName(),
             ImmutableList.of(context_), builder.toBlock()));
 
     final ClassDeclaration classDeclaration =
         Expressions.classDecl(Modifier.PUBLIC, "Buzz", null,
-            ImmutableList.of(Scalar.class), declarations);
+            ImmutableList.of(Scalar.Producer.class), declarations);
     String s = Expressions.toString(declarations, "\n", false);
     if (CalciteSystemProperty.DEBUG.value()) {
       Util.debugCode(System.out, s);
@@ -153,7 +196,7 @@ public class JaninoRexCompiler implements Interpreter.ScalarCompiler {
     }
   }
 
-  static Scalar getScalar(ClassDeclaration expr, String s)
+  static Scalar.Producer getScalar(ClassDeclaration expr, String s)
       throws CompileException, IOException {
     ICompilerFactory compilerFactory;
     try {
@@ -164,12 +207,12 @@ public class JaninoRexCompiler implements Interpreter.ScalarCompiler {
     }
     IClassBodyEvaluator cbe = compilerFactory.newClassBodyEvaluator();
     cbe.setClassName(expr.name);
-    cbe.setImplementedInterfaces(new Class[]{Scalar.class});
+    cbe.setImplementedInterfaces(new Class[] {Scalar.Producer.class});
     cbe.setParentClassLoader(JaninoRexCompiler.class.getClassLoader());
     if (CalciteSystemProperty.DEBUG.value()) {
       // Add line numbers to the generated janino class
       cbe.setDebuggingInformation(true, true, true);
     }
-    return (Scalar) cbe.createInstance(new StringReader(s));
+    return (Scalar.Producer) cbe.createInstance(new StringReader(s));
   }
 }
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Scalar.java b/core/src/main/java/org/apache/calcite/interpreter/Scalar.java
index 9146b39..a1beda0 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Scalar.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Scalar.java
@@ -16,12 +16,22 @@
  */
 package org.apache.calcite.interpreter;
 
+import org.apache.calcite.DataContext;
+
 import org.checkerframework.checker.nullness.qual.Nullable;
 
+import java.util.function.Function;
+
 /**
  * Compiled scalar expression.
  */
 public interface Scalar {
   @Nullable Object execute(Context context);
   void execute(Context context, @Nullable Object[] results);
+
+  /** Produces a {@link Scalar} when a query is executed.
+   *
+   * <p>Call {@code producer.apply(DataContext)} to get a Scalar. */
+  interface Producer extends Function<DataContext, Scalar> {
+  }
 }
diff --git a/core/src/main/java/org/apache/calcite/interpreter/TableFunctionScanNode.java b/core/src/main/java/org/apache/calcite/interpreter/TableFunctionScanNode.java
index 395cd57..749040e 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/TableFunctionScanNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/TableFunctionScanNode.java
@@ -18,10 +18,11 @@ package org.apache.calcite.interpreter;
 
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.function.Function1;
 import org.apache.calcite.rel.core.TableFunctionScan;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.runtime.Enumerables;
 import org.apache.calcite.schema.Function;
 import org.apache.calcite.schema.impl.TableFunctionImpl;
 import org.apache.calcite.sql.SqlOperator;
@@ -29,6 +30,8 @@ import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction;
 
 import com.google.common.collect.ImmutableList;
 
+import org.checkerframework.checker.nullness.qual.Nullable;
+
 /**
  * Interpreter node that implements a
  * {@link TableFunctionScan}.
@@ -37,21 +40,28 @@ public class TableFunctionScanNode implements Node {
   private final Scalar scalar;
   private final Context context;
   private final Sink sink;
+  private final Function1<?, Row> mapFn;
 
   private TableFunctionScanNode(Compiler compiler, TableFunctionScan rel) {
-    this.scalar =
-        compiler.compile(ImmutableList.of(rel.getCall()), rel.getRowType());
+    final RelDataType rowType = rel.getRowType();
+    this.scalar = compiler.compile(ImmutableList.of(rel.getCall()), rowType);
     this.context = compiler.createContext();
     this.sink = compiler.sink(rel);
+    if (rowType.getFieldCount() == 1
+        && rel.getElementType() != Object[].class) {
+      this.mapFn = (Function1<Object, Row>) Row::of;
+    } else {
+      this.mapFn = (Function1<@Nullable Object[], Row>) Row::asCopy;
+    }
   }
 
   @Override public void run() throws InterruptedException {
     final Object o = scalar.execute(context);
     if (o instanceof Enumerable) {
-      @SuppressWarnings("unchecked") final Enumerable<Row> rowEnumerable =
-          Enumerables.toRow((Enumerable) o);
-      final Enumerator<Row> enumerator = rowEnumerable.enumerator();
-      while (enumerator.moveNext()) {
+      for (@SuppressWarnings({"unchecked", "rawtypes"})
+           final Enumerator<Row> enumerator =
+           ((Enumerable) o).select(mapFn).enumerator();
+           enumerator.moveNext();) {
         sink.send(enumerator.current());
       }
     }
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandWithinDistinctRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandWithinDistinctRule.java
index 51ef63d..91cca97 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandWithinDistinctRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandWithinDistinctRule.java
@@ -290,7 +290,7 @@ public class AggregateExpandWithinDistinctRule
     //
     // or in algebra,
     //
-    //   Aggregate($0, SUM($2 WHERE $4 = 0), SUM($3 WHERE $4 = 0))
+    //   Aggregate($0, SUM($2 WHERE $4 = 0), SUM($3 WHERE $4 = 1))
     //     Aggregate([($0), ($0, $2)], SUM($2), MIN($2), GROUPING($0, $4))
     //       Scan(emp)
     //
diff --git a/core/src/main/java/org/apache/calcite/rex/RexCallBinding.java b/core/src/main/java/org/apache/calcite/rex/RexCallBinding.java
index 1e3ccdf..a4cd8b1 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexCallBinding.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexCallBinding.java
@@ -62,15 +62,24 @@ public class RexCallBinding extends SqlOperatorBinding {
   public static RexCallBinding create(RelDataTypeFactory typeFactory,
       RexCall call,
       List<RelCollation> inputCollations) {
+    return create(typeFactory, call, null, inputCollations);
+  }
+
+  /** Creates a binding of the appropriate type, optionally with a program. */
+  public static RexCallBinding create(RelDataTypeFactory typeFactory,
+      RexCall call, @Nullable RexProgram program,
+      List<RelCollation> inputCollations) {
+    final List<RexNode> operands =
+        program != null ? program.expandList(call.getOperands())
+            : call.getOperands();
     switch (call.getKind()) {
     case CAST:
       return new RexCastCallBinding(typeFactory, call.getOperator(),
-          call.getOperands(), call.getType(), inputCollations);
+          operands, call.getType(), inputCollations);
     default:
-      break;
+      return new RexCallBinding(typeFactory, call.getOperator(),
+          operands, inputCollations);
     }
-    return new RexCallBinding(typeFactory, call.getOperator(),
-        call.getOperands(), inputCollations);
   }
 
   //~ Methods ----------------------------------------------------------------
@@ -150,8 +159,7 @@ public class RexCallBinding extends SqlOperatorBinding {
   private static class RexCastCallBinding extends RexCallBinding {
     private final RelDataType type;
 
-    RexCastCallBinding(
-        RelDataTypeFactory typeFactory,
+    RexCastCallBinding(RelDataTypeFactory typeFactory,
         SqlOperator sqlOperator, List<? extends RexNode> operands,
         RelDataType type,
         List<RelCollation> inputCollations) {
diff --git a/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java b/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java
index 9f9cf3f..d0ea922 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java
@@ -92,7 +92,7 @@ public class RexExecutorImpl implements RexExecutor {
     final RexProgram program = programBuilder.getProgram();
     final List<Expression> expressions =
         RexToLixTranslator.translateProjects(program, javaTypeFactory,
-            conformance, blockBuilder, null, root_, getter, null);
+            conformance, blockBuilder, null, null, root_, getter, null);
     blockBuilder.add(
         Expressions.return_(null,
             Expressions.newArrayInit(Object[].class, expressions)));
diff --git a/core/src/main/java/org/apache/calcite/rex/RexProgram.java b/core/src/main/java/org/apache/calcite/rex/RexProgram.java
index bd0b511..511e49f 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexProgram.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexProgram.java
@@ -549,6 +549,11 @@ public class RexProgram {
     return ref.accept(new ExpansionShuttle(exprs));
   }
 
+  /** Expands a list of expressions that may contain {@link RexLocalRef}s. */
+  public List<RexNode> expandList(List<? extends RexNode> nodes) {
+    return new ExpansionShuttle(exprs).visitList(nodes);
+  }
+
   /** Splits this program into a list of project expressions and a list of
    * filter expressions.
    *
diff --git a/core/src/main/java/org/apache/calcite/runtime/Enumerables.java b/core/src/main/java/org/apache/calcite/runtime/Enumerables.java
index f8d4999..07ede2a 100644
--- a/core/src/main/java/org/apache/calcite/runtime/Enumerables.java
+++ b/core/src/main/java/org/apache/calcite/runtime/Enumerables.java
@@ -48,7 +48,6 @@ public class Enumerables {
   /** Converts an enumerable over singleton arrays into the enumerable of their
    * first elements. */
   public static <E> Enumerable<E> slice0(Enumerable<E[]> enumerable) {
-    //noinspection unchecked
     return enumerable.select(elements -> elements[0]);
   }
 
diff --git a/core/src/main/java/org/apache/calcite/runtime/FunctionContexts.java b/core/src/main/java/org/apache/calcite/runtime/FunctionContexts.java
new file mode 100644
index 0000000..e975b69
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/runtime/FunctionContexts.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.runtime;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.rel.metadata.NullSentinel;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.FunctionContext;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Runtime support for {@link org.apache.calcite.schema.FunctionContext}.
+ */
+public class FunctionContexts {
+  private FunctionContexts() {
+  }
+
+  /** Creates a FunctionContext. */
+  public static FunctionContext of(DataContext root, Object[] argumentValues) {
+    return new FunctionContextImpl(root, argumentValues);
+  }
+
+  /** Implementation of {@link org.apache.calcite.schema.FunctionContext}. */
+  private static class FunctionContextImpl implements FunctionContext {
+    private final DataContext root;
+    private final @Nullable Object[] argumentValues;
+
+    FunctionContextImpl(DataContext root, @Nullable Object[] argumentValues) {
+      this.root = root;
+      this.argumentValues = argumentValues;
+    }
+
+    @Override public RelDataTypeFactory getTypeFactory() {
+      return root.getTypeFactory();
+    }
+
+    @Override public int getParameterCount() {
+      return argumentValues.length;
+    }
+
+    @Override public boolean isArgumentConstant(int ordinal) {
+      return argumentValue(ordinal) != null;
+    }
+
+    private @Nullable Object argumentValue(int ordinal) {
+      if (ordinal < 0 || ordinal >= argumentValues.length) {
+        throw new IndexOutOfBoundsException("argument ordinal " + ordinal
+            + " is out of range");
+      }
+      return argumentValues[ordinal];
+    }
+
+    @Override public <V> @Nullable V getArgumentValueAs(int ordinal,
+        Class<V> valueClass) {
+      final Object v = argumentValue(ordinal);
+      if (v == null) {
+        throw new IllegalArgumentException("value of argument " + ordinal
+            + " is not constant");
+      }
+      if (v == NullSentinel.INSTANCE) {
+        return null; // value is constant NULL
+      }
+      if (valueClass == String.class
+          && !(v instanceof String)) {
+        return valueClass.cast(v.toString());
+      }
+      return valueClass.cast(v);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/schema/FunctionContext.java b/core/src/main/java/org/apache/calcite/schema/FunctionContext.java
new file mode 100644
index 0000000..172e2e9
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/schema/FunctionContext.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.schema;
+
+import org.apache.calcite.linq4j.function.Experimental;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Information about a function call that is passed to the constructor of a
+ * function instance.
+ *
+ * <p>This may enable a function to perform work up front, during construction,
+ * rather than each time it is invoked. Here is an example of such a function:
+ *
+ * <blockquote><pre>
+ * class RegexMatchesFunction {
+ *   final java.util.regex.Pattern compiledPattern;
+ *
+ *   public RegexMatchesFunction(FunctionContext cx) {
+ *     String pattern = cx.getArgumentValueAs(0, String.class);
+ *     this.compiledPattern = java.util.regex.Pattern.compile(pattern);
+ *   }
+ *
+ *   public boolean eval(String pattern, String s) {
+ *     return this.compiledPattern.matcher(s).matches();
+ *   }
+ * }
+ * </pre></blockquote>
+ *
+ * <p>Register it in the model as follows:
+ *
+ * <blockquote><pre>
+ *   functions: [
+ *     {
+ *       name: 'REGEX_MATCHES',
+ *       className: 'com.example.RegexMatchesFunction'
+ *     }
+ *   ]
+ * </pre></blockquote>
+ *
+ * <p>and use it in a query:
+ *
+ * <blockquote><pre>
+ * SELECT empno, ename
+ * FROM Emp
+ * WHERE regex_matches('J.*ES', ename);
+ *
+ * +-------+--------+
+ * | EMPNO | ENAME  |
+ * +-------+--------+
+ * | 7900  | JAMES  |
+ * | 7566  | JONES  |
+ * +-------+--------+
+ * </pre></blockquote>
+ *
+ * <p>When executing the query, Calcite will create an instance of
+ * {@code RegexMatchesFunction} and call the {@code eval} method on that
+ * instance once per row.
+ *
+ * <p>If the {@code eval} method was static, or if the function's
+ * constructor had zero parameters, the {@code eval} method would have to call
+ * {@code java.util.regex.Pattern.compile(pattern)} to compile the pattern each
+ * time.
+ *
+ * <p>This interface is marked {@link Experimental}, which means that we may
+ * change or remove methods, or the entire interface, without notice. But
+ * probably we will add methods over time, which will just give your UDFs more
+ * information to work with.
+ */
+@Experimental
+public interface FunctionContext {
+  /** Returns the type factory. */
+  RelDataTypeFactory getTypeFactory();
+
+  /** Returns the number of parameters. */
+  int getParameterCount();
+
+  /** Returns whether the value of an argument is constant.
+   *
+   * @param ordinal Argument ordinal, starting from 0 */
+  boolean isArgumentConstant(int ordinal);
+
+  /** Returns the value of an argument to this function,
+   * null if the argument is the NULL literal.
+   *
+   * @param ordinal Argument ordinal, starting from 0
+   * @param valueClass Type of value
+   *
+   * @throws ClassCastException if argument cannot be converted to
+   * {@code valueClass}
+   *
+   * @throws IllegalArgumentException if argument is not constant */
+  <V> @Nullable V getArgumentValueAs(int ordinal, Class<V> valueClass);
+}
diff --git a/core/src/main/java/org/apache/calcite/schema/impl/ReflectiveFunctionBase.java b/core/src/main/java/org/apache/calcite/schema/impl/ReflectiveFunctionBase.java
index 3aa4879..71573e0 100644
--- a/core/src/main/java/org/apache/calcite/schema/impl/ReflectiveFunctionBase.java
+++ b/core/src/main/java/org/apache/calcite/schema/impl/ReflectiveFunctionBase.java
@@ -19,6 +19,7 @@ package org.apache.calcite.schema.impl;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.FunctionContext;
 import org.apache.calcite.schema.FunctionParameter;
 import org.apache.calcite.util.ReflectUtil;
 
@@ -63,9 +64,10 @@ public abstract class ReflectiveFunctionBase implements Function {
   }
 
   /**
-   * Verifies if given class has public constructor with zero arguments.
-   * @param clazz class to verify
-   * @return true if given class has public constructor with zero arguments
+   * Returns whether a class has a public constructor with zero arguments.
+   *
+   * @param clazz Class to verify
+   * @return whether class has a public constructor with zero arguments
    */
   static boolean classHasPublicZeroArgsConstructor(Class<?> clazz) {
     for (Constructor<?> constructor : clazz.getConstructors()) {
@@ -78,6 +80,25 @@ public abstract class ReflectiveFunctionBase implements Function {
   }
 
   /**
+   * Returns whether a class has a public constructor with one argument
+   * of type {@link FunctionContext}.
+   *
+   * @param clazz Class to verify
+   * @return whether class has a public constructor with one FunctionContext
+   * argument
+   */
+  static boolean classHasPublicFunctionContextConstructor(Class<?> clazz) {
+    for (Constructor<?> constructor : clazz.getConstructors()) {
+      if (constructor.getParameterTypes().length == 1
+          && constructor.getParameterTypes()[0] == FunctionContext.class
+          && Modifier.isPublic(constructor.getModifiers())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * Finds a method in a given class by name.
    * @param clazz class to search method in
    * @param name name of the method to find
diff --git a/core/src/main/java/org/apache/calcite/schema/impl/ScalarFunctionImpl.java b/core/src/main/java/org/apache/calcite/schema/impl/ScalarFunctionImpl.java
index 162a102..87b7011 100644
--- a/core/src/main/java/org/apache/calcite/schema/impl/ScalarFunctionImpl.java
+++ b/core/src/main/java/org/apache/calcite/schema/impl/ScalarFunctionImpl.java
@@ -130,8 +130,9 @@ public class ScalarFunctionImpl extends ReflectiveFunctionBase
    */
   public static ScalarFunction create(Method method) {
     if (!Modifier.isStatic(method.getModifiers())) {
-      Class clazz = method.getDeclaringClass();
-      if (!classHasPublicZeroArgsConstructor(clazz)) {
+      Class<?> clazz = method.getDeclaringClass();
+      if (!classHasPublicZeroArgsConstructor(clazz)
+          && !classHasPublicFunctionContextConstructor(clazz)) {
         throw RESOURCE.requireDefaultConstructor(clazz.getName()).ex();
       }
     }
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index 60b251c..8a8e175 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -86,6 +86,7 @@ import org.apache.calcite.runtime.Bindable;
 import org.apache.calcite.runtime.CompressionFunctions;
 import org.apache.calcite.runtime.Enumerables;
 import org.apache.calcite.runtime.FlatLists;
+import org.apache.calcite.runtime.FunctionContexts;
 import org.apache.calcite.runtime.GeoFunctions;
 import org.apache.calcite.runtime.JsonFunctions;
 import org.apache.calcite.runtime.Matcher;
@@ -135,6 +136,7 @@ import java.util.Objects;
 import java.util.TimeZone;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import javax.sql.DataSource;
 
@@ -286,6 +288,7 @@ public enum BuiltInMethod {
   ENUMERABLE_ENUMERATOR(Enumerable.class, "enumerator"),
   ENUMERABLE_FOREACH(Enumerable.class, "foreach", Function1.class),
   ITERABLE_FOR_EACH(Iterable.class, "forEach", Consumer.class),
+  FUNCTION_APPLY(Function.class, "apply", Object.class),
   PREDICATE_TEST(Predicate.class, "test", Object.class),
   BI_PREDICATE_TEST(BiPredicate.class, "test", Object.class, Object.class),
   CONSUMER_ACCEPT(Consumer.class, "accept", Object.class),
@@ -604,6 +607,8 @@ public enum BuiltInMethod {
   SCALAR_EXECUTE2(Scalar.class, "execute", Context.class, Object[].class),
   CONTEXT_VALUES(Context.class, "values", true),
   CONTEXT_ROOT(Context.class, "root", true),
+  FUNCTION_CONTEXTS_OF(FunctionContexts.class, "of", DataContext.class,
+      Object[].class),
   DATA_CONTEXT_GET_QUERY_PROVIDER(DataContext.class, "getQueryProvider"),
   METADATA_REL(Metadata.class, "rel"),
   STRUCT_ACCESS(SqlFunctions.class, "structAccess", Object.class, int.class,
diff --git a/core/src/main/java/org/apache/calcite/util/Util.java b/core/src/main/java/org/apache/calcite/util/Util.java
index 5f77a4d..cd4fc0f 100644
--- a/core/src/main/java/org/apache/calcite/util/Util.java
+++ b/core/src/main/java/org/apache/calcite/util/Util.java
@@ -63,6 +63,7 @@ import java.io.PrintWriter;
 import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.io.Writer;
 import java.lang.reflect.Array;
 import java.lang.reflect.Field;
@@ -978,6 +979,9 @@ public class Util {
     if (e instanceof RuntimeException) {
       return (RuntimeException) e;
     }
+    if (e instanceof IOException) {
+      return new UncheckedIOException((IOException) e);
+    }
     return new RuntimeException(e);
   }
 
diff --git a/core/src/test/java/org/apache/calcite/test/InterpreterTest.java b/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
index 4c3ade9..16d590e 100644
--- a/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
@@ -30,9 +30,11 @@ import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.rules.CoreRules;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.schema.ScalarFunction;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.TableFunction;
 import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
 import org.apache.calcite.schema.impl.TableFunctionImpl;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParseException;
@@ -54,6 +56,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
@@ -66,15 +69,15 @@ import static org.hamcrest.core.Is.is;
  */
 class InterpreterTest {
   private SchemaPlus rootSchema;
-  private Planner planner;
-  private MyDataContext dataContext;
 
   /** Implementation of {@link DataContext} for executing queries without a
    * connection. */
-  private class MyDataContext implements DataContext {
+  private static class MyDataContext implements DataContext {
+    private SchemaPlus rootSchema;
     private final Planner planner;
 
-    MyDataContext(Planner planner) {
+    MyDataContext(SchemaPlus rootSchema, Planner planner) {
+      this.rootSchema = rootSchema;
       this.planner = planner;
     }
 
@@ -98,21 +101,18 @@ class InterpreterTest {
   /** Fluent class that contains information necessary to run a test. */
   private static class Sql {
     private final String sql;
-    private final MyDataContext dataContext;
-    private final Planner planner;
+    private final SchemaPlus rootSchema;
     private final boolean project;
 
-    Sql(String sql, MyDataContext dataContext, Planner planner,
-        boolean project) {
+    Sql(String sql, SchemaPlus rootSchema, boolean project) {
       this.sql = sql;
-      this.dataContext = dataContext;
-      this.planner = planner;
+      this.rootSchema = rootSchema;
       this.project = project;
     }
 
     @SuppressWarnings("SameParameterValue")
     Sql withProject(boolean project) {
-      return new Sql(sql, dataContext, planner, project);
+      return new Sql(sql, rootSchema, project);
     }
 
     /** Interprets the sql and checks result with specified rows, ordered. */
@@ -127,13 +127,25 @@ class InterpreterTest {
       return returnsRows(true, rows);
     }
 
+    private Planner createPlanner() {
+      final FrameworkConfig config = Frameworks.newConfigBuilder()
+          .parserConfig(SqlParser.Config.DEFAULT)
+          .defaultSchema(
+              CalciteAssert.addSchema(rootSchema,
+                  CalciteAssert.SchemaSpec.JDBC_SCOTT,
+                  CalciteAssert.SchemaSpec.HR))
+          .build();
+      return Frameworks.getPlanner(config);
+    }
+
     /** Interprets the sql and checks result with specified rows. */
     private Sql returnsRows(boolean unordered, String[] rows) {
-      try {
+      try (Planner planner = createPlanner()) {
         SqlNode parse = planner.parse(sql);
         SqlNode validate = planner.validate(parse);
         final RelRoot root = planner.rel(validate);
         RelNode convert = project ? root.project() : root.rel;
+        final MyDataContext dataContext = new MyDataContext(rootSchema, planner);
         assertInterpret(convert, dataContext, unordered, rows);
         return this;
       } catch (ValidationException
@@ -146,20 +158,11 @@ class InterpreterTest {
 
   /** Creates a {@link Sql}. */
   private Sql sql(String sql) {
-    return new Sql(sql, dataContext, planner, false);
+    return new Sql(sql, rootSchema, false);
   }
 
   private void reset() {
     rootSchema = Frameworks.createRootSchema(true);
-    final FrameworkConfig config = Frameworks.newConfigBuilder()
-        .parserConfig(SqlParser.Config.DEFAULT)
-        .defaultSchema(
-            CalciteAssert.addSchema(rootSchema,
-                CalciteAssert.SchemaSpec.JDBC_SCOTT,
-                CalciteAssert.SchemaSpec.HR))
-        .build();
-    planner = Frameworks.getPlanner(config);
-    dataContext = new MyDataContext(planner);
   }
 
   @BeforeEach public void setUp() {
@@ -168,8 +171,6 @@ class InterpreterTest {
 
   @AfterEach public void tearDown() {
     rootSchema = null;
-    planner = null;
-    dataContext = null;
   }
 
   /** Tests executing a simple plan using an interpreter. */
@@ -466,7 +467,7 @@ class InterpreterTest {
     final String sql = "select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)\n"
         + "where x in\n"
         + "(select x from (values (1, 'd'), (3, 'g')) as t2(x, y))";
-    try {
+    try (Planner planner = sql(sql).createPlanner()) {
       SqlNode validate = planner.validate(planner.parse(sql));
       RelNode convert = planner.rel(validate).rel;
       final HepProgram program = new HepProgramBuilder()
@@ -475,6 +476,7 @@ class InterpreterTest {
       final HepPlanner hepPlanner = new HepPlanner(program);
       hepPlanner.setRoot(convert);
       final RelNode relNode = hepPlanner.findBestExp();
+      final MyDataContext dataContext = new MyDataContext(rootSchema, planner);
       assertInterpret(relNode, dataContext, true, "[1, a]", "[3, c]");
     } catch (ValidationException
         | SqlParseException
@@ -564,6 +566,56 @@ class InterpreterTest {
             "[7900, 1981-12-03]", "[7902, 1981-12-03]", "[7934, 1982-01-23]");
   }
 
+  /** Tests a user-defined scalar function that is non-static. */
+  @Test void testInterpretFunction() {
+    SchemaPlus schema = rootSchema.add("s", new AbstractSchema());
+    final ScalarFunction f =
+        ScalarFunctionImpl.create(Smalls.MY_PLUS_EVAL_METHOD);
+    schema.add("myPlus", f);
+    final String sql = "select x, \"s\".\"myPlus\"(x, 1)\n"
+        + "from (values (2), (4), (7)) as t (x)";
+    String[] rows = {"[2, 3]", "[4, 5]", "[7, 8]"};
+    final int n = Smalls.MyPlusFunction.INSTANCE_COUNT.get().get();
+    sql(sql).returnsRows(rows);
+    final int n2 = Smalls.MyPlusFunction.INSTANCE_COUNT.get().get();
+    assertThat(n2, is(n + 1)); // instantiated once per run, not once per row
+  }
+
+  /** Tests a user-defined scalar function that is non-static and has a
+   * constructor that uses
+   * {@link org.apache.calcite.schema.FunctionContext}. */
+  @Test void testInterpretFunctionWithInitializer() {
+    SchemaPlus schema = rootSchema.add("s", new AbstractSchema());
+    final ScalarFunction f =
+        ScalarFunctionImpl.create(Smalls.MY_PLUS_INIT_EVAL_METHOD);
+    schema.add("myPlusInit", f);
+    final String sql = "select x, \"s\".\"myPlusInit\"(x, 1)\n"
+        + "from (values (2), (4), (7)) as t (x)";
+    String[] rows = {"[2, 3]", "[4, 5]", "[7, 8]"};
+    final int n = Smalls.MyPlusInitFunction.INSTANCE_COUNT.get().get();
+    sql(sql).returnsRows(rows);
+    final int n2 = Smalls.MyPlusInitFunction.INSTANCE_COUNT.get().get();
+    assertThat(n2, is(n + 1)); // instantiated once per run, not once per row
+    final String digest = Smalls.MyPlusInitFunction.THREAD_DIGEST.get();
+    String expectedDigest = "parameterCount=2; "
+        + "argument 0 is not constant; "
+        + "argument 1 is constant and has value 1";
+    assertThat(digest, is(expectedDigest));
+
+    // Similar, but replace '1' with the expression '3 - 2'
+    final String sql2 = "select x, \"s\".\"myPlusInit\"(x, 3 - 2)\n"
+        + "from (values (2), (4), (7)) as t (x)";
+    String[] rows2 = {"[2, 102]", "[4, 104]", "[7, 107]"};
+    sql(sql2).returnsRows(rows2);
+    final int n3 = Smalls.MyPlusInitFunction.INSTANCE_COUNT.get().get();
+    assertThat(n3, is(n2 + 1)); // instantiated once per run, not once per row
+    final String digest2 = Smalls.MyPlusInitFunction.THREAD_DIGEST.get();
+    String expectedDigest2 = "parameterCount=2; "
+        + "argument 0 is not constant; "
+        + "argument 1 is not constant";
+    assertThat(digest2, is(expectedDigest2));
+  }
+
   /** Tests a table function. */
   @Test void testInterpretTableFunction() {
     SchemaPlus schema = rootSchema.add("s", new AbstractSchema());
@@ -610,4 +662,16 @@ class InterpreterTest {
         + "where \"i\" < 0 and \"d\" is not null";
     sql(sql).returnsRows();
   }
+
+  /** Tests a table function that is a non-static class. */
+  @Test void testInterpretNonStaticTableFunction() {
+    SchemaPlus schema = rootSchema.add("s", new AbstractSchema());
+    final TableFunction tableFunction =
+        Objects.requireNonNull(
+            TableFunctionImpl.create(Smalls.MyTableFunction.class));
+    schema.add("t", tableFunction);
+    final String sql = "select *\n"
+        + "from table(\"s\".\"t\"('=100='))";
+    sql(sql).returnsRows("[1]", "[3]", "[100]");
+  }
 }
diff --git a/core/src/test/java/org/apache/calcite/test/UdfTest.java b/core/src/test/java/org/apache/calcite/test/UdfTest.java
index 987e85d..9fee555 100644
--- a/core/src/test/java/org/apache/calcite/test/UdfTest.java
+++ b/core/src/test/java/org/apache/calcite/test/UdfTest.java
@@ -185,7 +185,7 @@ class UdfTest {
   /** Tests a user-defined function that is defined in terms of a class with
    * non-static methods. */
   @Disabled("[CALCITE-1561] Intermittent test failures")
-  @Test void testUserDefinedFunction() throws Exception {
+  @Test void testUserDefinedFunction() {
     final String sql = "select \"adhoc\".my_plus(\"deptno\", 100) as p\n"
         + "from \"adhoc\".EMPLOYEES";
     final AtomicInteger c = Smalls.MyPlusFunction.INSTANCE_COUNT.get();
@@ -202,7 +202,7 @@ class UdfTest {
    * instantiated exactly once, per
    * <a href="https://issues.apache.org/jira/browse/CALCITE-1548">[CALCITE-1548]
    * Instantiate function objects once per query</a>. */
-  @Test void testUserDefinedFunctionInstanceCount() throws Exception {
+  @Test void testUserDefinedFunctionInstanceCount() {
     final String sql = "select \"adhoc\".my_det_plus(\"deptno\", 100) as p\n"
         + "from \"adhoc\".EMPLOYEES";
     final AtomicInteger c = Smalls.MyDeterministicPlusFunction.INSTANCE_COUNT.get();
@@ -215,7 +215,7 @@ class UdfTest {
     assertThat(after, is(before + 1));
   }
 
-  @Test void testUserDefinedFunctionB() throws Exception {
+  @Test void testUserDefinedFunctionB() {
     final String sql = "select \"adhoc\".my_double(\"deptno\") as p\n"
         + "from \"adhoc\".EMPLOYEES";
     final String expected = "P=20\n"
@@ -225,7 +225,7 @@ class UdfTest {
     withUdf().query(sql).returns(expected);
   }
 
-  @Test void testUserDefinedFunctionWithNull() throws Exception {
+  @Test void testUserDefinedFunctionWithNull() {
     final String sql = "select \"adhoc\".my_det_plus(\"deptno\", 1 + null) as p\n"
         + "from \"adhoc\".EMPLOYEES where 1 > 0 or nullif(null, 1) is null";
     final AtomicInteger c = Smalls.MyDeterministicPlusFunction.INSTANCE_COUNT.get();
@@ -243,7 +243,7 @@ class UdfTest {
   /** Test case for
    * <a href="https://issues.apache.org/jira/browse/CALCITE-3195">[CALCITE-3195]
    * Handle a UDF that throws checked exceptions in the Enumerable code generator</a>. */
-  @Test void testUserDefinedFunctionWithException() throws Exception {
+  @Test void testUserDefinedFunctionWithException() {
     final String sql1 = "select \"adhoc\".my_exception(\"deptno\") as p\n"
         + "from \"adhoc\".EMPLOYEES";
     final String expected1 = "P=20\n"
@@ -294,10 +294,14 @@ class UdfTest {
             ImmutableList.of("POST", "V_EMP"), null));
 
     final String result = ""
-        + "EMPLOYEE_ID=100; EMPLOYEE_NAME=Bill Bill; EMPLOYEE_SALARY=10000.0; INCREMENTED_SALARY=110.0\n"
-        + "EMPLOYEE_ID=200; EMPLOYEE_NAME=Eric Eric; EMPLOYEE_SALARY=8000.0; INCREMENTED_SALARY=220.0\n"
-        + "EMPLOYEE_ID=150; EMPLOYEE_NAME=Sebastian Sebastian; EMPLOYEE_SALARY=7000.0; INCREMENTED_SALARY=165.0\n"
-        + "EMPLOYEE_ID=110; EMPLOYEE_NAME=Theodore Theodore; EMPLOYEE_SALARY=11500.0; INCREMENTED_SALARY=121.0\n";
+        + "EMPLOYEE_ID=100; EMPLOYEE_NAME=Bill Bill;"
+        + " EMPLOYEE_SALARY=10000.0; INCREMENTED_SALARY=110.0\n"
+        + "EMPLOYEE_ID=200; EMPLOYEE_NAME=Eric Eric;"
+        + " EMPLOYEE_SALARY=8000.0; INCREMENTED_SALARY=220.0\n"
+        + "EMPLOYEE_ID=150; EMPLOYEE_NAME=Sebastian Sebastian;"
+        + " EMPLOYEE_SALARY=7000.0; INCREMENTED_SALARY=165.0\n"
+        + "EMPLOYEE_ID=110; EMPLOYEE_NAME=Theodore Theodore;"
+        + " EMPLOYEE_SALARY=11500.0; INCREMENTED_SALARY=121.0\n";
 
     Statement statement = connection.createStatement();
     ResultSet resultSet = statement.executeQuery(viewSql);
@@ -501,14 +505,16 @@ class UdfTest {
 
   /** Test for
    * {@link org.apache.calcite.runtime.CalciteResource#requireDefaultConstructor(String)}. */
-  @Test void testUserDefinedFunction2() throws Exception {
-    withBadUdf(Smalls.AwkwardFunction.class)
-        .connectThrows(
-            "Declaring class 'org.apache.calcite.util.Smalls$AwkwardFunction' of non-static user-defined function must have a public constructor with zero parameters");
+  @Test void testUserDefinedFunction2() {
+    String message = "Declaring class "
+        + "'org.apache.calcite.util.Smalls$AwkwardFunction' of non-static "
+        + "user-defined function must have a public constructor with zero "
+        + "parameters";
+    withBadUdf(Smalls.AwkwardFunction.class).connectThrows(message);
   }
 
   /** Tests user-defined function, with multiple methods per class. */
-  @Test void testUserDefinedFunctionWithMethodName() throws Exception {
+  @Test void testUserDefinedFunctionWithMethodName() {
     // java.lang.Math has abs(int) and abs(double).
     final CalciteAssert.AssertThat with = withUdf();
     with.query("values abs(-4)").returnsValue("4");
@@ -525,7 +531,7 @@ class UdfTest {
   }
 
   /** Tests user-defined aggregate function. */
-  @Test void testUserDefinedAggregateFunction() throws Exception {
+  @Test void testUserDefinedAggregateFunction() {
     final String empDept = JdbcTest.EmpDeptTableFactory.class.getName();
     final String sum = Smalls.MyStaticSumFunction.class.getName();
     final String sum2 = Smalls.MySumFunction.class.getName();
@@ -585,7 +591,7 @@ class UdfTest {
   }
 
   /** Tests user-defined aggregate function. */
-  @Test void testUserDefinedAggregateFunctionWithMultipleParameters() throws Exception {
+  @Test void testUserDefinedAggregateFunctionWithMultipleParameters() {
     final String empDept = JdbcTest.EmpDeptTableFactory.class.getName();
     final String sum21 = Smalls.MyTwoParamsSumFunctionFilter1.class.getName();
     final String sum22 = Smalls.MyTwoParamsSumFunctionFilter2.class.getName();
@@ -627,16 +633,18 @@ class UdfTest {
         + "}")
         .withDefaultSchema("adhoc");
     with.withDefaultSchema(null)
-        .query(
-            "select \"adhoc\".my_sum3(\"deptno\",\"name\",'Eric') as p from \"adhoc\".EMPLOYEES\n")
+        .query("select \"adhoc\".my_sum3(\"deptno\",\"name\",'Eric') as p\n"
+            + "from \"adhoc\".EMPLOYEES\n")
         .returns("P=20\n");
     with.query("select \"adhoc\".my_sum3(\"empid\",\"deptno\",\"commission\") as p "
         + "from \"adhoc\".EMPLOYEES\n")
         .returns("P=330\n");
-    with.query("select \"adhoc\".my_sum3(\"empid\",\"deptno\",\"commission\"),\"name\" as p "
+    with.query("select \"adhoc\".my_sum3(\"empid\",\"deptno\",\"commission\"),\n"
+        + "  \"name\"\n"
         + "from \"adhoc\".EMPLOYEES\n")
         .throws_("Expression 'name' is not being grouped");
-    with.query("select \"name\",\"adhoc\".my_sum3(\"empid\",\"deptno\",\"commission\") as p "
+    with.query("select \"name\",\n"
+        + "  \"adhoc\".my_sum3(\"empid\",\"deptno\",\"commission\") as p\n"
         + "from \"adhoc\".EMPLOYEES\n"
         + "group by \"name\"")
         .returnsUnordered("name=Theodore; P=0",
@@ -644,26 +652,31 @@ class UdfTest {
             "name=Bill; P=110",
             "name=Sebastian; P=0");
     // implicit type coercion.
-    with.query("select \"adhoc\".my_sum3(\"empid\",\"deptno\",\"salary\") as p "
+    with.query("select \"adhoc\".my_sum3(\"empid\",\"deptno\",\"salary\") as p\n"
         + "from \"adhoc\".EMPLOYEES\n");
-    with.query("select \"adhoc\".my_sum3(\"empid\",\"deptno\",\"name\") as p "
+    with.query("select \"adhoc\".my_sum3(\"empid\",\"deptno\",\"name\") as p\n"
         + "from \"adhoc\".EMPLOYEES\n");
-    with.query("select \"adhoc\".my_sum2(\"commission\",250) as p "
+    with.query("select \"adhoc\".my_sum2(\"commission\",250) as p\n"
         + "from \"adhoc\".EMPLOYEES\n")
         .returns("P=1500\n");
     // implicit type coercion.
-    with.query("select \"adhoc\".my_sum2(\"name\",250) as p from \"adhoc\".EMPLOYEES\n")
+    with.query("select \"adhoc\".my_sum2(\"name\",250) as p\n"
+        + "from \"adhoc\".EMPLOYEES\n")
         .throws_("java.lang.NumberFormatException: For input string: \"Bill\"");
     // implicit type coercion.
-    with.query("select \"adhoc\".my_sum2(\"empid\",0.0) as p from \"adhoc\".EMPLOYEES\n")
+    with.query("select \"adhoc\".my_sum2(\"empid\",0.0) as p\n"
+        + "from \"adhoc\".EMPLOYEES\n")
         .returns("P=560\n");
   }
 
   /** Test for
    * {@link org.apache.calcite.runtime.CalciteResource#firstParameterOfAdd(String)}. */
-  @Test void testUserDefinedAggregateFunction3() throws Exception {
-    withBadUdf(Smalls.SumFunctionBadIAdd.class).connectThrows(
-        "Caused by: java.lang.RuntimeException: In user-defined aggregate class 'org.apache.calcite.util.Smalls$SumFunctionBadIAdd', first parameter to 'add' method must be the accumulator (the return type of the 'init' method)");
+  @Test void testUserDefinedAggregateFunction3() {
+    String message = "Caused by: java.lang.RuntimeException: In user-defined "
+        + "aggregate class 'org.apache.calcite.util.Smalls$SumFunctionBadIAdd'"
+        + ", first parameter to 'add' method must be the accumulator (the "
+        + "return type of the 'init' method)";
+    withBadUdf(Smalls.SumFunctionBadIAdd.class).connectThrows(message);
   }
 
   /** Test case for
@@ -722,7 +735,7 @@ class UdfTest {
             "deptno=10; P=30");
   }
 
-  private static CalciteAssert.AssertThat withBadUdf(Class clazz) {
+  private static CalciteAssert.AssertThat withBadUdf(Class<?> clazz) {
     final String empDept = JdbcTest.EmpDeptTableFactory.class.getName();
     final String className = clazz.getName();
     return CalciteAssert.model("{\n"
@@ -753,7 +766,7 @@ class UdfTest {
   /** Tests user-defined aggregate function with FILTER.
    *
    * <p>Also tests that we do not try to push ADAF to JDBC source. */
-  @Test void testUserDefinedAggregateFunctionWithFilter() throws Exception {
+  @Test void testUserDefinedAggregateFunctionWithFilter() {
     final String sum = Smalls.MyStaticSumFunction.class.getName();
     final String sum2 = Smalls.MySumFunction.class.getName();
     final CalciteAssert.AssertThat with = CalciteAssert.model("{\n"
@@ -797,7 +810,7 @@ class UdfTest {
   }
 
   /** Tests resolution of functions using schema paths. */
-  @Test void testPath() throws Exception {
+  @Test void testPath() {
     final String name = Smalls.MyPlusFunction.class.getName();
     final CalciteAssert.AssertThat with = CalciteAssert.model("{\n"
         + "  version: '1.0',\n"
@@ -1012,7 +1025,6 @@ class UdfTest {
            ResultSet resultSet = statement.executeQuery(sql)) {
         assertThat(CalciteAssert.toString(resultSet), is(result));
       }
-      connection.close();
     }
   }
 
@@ -1061,7 +1073,8 @@ class UdfTest {
 
   /** Function with signature "f(ARRAY OF INTEGER, INTEGER) returns ARRAY OF
    * INTEGER". */
-  private class ArrayAppendIntegerFunction extends ArrayAppendScalarFunction {
+  private static class ArrayAppendIntegerFunction
+      extends ArrayAppendScalarFunction {
     @Override public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
       return typeFactory.createArrayType(
           typeFactory.createSqlType(SqlTypeName.INTEGER), -1);
@@ -1077,7 +1090,8 @@ class UdfTest {
 
   /** Function with signature "f(ARRAY OF DOUBLE, INTEGER) returns ARRAY OF
    * DOUBLE". */
-  private class ArrayAppendDoubleFunction extends ArrayAppendScalarFunction {
+  private static class ArrayAppendDoubleFunction
+      extends ArrayAppendScalarFunction {
     public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
       return typeFactory.createArrayType(
           typeFactory.createSqlType(SqlTypeName.DOUBLE), -1);
diff --git a/core/src/test/java/org/apache/calcite/tools/PlannerTest.java b/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
index 03a9ab1..f9622c4 100644
--- a/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
+++ b/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
@@ -247,7 +247,7 @@ class PlannerTest {
   @Test void testValidateUserDefinedFunctionInSchema() throws Exception {
     SchemaPlus rootSchema = Frameworks.createRootSchema(true);
     rootSchema.add("my_plus",
-        ScalarFunctionImpl.create(Smalls.MyPlusFunction.class, "eval"));
+        ScalarFunctionImpl.create(Smalls.MY_PLUS_EVAL_METHOD));
     final FrameworkConfig config = Frameworks.newConfigBuilder()
         .defaultSchema(
             CalciteAssert.addSchema(rootSchema, CalciteAssert.SchemaSpec.HR))
diff --git a/core/src/test/java/org/apache/calcite/util/Smalls.java b/core/src/test/java/org/apache/calcite/util/Smalls.java
index 6dab428..82fc906 100644
--- a/core/src/test/java/org/apache/calcite/util/Smalls.java
+++ b/core/src/test/java/org/apache/calcite/util/Smalls.java
@@ -35,6 +35,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.schema.FunctionContext;
 import org.apache.calcite.schema.QueryableTable;
 import org.apache.calcite.schema.ScannableTable;
 import org.apache.calcite.schema.Schema;
@@ -64,6 +65,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -115,6 +117,11 @@ public class Smalls {
   public static final Method PROCESS_CURSORS_METHOD =
       Types.lookupMethod(Smalls.class, "processCursors",
           int.class, Enumerable.class, Enumerable.class);
+  public static final Method MY_PLUS_EVAL_METHOD =
+      Types.lookupMethod(MyPlusFunction.class, "eval", int.class, int.class);
+  public static final Method MY_PLUS_INIT_EVAL_METHOD =
+      Types.lookupMethod(MyPlusInitFunction.class, "eval", int.class,
+          int.class);
 
   private Smalls() {}
 
@@ -441,6 +448,42 @@ public class Smalls {
     }
   }
 
+  /** As {@link MyPlusFunction} but constructor has a
+   *  {@link org.apache.calcite.schema.FunctionContext} parameter. */
+  public static class MyPlusInitFunction {
+    public static final ThreadLocal<AtomicInteger> INSTANCE_COUNT =
+        new ThreadLocal<>().withInitial(() -> new AtomicInteger(0));
+    public static final ThreadLocal<String> THREAD_DIGEST =
+        new ThreadLocal<>();
+
+    private final int initY;
+
+    public MyPlusInitFunction(FunctionContext fx) {
+      INSTANCE_COUNT.get().incrementAndGet();
+      final StringBuilder b = new StringBuilder();
+      final int parameterCount = fx.getParameterCount();
+      b.append("parameterCount=").append(parameterCount);
+      for (int i = 0; i < parameterCount; i++) {
+        b.append("; argument ").append(i);
+        if (fx.isArgumentConstant(i)) {
+          b.append(" is constant and has value ")
+              .append(fx.getArgumentValueAs(i, String.class));
+        } else {
+          b.append(" is not constant");
+        }
+      }
+      THREAD_DIGEST.set(b.toString());
+      this.initY = fx.isArgumentConstant(1)
+          ? fx.getArgumentValueAs(1, Integer.class)
+          : 100;
+    }
+
+    public int eval(@Parameter(name = "x") int x,
+        @Parameter(name = "y") int y) {
+      return x + initY;
+    }
+  }
+
   /** As {@link MyPlusFunction} but declared to be deterministic. */
   public static class MyDeterministicPlusFunction {
     public static final ThreadLocal<AtomicInteger> INSTANCE_COUNT =
@@ -774,9 +817,12 @@ public class Smalls {
     }
   }
 
-  /** Example of a user-defined aggregate function (UDAF). */
+  /** Example of a user-defined aggregate function (UDAF) with two parameters.
+   * The constructor has an initialization parameter. */
   public static class MyTwoParamsSumFunctionFilter1 {
-    public MyTwoParamsSumFunctionFilter1() {
+    public MyTwoParamsSumFunctionFilter1(FunctionContext fx) {
+      Objects.requireNonNull(fx, "fx");
+      assert fx.getParameterCount() == 2;
     }
     public int init() {
       return 0;
@@ -795,7 +841,8 @@ public class Smalls {
     }
   }
 
-  /** Example of a user-defined aggregate function (UDAF). */
+  /** Another example of a user-defined aggregate function (UDAF) with two
+   * parameters. */
   public static class MyTwoParamsSumFunctionFilter2 {
     public MyTwoParamsSumFunctionFilter2() {
     }
@@ -837,7 +884,8 @@ public class Smalls {
   }
 
   /** Example of a user-defined aggregate function (UDAF), whose methods are
-   * static. olny for validate to get exact function by calcite*/
+   * static. Similar to {@link MyThreeParamsSumFunctionWithFilter1}, but
+   * argument types are different. */
   public static class MyThreeParamsSumFunctionWithFilter2 {
     public static long init() {
       return 0L;
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/tree/BlockBuilder.java b/linq4j/src/main/java/org/apache/calcite/linq4j/tree/BlockBuilder.java
index 91786f2..ea8bd7a 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/tree/BlockBuilder.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/tree/BlockBuilder.java
@@ -47,9 +47,18 @@ public class BlockBuilder {
 
   private final boolean optimizing;
   private final @Nullable BlockBuilder parent;
+  private final boolean removeUnused;
 
   private static final Shuttle OPTIMIZE_SHUTTLE = new OptimizeShuttle();
 
+  /** Private constructor. */
+  private BlockBuilder(boolean optimizing, @Nullable BlockBuilder parent,
+      boolean removeUnused) {
+    this.optimizing = optimizing;
+    this.parent = parent;
+    this.removeUnused = removeUnused;
+  }
+
   /**
    * Creates a non-optimizing BlockBuilder.
    */
@@ -72,8 +81,7 @@ public class BlockBuilder {
    * @param optimizing Whether to eliminate common sub-expressions
    */
   public BlockBuilder(boolean optimizing, @Nullable BlockBuilder parent) {
-    this.optimizing = optimizing;
-    this.parent = parent;
+    this(optimizing, parent, true);
   }
 
   /**
@@ -322,7 +330,7 @@ public class BlockBuilder {
    * Returns a block consisting of the current list of statements.
    */
   public BlockStatement toBlock() {
-    if (optimizing) {
+    if (optimizing && removeUnused) {
       // We put an artificial limit of 10 iterations just to prevent an endless
       // loop. Optimize should not loop forever, however it is hard to prove if
       // it always finishes in reasonable time.
@@ -510,6 +518,10 @@ public class BlockBuilder {
     return this;
   }
 
+  public BlockBuilder withRemoveUnused(boolean removeUnused) {
+    return new BlockBuilder(optimizing, parent, removeUnused);
+  }
+
   /** Substitute Variable Visitor. */
   private static class SubstituteVariableVisitor extends Shuttle {
     protected final Map<ParameterExpression, Expression> map;
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/tree/Expressions.java b/linq4j/src/main/java/org/apache/calcite/linq4j/tree/Expressions.java
index 3038b07..3389f9d 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/tree/Expressions.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/tree/Expressions.java
@@ -2020,6 +2020,16 @@ public abstract class Expressions {
   /**
    * Creates a NewArrayExpression that represents creating an array
    * that has a specified rank.
+   *
+   * <p>For example,
+   * {@code newArrayBounds(int.class, 1, constant(8))}
+   * yields {@code new int[8]};
+   * {@code newArrayBounds(int.class, 3, constant(8))}
+   * yields {@code new int[8][][]}.
+   *
+   * @param type Element type of the array
+   * @param dimension Dimension of the array
+   * @param bound Size of the first dimension
    */
   public static NewArrayExpression newArrayBounds(Type type, int dimension,
       @Nullable Expression bound) {
@@ -2031,7 +2041,12 @@ public abstract class Expressions {
    * one-dimensional array and initializing it from a list of
    * elements.
    *
-   * @param type Element type of the array.
+   * <p>For example, "{@code newArrayInit(int.class,
+   * Arrays.asList(constant(1), constant(2)))}"
+   * yields "{@code new int[] {1, 2}}".
+   *
+   * @param type Element type of the array
+   * @param expressions Initializer expressions
    */
   public static NewArrayExpression newArrayInit(Type type,
       Iterable<? extends Expression> expressions) {
@@ -2043,7 +2058,11 @@ public abstract class Expressions {
    * one-dimensional array and initializing it from a list of
    * elements, using varargs.
    *
-   * @param type Element type of the array.
+   * <p>For example, "{@code newArrayInit(int.class, constant(1), constant(2))}"
+   * yields "{@code new int[] {1, 2}}".
+   *
+   * @param type Element type of the array
+   * @param expressions Initializer expressions
    */
   public static NewArrayExpression newArrayInit(Type type,
       Expression... expressions) {
@@ -2055,7 +2074,12 @@ public abstract class Expressions {
    * n-dimensional array and initializing it from a list of
    * elements.
    *
-   * @param type Element type of the array.
+   * <p>For example, "{@code newArrayInit(int.class, 2, Arrays.asList())}"
+   * yields "{@code new int[][] {}}".
+   *
+   * @param type Element type of the array
+   * @param dimension Dimension of the array
+   * @param expressions Initializer expressions
    */
   public static NewArrayExpression newArrayInit(Type type, int dimension,
       Iterable<? extends Expression> expressions) {
@@ -2067,7 +2091,12 @@ public abstract class Expressions {
    * n-dimensional array and initializing it from a list of
    * elements, using varargs.
    *
-   * @param type Element type of the array.
+   * <p>For example, "{@code newArrayInit(int.class, 2)}"
+   * yields "{@code new int[][] {}}".
+   *
+   * @param type Element type of the array
+   * @param dimension Dimension of the array
+   * @param expressions Initializer expressions
    */
   public static NewArrayExpression newArrayInit(Type type, int dimension,
       Expression... expressions) {
diff --git a/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java b/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java
index 6e35e04..69c237a 100644
--- a/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java
+++ b/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java
@@ -393,6 +393,7 @@ public abstract class SparkRules {
               conformance,
               builder2,
               null,
+              null,
               DataContext.ROOT,
               new RexToLixTranslator.InputGetterImpl(e_, result.physType),
               null);