You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/29 06:43:34 UTC

[flink] branch master updated (c14d843 -> e263856)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c14d843  [hotfix][doc] fix catalog registration example
     new 1e92322  [FLINK-13774][table-planner-blink] Blink extended expressions should implement ResolvedExpression
     new 816233d  [FLINK-13774][table-planner-blink] Fix Reinterpret bug of PlannerExpressionConverter
     new bb7dc79  [FLINK-13774][table-planner-blink] Supports decimal with different precision for IF PlannerExpression
     new de22d7c  [FLINK-13774][table-planner-blink] Modify filterable table source accept ResolvedExpression
     new e922393  [FLINK-13774][table] FieldComputer should return ResolvedExpression
     new 43b8a36  [FLINK-13774][table-planner-blink] Expressions of DeclarativeAggregateFunction should be resolved
     new d2085a1  [FLINK-13774][table-planner-blink] Remove unresolved expression in RexNodeConverter
     new e263856  [FLINK-13774][table-planner-blink] Use LocalReferenceExpression and RexNodeExpression instead of blink expressions

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/sources/tsextractors/ExistingField.java  |  10 +-
 .../expressions/CallExpressionResolver.java        |  57 ++++++++
 .../expressions/DeclarativeExpressionResolver.java | 118 ++++++++++++++++
 .../planner/expressions/ExpressionBuilder.java     |  53 ++++----
 .../expressions/ResolvedAggInputReference.java     |  99 --------------
 .../expressions/ResolvedAggLocalReference.java     | 100 --------------
 .../expressions/ResolvedDistinctKeyReference.java  |  90 -------------
 .../planner/expressions/RexNodeConverter.java      |  99 ++------------
 .../aggfunctions/SingleValueAggFunction.java       |  12 +-
 .../table/planner/calcite/FlinkLocalRef.scala      |  47 +++++--
 .../table/planner/codegen/ExprCodeGenerator.scala  |  19 ++-
 .../codegen/agg/AggsHandlerCodeGenerator.scala     |  15 ++-
 .../codegen/agg/DeclarativeAggCodeGen.scala        | 149 ++++++++-------------
 .../planner/codegen/agg/ImperativeAggCodeGen.scala |  10 +-
 .../codegen/agg/batch/AggCodeGenHelper.scala       |  98 ++++----------
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |  74 +++-------
 .../codegen/agg/batch/WindowCodeGenerator.scala    |   4 +-
 .../expressions/PlannerExpressionConverter.scala   |   9 +-
 .../planner/expressions/fieldExpression.scala      |  17 +++
 .../flink/table/planner/expressions/logic.scala    |   6 +-
 .../table/planner/plan/utils/AggregateUtil.scala   |   8 +-
 .../planner/plan/utils/RexNodeExtractor.scala      |  52 +++----
 .../table/planner/sources/TableSourceUtil.scala    |  15 ++-
 .../planner/expressions/ScalarFunctionsTest.scala  |  10 ++
 .../table/planner/utils/testTableSources.scala     |  12 +-
 25 files changed, 480 insertions(+), 703 deletions(-)
 create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
 create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java


[flink] 02/08: [FLINK-13774][table-planner-blink] Fix Reinterpret bug of PlannerExpressionConverter

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 816233d020fb9ec96594a6512d94c015f70ee43b
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Aug 19 17:26:26 2019 +0200

    [FLINK-13774][table-planner-blink] Fix Reinterpret bug of PlannerExpressionConverter
---
 .../flink/table/planner/expressions/PlannerExpressionConverter.scala    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 90a8282..a8b4c11 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -60,7 +60,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
 
       case REINTERPRET_CAST =>
         assert(children.size == 3)
-        Reinterpret(
+        return Reinterpret(
           children.head.accept(this),
           fromDataTypeToTypeInfo(
             children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType),


[flink] 03/08: [FLINK-13774][table-planner-blink] Supports decimal with different precision for IF PlannerExpression

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bb7dc79569dc85d57b1b313c6681dd788308d32e
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Aug 19 17:27:16 2019 +0200

    [FLINK-13774][table-planner-blink] Supports decimal with different precision for IF PlannerExpression
---
 .../org/apache/flink/table/planner/expressions/logic.scala     |  6 +++++-
 .../flink/table/planner/expressions/ScalarFunctionsTest.scala  | 10 ++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala
index 7ef2adc..ec6c88f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala
@@ -19,6 +19,8 @@ package org.apache.flink.table.planner.expressions
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.table.planner.validate._
+import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isDecimal
 
 abstract class BinaryPredicate extends BinaryExpression {
   override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
@@ -76,7 +78,9 @@ case class If(
 
   override private[flink] def validateInput(): ValidationResult = {
     if (condition.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
-        ifTrue.resultType == ifFalse.resultType) {
+        ((isDecimal(fromTypeInfoToLogicalType(ifTrue.resultType)) &&
+            isDecimal(fromTypeInfoToLogicalType(ifFalse.resultType))) ||
+            ifTrue.resultType == ifFalse.resultType)) {
       ValidationSuccess
     } else {
       ValidationFailure(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 7e094a4..01b0f3e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -4016,6 +4016,16 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
+  def testIfDecimal(): Unit = {
+    // test DECIMAL, DECIMAL
+    testAllApis(
+      ifThenElse('f7 < 5, 'f31, 'f34),
+      "ifThenElse(f7 < 5, f31, f34)",
+      "IF(f7 < 5, f31, f34)",
+      "-0.1231231321321321111")
+  }
+
+  @Test
   def testIsDecimal(): Unit = {
     testSqlApi(
       "IS_DECIMAL('1')",


[flink] 06/08: [FLINK-13774][table-planner-blink] Expressions of DeclarativeAggregateFunction should be resolved

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 43b8a36efc9275c3b95dd2d6961ef11e8e5d07e9
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Aug 22 12:57:06 2019 +0200

    [FLINK-13774][table-planner-blink] Expressions of DeclarativeAggregateFunction should be resolved
---
 .../expressions/CallExpressionResolver.java        |  57 ++++++++++
 .../expressions/DeclarativeExpressionResolver.java |  95 ++++++++++++++++
 .../planner/expressions/ExpressionBuilder.java     |  53 +++++----
 .../aggfunctions/SingleValueAggFunction.java       |  12 ++-
 .../codegen/agg/DeclarativeAggCodeGen.scala        | 120 +++++++--------------
 .../codegen/agg/batch/AggCodeGenHelper.scala       |  78 ++++----------
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |  71 ++++--------
 .../codegen/agg/batch/WindowCodeGenerator.scala    |   4 +-
 .../expressions/PlannerExpressionConverter.scala   |   8 ++
 .../planner/expressions/fieldExpression.scala      |  55 ++++++++++
 .../table/planner/plan/utils/AggregateUtil.scala   |   8 +-
 11 files changed, 339 insertions(+), 222 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
new file mode 100644
index 0000000..a46c786
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions;
+
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Planner expression resolver for {@link UnresolvedCallExpression}.
+ */
+public class CallExpressionResolver {
+
+	private final ExpressionResolver resolver;
+
+	public CallExpressionResolver(RelBuilder relBuilder) {
+		// dummy way to get context
+		FlinkContext context = (FlinkContext) relBuilder
+			.values(new String[]{"dummyField"}, "dummyValue")
+			.build()
+			.getCluster().getPlanner().getContext();
+		this.resolver = ExpressionResolver.resolverFor(
+			name -> Optional.empty(),
+			context.getFunctionCatalog()).build();
+	}
+
+	public ResolvedExpression resolve(Expression expression) {
+		List<ResolvedExpression> resolved = resolver.resolve(Collections.singletonList(expression));
+		Preconditions.checkArgument(resolved.size() == 1);
+		return resolved.get(0);
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java
new file mode 100644
index 0000000..6b1b429
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions;
+
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction;
+
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.util.stream.Collectors;
+
+/**
+ * Abstract class to resolve the expressions in {@link DeclarativeAggregateFunction}.
+ */
+public abstract class DeclarativeExpressionResolver extends ExpressionDefaultVisitor<ResolvedExpression> {
+
+	private final DeclarativeAggregateFunction function;
+	private final boolean isMerge;
+	private final CallExpressionResolver resolver;
+
+	public DeclarativeExpressionResolver(
+		RelBuilder relBuilder, DeclarativeAggregateFunction function, boolean isMerge) {
+		this.function = function;
+		this.isMerge = isMerge;
+		this.resolver = new CallExpressionResolver(relBuilder);
+	}
+
+	@Override
+	protected ResolvedExpression defaultMethod(Expression expression) {
+		if (expression instanceof UnresolvedReferenceExpression) {
+			UnresolvedReferenceExpression expr = (UnresolvedReferenceExpression) expression;
+			String name = expr.getName();
+			int localIndex = ArrayUtils.indexOf(function.aggBufferAttributes(), expr);
+			if (localIndex == -1) {
+				// We always use UnresolvedFieldReference to represent reference of input field.
+				// In non-merge case, the input is operand of the aggregate function. But in merge
+				// case, the input is aggregate buffers which sent by local aggregate.
+				if (isMerge) {
+					return toMergeInputExpr(name, ArrayUtils.indexOf(function.mergeOperands(), expr));
+				} else {
+					return toAccInputExpr(name, ArrayUtils.indexOf(function.operands(), expr));
+				}
+			} else {
+				return toAggBufferExpr(name, localIndex);
+			}
+		} else if (expression instanceof UnresolvedCallExpression) {
+			UnresolvedCallExpression unresolvedCall = (UnresolvedCallExpression) expression;
+			return resolver.resolve(new UnresolvedCallExpression(
+				unresolvedCall.getFunctionDefinition(),
+				unresolvedCall.getChildren().stream()
+					.map(c -> c.accept(DeclarativeExpressionResolver.this))
+					.collect(Collectors.toList())));
+		} else if (expression instanceof ResolvedExpression) {
+			return (ResolvedExpression) expression;
+		} else {
+			return resolver.resolve(expression);
+		}
+	}
+
+	/**
+	 * When merge phase, for inputs.
+	 */
+	public abstract ResolvedExpression toMergeInputExpr(String name, int localIndex);
+
+	/**
+	 * When accumulate phase, for inputs.
+	 */
+	public abstract ResolvedExpression toAccInputExpr(String name, int localIndex);
+
+	/**
+	 * For aggregate buffer.
+	 */
+	public abstract ResolvedExpression toAggBufferExpr(String name, int localIndex);
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java
index d96e698..baddfd8 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.planner.expressions;
 
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
@@ -42,92 +44,91 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.OR;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.PLUS;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REINTERPRET_CAST;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIMES;
-import static org.apache.flink.table.planner.functions.InternalFunctionDefinitions.THROW_EXCEPTION;
 
 /**
  * Builder for {@link Expression}s.
  */
 public class ExpressionBuilder {
 
-	public static Expression nullOf(DataType type) {
+	public static ValueLiteralExpression nullOf(DataType type) {
 		return literal(null, type);
 	}
 
-	public static Expression literal(Object value) {
+	public static ValueLiteralExpression literal(Object value) {
 		return ApiExpressionUtils.valueLiteral(value);
 	}
 
-	public static Expression literal(Object value, DataType type) {
+	public static ValueLiteralExpression literal(Object value, DataType type) {
 		return ApiExpressionUtils.valueLiteral(value, type);
 	}
 
-	public static Expression call(FunctionDefinition functionDefinition, Expression... args) {
+	public static UnresolvedCallExpression call(FunctionDefinition functionDefinition, Expression... args) {
 		return ApiExpressionUtils.unresolvedCall(functionDefinition, args);
 	}
 
-	public static Expression call(FunctionDefinition functionDefinition, List<Expression> args) {
+	public static UnresolvedCallExpression call(FunctionDefinition functionDefinition, List<Expression> args) {
 		return ApiExpressionUtils.unresolvedCall(functionDefinition, args.toArray(new Expression[0]));
 	}
 
-	public static Expression and(Expression arg1, Expression arg2) {
+	public static UnresolvedCallExpression and(Expression arg1, Expression arg2) {
 		return call(AND, arg1, arg2);
 	}
 
-	public static Expression or(Expression arg1, Expression arg2) {
+	public static UnresolvedCallExpression or(Expression arg1, Expression arg2) {
 		return call(OR, arg1, arg2);
 	}
 
-	public static Expression not(Expression arg) {
+	public static UnresolvedCallExpression not(Expression arg) {
 		return call(NOT, arg);
 	}
 
-	public static Expression isNull(Expression input) {
+	public static UnresolvedCallExpression isNull(Expression input) {
 		return call(IS_NULL, input);
 	}
 
-	public static Expression ifThenElse(Expression condition, Expression ifTrue,
-			Expression ifFalse) {
+	public static UnresolvedCallExpression ifThenElse(Expression condition, Expression ifTrue,
+		Expression ifFalse) {
 		return call(IF, condition, ifTrue, ifFalse);
 	}
 
-	public static Expression plus(Expression input1, Expression input2) {
+	public static UnresolvedCallExpression plus(Expression input1, Expression input2) {
 		return call(PLUS, input1, input2);
 	}
 
-	public static Expression minus(Expression input1, Expression input2) {
+	public static UnresolvedCallExpression minus(Expression input1, Expression input2) {
 		return call(MINUS, input1, input2);
 	}
 
-	public static Expression div(Expression input1, Expression input2) {
+	public static UnresolvedCallExpression div(Expression input1, Expression input2) {
 		return call(DIVIDE, input1, input2);
 	}
 
-	public static Expression times(Expression input1, Expression input2) {
+	public static UnresolvedCallExpression times(Expression input1, Expression input2) {
 		return call(TIMES, input1, input2);
 	}
 
-	public static Expression mod(Expression input1, Expression input2) {
+	public static UnresolvedCallExpression mod(Expression input1, Expression input2) {
 		return call(MOD, input1, input2);
 	}
 
-	public static Expression equalTo(Expression input1, Expression input2) {
+	public static UnresolvedCallExpression equalTo(Expression input1, Expression input2) {
 		return call(EQUALS, input1, input2);
 	}
 
-	public static Expression lessThan(Expression input1, Expression input2) {
+	public static UnresolvedCallExpression lessThan(Expression input1, Expression input2) {
 		return call(LESS_THAN, input1, input2);
 	}
 
-	public static Expression greaterThan(Expression input1, Expression input2) {
+	public static UnresolvedCallExpression greaterThan(Expression input1, Expression input2) {
 		return call(GREATER_THAN, input1, input2);
 	}
 
-	public static Expression cast(Expression child, Expression type) {
+	public static UnresolvedCallExpression cast(Expression child, Expression type) {
 		return call(CAST, child, type);
 	}
 
-	public static Expression reinterpretCast(Expression child, Expression type,
-			boolean checkOverflow) {
+	public static UnresolvedCallExpression reinterpretCast(Expression child, Expression type,
+		boolean checkOverflow) {
 		return call(REINTERPRET_CAST, child, type, literal(checkOverflow));
 	}
 
@@ -135,11 +136,7 @@ public class ExpressionBuilder {
 		return ApiExpressionUtils.typeLiteral(type);
 	}
 
-	public static Expression concat(Expression input1, Expression input2) {
+	public static UnresolvedCallExpression concat(Expression input1, Expression input2) {
 		return call(CONCAT, input1, input2);
 	}
-
-	public static Expression throwException(String msg, DataType type) {
-		return call(THROW_EXCEPTION, literal(msg), typeLiteral(type));
-	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
index 865c0c2..24b2d44 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
@@ -19,12 +19,15 @@
 package org.apache.flink.table.planner.functions.aggfunctions;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.TimeType;
 
+import java.util.Arrays;
+
 import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.equalTo;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.greaterThan;
@@ -34,7 +37,8 @@ import static org.apache.flink.table.planner.expressions.ExpressionBuilder.minus
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.or;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
-import static org.apache.flink.table.planner.expressions.ExpressionBuilder.throwException;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+import static org.apache.flink.table.planner.functions.InternalFunctionDefinitions.THROW_EXCEPTION;
 
 /**
  * Base class for built-in single value aggregate function.
@@ -118,6 +122,12 @@ public abstract class SingleValueAggFunction extends DeclarativeAggregateFunctio
 		return value;
 	}
 
+	private static Expression throwException(String msg, DataType type) {
+		// it is the internal function without catalog.
+		// so it can not be find in any catalog or built-in functions.
+		return new CallExpression(THROW_EXCEPTION, Arrays.asList(literal(msg), typeLiteral(type)), type);
+	}
+
 	/**
 	 * Built-in byte single value aggregate function.
 	 */
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
index 30efc86..a175f64 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
@@ -18,19 +18,16 @@
 package org.apache.flink.table.planner.codegen.agg
 
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils
 import org.apache.flink.table.planner.codegen.CodeGenUtils.primitiveTypeTermForType
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.DISTINCT_KEY_TERM
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, GeneratedExpression}
-import org.apache.flink.table.planner.expressions.{ResolvedAggInputReference, ResolvedAggLocalReference, ResolvedDistinctKeyReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, ResolvedAggInputReference, ResolvedAggLocalReference, ResolvedDistinctKeyReference, RexNodeConverter}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.plan.utils.AggregateInfo
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.types.logical.LogicalType
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 
-import scala.collection.JavaConverters._
+import org.apache.calcite.tools.RelBuilder
 
 /**
   * It is for code generate aggregation functions that are specified using expressions.
@@ -216,87 +213,52 @@ class DeclarativeAggCodeGen(
     */
   private case class ResolveReference(
       isMerge: Boolean = false,
-      isDistinctMerge: Boolean = false) extends ExpressionVisitor[Expression] {
-
-    override def visit(call: CallExpression): Expression = ???
-
-    override def visit(valueLiteralExpression: ValueLiteralExpression): Expression = {
-      valueLiteralExpression
-    }
-
-    override def visit(input: FieldReferenceExpression): Expression = {
-      input
+      isDistinctMerge: Boolean = false)
+    extends DeclarativeExpressionResolver(relBuilder, function, isMerge) {
+
+    override def toMergeInputExpr(name: String, localIndex: Int): ResolvedExpression = {
+      // in merge case, the input1 is mergedAcc
+      new ResolvedAggInputReference(
+        name,
+        mergedAccOffset + bufferIndexes(localIndex),
+        bufferTypes(localIndex))
     }
 
-    override def visit(typeLiteral: TypeLiteralExpression): Expression = {
-      typeLiteral
-    }
-
-    private def visitUnresolvedCallExpression(
-        unresolvedCall: UnresolvedCallExpression): Expression = {
-      ApiExpressionUtils.unresolvedCall(
-        unresolvedCall.getFunctionDefinition,
-        unresolvedCall.getChildren.asScala.map(_.accept(this)): _*)
-    }
-
-    private def visitUnresolvedReference(input: UnresolvedReferenceExpression)
-      : Expression = {
-      function.aggBufferAttributes.indexOf(input) match {
-        case -1 =>
-          // Not find in agg buffers, it is a operand, represent reference of input field.
-          // In non-merge case, the input is the operand of the aggregate function.
-          // In merge case, the input is the aggregate buffers sent by local aggregate.
-          if (isMerge) {
-            val localIndex = function.mergeOperands.indexOf(input)
-            // in merge case, the input1 is mergedAcc
-            new ResolvedAggInputReference(
-              input.getName,
-              mergedAccOffset + bufferIndexes(localIndex),
-              bufferTypes(localIndex))
+    override def toAccInputExpr(name: String, localIndex: Int): ResolvedExpression = {
+      val inputIndex = argIndexes(localIndex)
+      if (inputIndex >= inputTypes.length) { // it is a constant
+        val constantIndex = inputIndex - inputTypes.length
+        val constantTerm = constantExprs(constantIndex).resultTerm
+        val nullTerm = constantExprs(constantIndex).nullTerm
+        val constantType = constantExprs(constantIndex).resultType
+        // constant is reused as member variable
+        new ResolvedAggLocalReference(
+          constantTerm,
+          nullTerm,
+          constantType)
+      } else { // it is a input field
+        if (isDistinctMerge) { // this is called from distinct merge
+          if (function.operandCount == 1) {
+            // the distinct key is a BoxedValue
+            new ResolvedDistinctKeyReference(name, argTypes(localIndex))
           } else {
-            val localIndex = function.operands.indexOf(input)
-            val inputIndex = argIndexes(localIndex)
-            if (inputIndex >= inputTypes.length) { // it is a constant
-              val constantIndex = inputIndex - inputTypes.length
-              val constantTerm = constantExprs(constantIndex).resultTerm
-              val nullTerm = constantExprs(constantIndex).nullTerm
-              val constantType = constantExprs(constantIndex).resultType
-              // constant is reused as member variable
-              new ResolvedAggLocalReference(
-                constantTerm,
-                nullTerm,
-                constantType)
-            } else { // it is a input field
-              if (isDistinctMerge) {  // this is called from distinct merge
-                if (function.operandCount == 1) {
-                  // the distinct key is a BoxedValue
-                  new ResolvedDistinctKeyReference(input.getName, argTypes(localIndex))
-                } else {
-                  // the distinct key is a BaseRow
-                  new ResolvedAggInputReference(input.getName, localIndex, argTypes(localIndex))
-                }
-              } else {
-                // the input is the inputRow
-                new ResolvedAggInputReference(
-                  input.getName, argIndexes(localIndex), argTypes(localIndex))
-              }
-            }
+            // the distinct key is a BaseRow
+            new ResolvedAggInputReference(name, localIndex, argTypes(localIndex))
           }
-        case localIndex =>
-          // it is a agg buffer.
-          val name = bufferTerms(localIndex)
-          val nullTerm = bufferNullTerms(localIndex)
-          // buffer access is reused as member variable
-          new ResolvedAggLocalReference(name, nullTerm, bufferTypes(localIndex))
+        } else {
+          // the input is the inputRow
+          new ResolvedAggInputReference(
+            name, argIndexes(localIndex), argTypes(localIndex))
+        }
       }
     }
 
-    override def visit(other: Expression): Expression = {
-      other match {
-        case u : UnresolvedReferenceExpression => visitUnresolvedReference(u)
-        case u : UnresolvedCallExpression => visitUnresolvedCallExpression(u)
-        case _ => other
-      }
+    override def toAggBufferExpr(name: String, localIndex: Int): ResolvedExpression = {
+        // it is a agg buffer.
+        val name = bufferTerms(localIndex)
+        val nullTerm = bufferNullTerms(localIndex)
+        // buffer access is reused as member variable
+        new ResolvedAggLocalReference(name, nullTerm, bufferTypes(localIndex))
     }
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
index 2b95509..d235f91 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
@@ -22,13 +22,12 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.util.SingleElementIterator
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils
-import org.apache.flink.table.expressions.{Expression, ExpressionVisitor, FieldReferenceExpression, TypeLiteralExpression, UnresolvedCallExpression, UnresolvedReferenceExpression, ValueLiteralExpression, _}
+import org.apache.flink.table.expressions.{Expression, _}
 import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction}
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
 import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.STREAM_RECORD
 import org.apache.flink.table.planner.codegen._
-import org.apache.flink.table.planner.expressions.{ResolvedAggInputReference, ResolvedAggLocalReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, ResolvedAggInputReference, ResolvedAggLocalReference, RexNodeConverter}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{getAccumulatorTypeOfAggregateFunction, getAggUserDefinedInputTypes}
 import org.apache.flink.table.runtime.context.ExecutionContextImpl
@@ -38,12 +37,11 @@ import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDat
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.{LogicalType, RowType}
+
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-import scala.collection.JavaConverters._
-
 /**
   * Batch aggregate code generate helper.
   */
@@ -253,60 +251,28 @@ object AggCodeGenHelper {
     */
   private case class ResolveReference(
       ctx: CodeGeneratorContext,
+      relBuilder: RelBuilder,
       isMerge: Boolean,
       agg: DeclarativeAggregateFunction,
       aggIndex: Int,
       argsMapping: Array[Array[(Int, LogicalType)]],
-      aggBufferTypes: Array[Array[LogicalType]]) extends ExpressionVisitor[Expression] {
-
-    override def visit(call: CallExpression): Expression = ???
+      aggBufferTypes: Array[Array[LogicalType]])
+    extends DeclarativeExpressionResolver(relBuilder, agg, isMerge) {
 
-    override def visit(valueLiteralExpression: ValueLiteralExpression): Expression = {
-      valueLiteralExpression
+    override def toMergeInputExpr(name: String, localIndex: Int): ResolvedExpression = {
+      val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
+      new ResolvedAggInputReference(name, inputIndex, inputType)
     }
 
-    override def visit(input: FieldReferenceExpression): Expression = {
-      input
+    override def toAccInputExpr(name: String, localIndex: Int): ResolvedExpression = {
+      val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
+      new ResolvedAggInputReference(name, inputIndex, inputType)
     }
 
-    override def visit(typeLiteral: TypeLiteralExpression): Expression = {
-      typeLiteral
-    }
-
-    private def visitUnresolvedCallExpression(
-        unresolvedCall: UnresolvedCallExpression): Expression = {
-      ApiExpressionUtils.unresolvedCall(
-        unresolvedCall.getFunctionDefinition,
-        unresolvedCall.getChildren.asScala.map(_.accept(this)): _*)
-    }
-
-    private def visitUnresolvedFieldReference(
-        input: UnresolvedReferenceExpression): Expression = {
-      agg.aggBufferAttributes.indexOf(input) match {
-        case -1 =>
-          // We always use UnresolvedFieldReference to represent reference of input field.
-          // In non-merge case, the input is operand of the aggregate function. But in merge
-          // case, the input is aggregate buffers which sent by local aggregate.
-          val localIndex = if (isMerge) {
-            agg.mergeOperands.indexOf(input)
-          } else {
-            agg.operands.indexOf(input)
-          }
-          val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
-          new ResolvedAggInputReference(input.getName, inputIndex, inputType)
-        case localIndex =>
-          val variableName = s"agg${aggIndex}_${input.getName}"
-          newLocalReference(
-            ctx, variableName, aggBufferTypes(aggIndex)(localIndex))
-      }
-    }
-
-    override def visit(other: Expression): Expression = {
-      other match {
-        case u : UnresolvedReferenceExpression => visitUnresolvedFieldReference(u)
-        case u : UnresolvedCallExpression => visitUnresolvedCallExpression(u)
-        case _ => other
-      }
+    override def toAggBufferExpr(name: String, localIndex: Int): ResolvedExpression = {
+      val variableName = s"agg${aggIndex}_$name"
+      newLocalReference(
+        ctx, variableName, aggBufferTypes(aggIndex)(localIndex))
     }
   }
 
@@ -333,7 +299,7 @@ object AggCodeGenHelper {
       case (agg: DeclarativeAggregateFunction, aggIndex: Int) =>
         val idx = auxGrouping.length + aggIndex
         agg.aggBufferAttributes.map(_.accept(
-          ResolveReference(ctx, isMerge, agg, idx, argsMapping, aggBufferTypes)))
+          ResolveReference(ctx, builder, isMerge, agg, idx, argsMapping, aggBufferTypes)))
       case (_: AggregateFunction[_, _], aggIndex: Int) =>
         val idx = auxGrouping.length + aggIndex
         val variableName = aggBufferNames(idx)(0)
@@ -525,7 +491,7 @@ object AggCodeGenHelper {
       case (agg: DeclarativeAggregateFunction, aggIndex) =>
         val idx = auxGrouping.length + aggIndex
         agg.getValueExpression.accept(ResolveReference(
-          ctx, isMerge, agg, idx, argsMapping, aggBufferTypes))
+          ctx, builder, isMerge, agg, idx, argsMapping, aggBufferTypes))
       case (agg: AggregateFunction[_, _], aggIndex) =>
         val idx = auxGrouping.length + aggIndex
         (agg, idx)
@@ -567,8 +533,8 @@ object AggCodeGenHelper {
     aggregates.zipWithIndex.flatMap {
       case (agg: DeclarativeAggregateFunction, aggIndex) =>
         val idx = auxGrouping.length + aggIndex
-        agg.mergeExpressions.map(
-          _.accept(ResolveReference(ctx, isMerge = true, agg, idx, argsMapping, aggBufferTypes)))
+        agg.mergeExpressions.map(_.accept(ResolveReference(
+          ctx, builder, isMerge = true, agg, idx, argsMapping, aggBufferTypes)))
       case (agg: AggregateFunction[_, _], aggIndex) =>
         val idx = auxGrouping.length + aggIndex
         Some(agg, idx)
@@ -629,8 +595,8 @@ object AggCodeGenHelper {
         val aggCall = aggCallToAggFun._1
         aggCallToAggFun._2 match {
           case agg: DeclarativeAggregateFunction =>
-            agg.accumulateExpressions.map(_.accept(
-              ResolveReference(ctx, isMerge = false, agg, idx, argsMapping, aggBufferTypes)))
+            agg.accumulateExpressions.map(_.accept(ResolveReference(
+              ctx, builder, isMerge = false, agg, idx, argsMapping, aggBufferTypes)))
                 .map(e => (e, aggCall))
           case agg: AggregateFunction[_, _] =>
             val idx = auxGrouping.length + aggIndex
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index 2fdf7b7..dec5e56 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -21,14 +21,13 @@ package org.apache.flink.table.planner.codegen.agg.batch
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.metrics.Gauge
 import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, GenericRow, JoinedRow}
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils
-import org.apache.flink.table.expressions.{Expression, ExpressionVisitor, FieldReferenceExpression, TypeLiteralExpression, UnresolvedCallExpression, UnresolvedReferenceExpression, ValueLiteralExpression, _}
+import org.apache.flink.table.expressions.{Expression, _}
 import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{binaryRowFieldSetAccess, binaryRowSetNull}
 import org.apache.flink.table.planner.codegen._
 import org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper.buildAggregateArgsMapping
 import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator
-import org.apache.flink.table.planner.expressions.{ResolvedAggInputReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, ResolvedAggInputReference, RexNodeConverter}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.plan.utils.SortUtil
 import org.apache.flink.table.runtime.generated.{NormalizedKeyComputer, RecordComparator}
@@ -37,6 +36,7 @@ import org.apache.flink.table.runtime.operators.sort.BufferedKVExternalSorter
 import org.apache.flink.table.runtime.typeutils.BinaryRowSerializer
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.{LogicalType, RowType}
+
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.tools.RelBuilder
 
@@ -292,8 +292,8 @@ object HashAggCodeGenHelper {
       val getAggValueExprs = aggregates.zipWithIndex.map {
         case (agg: DeclarativeAggregateFunction, aggIndex) =>
           val idx = auxGrouping.length + aggIndex
-          agg.getValueExpression.accept(
-            ResolveReference(ctx, isMerge, bindRefOffset, agg, idx, argsMapping, aggBuffMapping))
+          agg.getValueExpression.accept(ResolveReference(
+            ctx, builder, isMerge, bindRefOffset, agg, idx, argsMapping, aggBuffMapping))
       }.map(_.accept(new RexNodeConverter(builder))).map(exprCodegen.generateExpression)
 
       val getValueExprs = getAuxGroupingExprs ++ getAggValueExprs
@@ -327,58 +327,28 @@ object HashAggCodeGenHelper {
     */
   private case class ResolveReference(
       ctx: CodeGeneratorContext,
+      relBuilder: RelBuilder,
       isMerge: Boolean,
       offset: Int,
       agg: DeclarativeAggregateFunction,
       aggIndex: Int,
       argsMapping: Array[Array[(Int, LogicalType)]],
-      aggBuffMapping: Array[Array[(Int, LogicalType)]]) extends ExpressionVisitor[Expression] {
-
-    override def visit(call: CallExpression): Expression = ???
-
-    override def visit(valueLiteralExpression: ValueLiteralExpression): Expression = {
-      valueLiteralExpression
-    }
-
-    override def visit(input: FieldReferenceExpression): Expression = {
-      input
-    }
-
-    override def visit(typeLiteral: TypeLiteralExpression): Expression = {
-      typeLiteral
-    }
+      aggBuffMapping: Array[Array[(Int, LogicalType)]])
+    extends DeclarativeExpressionResolver(relBuilder, agg, isMerge) {
 
-    private def visitUnresolvedCallExpression(
-        unresolvedCall: UnresolvedCallExpression): Expression = {
-      ApiExpressionUtils.unresolvedCall(
-        unresolvedCall.getFunctionDefinition,
-        unresolvedCall.getChildren.map(_.accept(this)): _*)
+    override def toMergeInputExpr(name: String, localIndex: Int): ResolvedExpression = {
+      val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
+      new ResolvedAggInputReference(name, inputIndex, inputType)
     }
 
-    private def visitUnresolvedFieldReference(
-        input: UnresolvedReferenceExpression): Expression = {
-      agg.aggBufferAttributes.indexOf(input) match {
-        case -1 =>
-          // We always use UnresolvedFieldReference to represent reference of input field.
-          // In non-merge case, the input is operand of the aggregate function. But in merge
-          // case, the input is aggregate buffers which sent by local aggregate.
-          val localIndex =
-            if (isMerge) agg.mergeOperands.indexOf(input) else agg.operands.indexOf(input)
-          val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
-          new ResolvedAggInputReference(input.getName, inputIndex, inputType)
-        case localIndex =>
-          val (aggBuffAttrIndex, aggBuffAttrType) = aggBuffMapping(aggIndex)(localIndex)
-          new ResolvedAggInputReference(
-            input.getName, offset + aggBuffAttrIndex, aggBuffAttrType)
-      }
+    override def toAccInputExpr(name: String, localIndex: Int): ResolvedExpression = {
+      val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
+      new ResolvedAggInputReference(name, inputIndex, inputType)
     }
 
-    override def visit(other: Expression): Expression = {
-      other match {
-        case u : UnresolvedReferenceExpression => visitUnresolvedFieldReference(u)
-        case u : UnresolvedCallExpression => visitUnresolvedCallExpression(u)
-        case _ => other
-      }
+    override def toAggBufferExpr(name: String, localIndex: Int): ResolvedExpression = {
+      val (aggBuffAttrIndex, aggBuffAttrType) = aggBuffMapping(aggIndex)(localIndex)
+      new ResolvedAggInputReference(name, offset + aggBuffAttrIndex, aggBuffAttrType)
     }
   }
 
@@ -407,7 +377,7 @@ object HashAggCodeGenHelper {
         val bindRefOffset = inputType.getFieldCount
         agg.mergeExpressions.map(
           _.accept(ResolveReference(
-            ctx, isMerge = true, bindRefOffset, agg, idx, argsMapping, aggBuffMapping)))
+            ctx, builder, isMerge = true, bindRefOffset, agg, idx, argsMapping, aggBuffMapping)))
     }.map(_.accept(new RexNodeConverter(builder))).map(exprCodegen.generateExpression)
 
     val aggBufferTypeWithoutAuxGrouping = if (auxGrouping.nonEmpty) {
@@ -464,9 +434,8 @@ object HashAggCodeGenHelper {
         val aggCall = aggCallToAggFun._1
         aggCallToAggFun._2 match {
           case agg: DeclarativeAggregateFunction =>
-            agg.accumulateExpressions.map(
-              _.accept(ResolveReference(
-                ctx, isMerge = false, bindRefOffset, agg, idx, argsMapping, aggBuffMapping))
+            agg.accumulateExpressions.map(_.accept(ResolveReference(
+              ctx, builder, isMerge = false, bindRefOffset, agg, idx, argsMapping, aggBuffMapping))
             ).map(e => (e, aggCall))
         }
     }.map {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
index 785ac82..8a2d753 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
@@ -34,7 +34,7 @@ import org.apache.flink.table.planner.codegen._
 import org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper.{buildAggregateArgsMapping, genAggregateByFlatAggregateBuffer, genFlatAggBufferExprs, genInitFlatAggregateBuffer}
 import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator.{asLong, isTimeIntervalLiteral}
 import org.apache.flink.table.planner.expressions.ExpressionBuilder._
-import org.apache.flink.table.planner.expressions.RexNodeConverter
+import org.apache.flink.table.planner.expressions.{CallExpressionResolver, RexNodeConverter}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.getAccumulatorTypeOfAggregateFunction
 import org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow}
@@ -695,7 +695,7 @@ abstract class WindowCodeGenerator(
         plus(remainder, literal(slideSize)),
         remainder)),
       literal(index * slideSize))
-    exprCodegen.generateExpression(expr.accept(
+    exprCodegen.generateExpression(new CallExpressionResolver(relBuilder).resolve(expr).accept(
       new RexNodeConverter(relBuilder.values(inputRowType))))
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index a8b4c11..b6b6905 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -821,6 +821,14 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
     other match {
       // already converted planner expressions will pass this visitor without modification
       case plannerExpression: PlannerExpression => plannerExpression
+      case aggInput: ResolvedAggInputReference => PlannerResolvedAggInputReference(
+        aggInput.getName, aggInput.getIndex, fromDataTypeToTypeInfo(aggInput.getOutputDataType))
+      case aggLocal: ResolvedAggLocalReference => PlannerResolvedAggLocalReference(
+        aggLocal.getFieldTerm,
+        aggLocal.getNullTerm,
+        fromDataTypeToTypeInfo(aggLocal.getOutputDataType))
+      case key: ResolvedDistinctKeyReference => PlannerResolvedDistinctKeyReference(
+        key.getName, fromDataTypeToTypeInfo(key.getOutputDataType))
 
       case _ =>
         throw new TableException("Unrecognized expression: " + other)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
index 89e71db..2a3c621 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.planner.expressions
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api._
+import org.apache.flink.table.expressions.ResolvedFieldReference
 import org.apache.flink.table.operations.QueryOperation
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
@@ -228,3 +229,57 @@ case class StreamRecordTimestamp() extends LeafExpression {
 
   override private[flink] def resultType = Types.LONG
 }
+
+/**
+  * Normally we should use [[ResolvedFieldReference]] to represent an input field.
+  * [[ResolvedFieldReference]] uses name to locate the field, in aggregate case, we want to use
+  * field index.
+  */
+case class PlannerResolvedAggInputReference(
+    name: String,
+    index: Int,
+    resultType: TypeInformation[_]) extends Attribute {
+
+  override def toString = s"'$name"
+
+  override private[flink] def withName(newName: String): Attribute = {
+    if (newName == name) this
+    else PlannerResolvedAggInputReference(newName, index, resultType)
+  }
+}
+
+/**
+  * Special reference which represent a local filed, such as aggregate buffers or constants.
+  * We are stored as class members, so the field can be referenced directly.
+  * We should use an unique name to locate the field.
+  */
+case class PlannerResolvedAggLocalReference(
+    name: String,
+    nullTerm: String,
+    resultType: TypeInformation[_])
+    extends Attribute {
+
+  override def toString = s"'$name"
+
+  override private[flink] def withName(newName: String): Attribute = {
+    if (newName == name) this
+    else PlannerResolvedAggLocalReference(newName, nullTerm, resultType)
+  }
+}
+
+/**
+  * Special reference which represent a distinct key input filed,
+  * [[ResolvedDistinctKeyReference]] uses name to locate the field.
+  */
+case class PlannerResolvedDistinctKeyReference(
+    name: String,
+    resultType: TypeInformation[_])
+    extends Attribute {
+
+  override def toString = s"'$name"
+
+  override private[flink] def withName(newName: String): Attribute = {
+    if (newName == name) this
+    else PlannerResolvedDistinctKeyReference(newName, resultType)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 929c242..ebd2863 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -30,9 +30,9 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindow
 import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.planner.dataview.DataViewUtils.useNullSerializerForStateViewFieldsFromAccType
 import org.apache.flink.table.planner.dataview.{DataViewSpec, MapViewSpec}
-import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
-import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlListAggFunction, SqlFirstLastValueAggFunction}
+import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlFirstLastValueAggFunction, SqlListAggFunction}
 import org.apache.flink.table.planner.functions.utils.AggSqlFunction
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
@@ -49,7 +49,6 @@ import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataTy
 
 import org.apache.calcite.rel.`type`._
 import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
-import org.apache.calcite.rex.RexInputRef
 import org.apache.calcite.sql.fun._
 import org.apache.calcite.sql.validate.SqlMonotonicity
 import org.apache.calcite.sql.{SqlKind, SqlRankFunction}
@@ -687,8 +686,7 @@ object AggregateUtil extends Enumeration {
     */
   def timeFieldIndex(
       inputType: RelDataType, relBuilder: RelBuilder, timeField: FieldReferenceExpression): Int = {
-    timeField.accept(new RexNodeConverter(relBuilder.values(inputType)))
-        .asInstanceOf[RexInputRef].getIndex
+    relBuilder.values(inputType).field(timeField.getName).getIndex
   }
 
   /**


[flink] 08/08: [FLINK-13774][table-planner-blink] Use LocalReferenceExpression and RexNodeExpression instead of blink expressions

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e263856e37627361feec578ec67841de70f415b6
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Aug 28 14:19:14 2019 +0800

    [FLINK-13774][table-planner-blink] Use LocalReferenceExpression and RexNodeExpression instead of blink expressions
    
    This closes #9484
---
 .../expressions/DeclarativeExpressionResolver.java |  23 +++++
 .../expressions/ResolvedAggInputReference.java     | 113 --------------------
 .../expressions/ResolvedAggLocalReference.java     | 114 ---------------------
 .../expressions/ResolvedDistinctKeyReference.java  | 104 -------------------
 .../planner/expressions/RexNodeConverter.java      |  47 ++-------
 .../table/planner/calcite/FlinkLocalRef.scala      |  47 ++++++---
 .../table/planner/codegen/ExprCodeGenerator.scala  |  19 ++--
 .../codegen/agg/AggsHandlerCodeGenerator.scala     |  15 +--
 .../codegen/agg/DeclarativeAggCodeGen.scala        |  69 ++++++-------
 .../planner/codegen/agg/ImperativeAggCodeGen.scala |  10 +-
 .../codegen/agg/batch/AggCodeGenHelper.scala       |  30 +++---
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |  11 +-
 .../expressions/PlannerExpressionConverter.scala   |  15 +--
 .../planner/expressions/fieldExpression.scala      |  46 +--------
 14 files changed, 152 insertions(+), 511 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java
index 6b1b429..3b6ad2d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java
@@ -23,13 +23,20 @@ import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.RexDistinctKeyVariable;
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction;
+import org.apache.flink.table.types.logical.LogicalType;
 
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.commons.lang3.ArrayUtils;
 
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
+
 /**
  * Abstract class to resolve the expressions in {@link DeclarativeAggregateFunction}.
  */
@@ -92,4 +99,20 @@ public abstract class DeclarativeExpressionResolver extends ExpressionDefaultVis
 	 * For aggregate buffer.
 	 */
 	public abstract ResolvedExpression toAggBufferExpr(String name, int localIndex);
+
+	public static ResolvedExpression toRexInputRef(RelBuilder builder, int i, LogicalType t) {
+		RelDataType tp = ((FlinkTypeFactory) builder.getTypeFactory())
+			.createFieldTypeFromLogicalType(t);
+		return new RexNodeExpression(new RexInputRef(i, tp), fromLogicalTypeToDataType(t));
+	}
+
+	public static ResolvedExpression toRexDistinctKey(RelBuilder builder, String name, LogicalType t) {
+		return new RexNodeExpression(
+			new RexDistinctKeyVariable(
+				name,
+				((FlinkTypeFactory) builder.getTypeFactory())
+					.createFieldTypeFromLogicalType(t),
+				t),
+			fromLogicalTypeToDataType(t));
+	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java
deleted file mode 100644
index 2ad8177..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.expressions;
-
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionVisitor;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
-
-/**
- * Normally we should use {@link FieldReferenceExpression} to represent an input field.
- * {@link FieldReferenceExpression} uses name to locate the field, in aggregate case, we want to use
- * field index.
- */
-public class ResolvedAggInputReference implements ResolvedExpression {
-
-	private final String name;
-	private final int index;
-	private final LogicalType resultType;
-
-	public ResolvedAggInputReference(String name, int index, LogicalType resultType) {
-		this.name = Preconditions.checkNotNull(name);
-		this.index = index;
-		this.resultType = resultType;
-	}
-
-	public String getName() {
-		return name;
-	}
-
-	public int getIndex() {
-		return index;
-	}
-
-	public LogicalType getResultType() {
-		return resultType;
-	}
-
-	@Override
-	public DataType getOutputDataType() {
-		return fromLogicalTypeToDataType(resultType);
-	}
-
-	@Override
-	public List<ResolvedExpression> getResolvedChildren() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public List<Expression> getChildren() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public String asSummaryString() {
-		return name;
-	}
-
-	@Override
-	public <R> R accept(ExpressionVisitor<R> visitor) {
-		return visitor.visit(this);
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ResolvedAggInputReference that = (ResolvedAggInputReference) o;
-		return index == that.index && name.equals(that.name) && resultType.equals(that.resultType);
-	}
-
-	@Override
-	public int hashCode() {
-		int result = name.hashCode();
-		result = 31 * result + index;
-		result = 31 * result + resultType.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return asSummaryString();
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java
deleted file mode 100644
index 055ed10..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.expressions;
-
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionVisitor;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
-
-/**
- * Special reference which represent a local filed, such as aggregate buffers or constants.
- * We are stored as class members, so the field can be referenced directly.
- * We should use an unique name to locate the field.
- *
- * <p>See {@link org.apache.flink.table.planner.codegen.ExprCodeGenerator#visitLocalRef}.
- */
-public class ResolvedAggLocalReference implements ResolvedExpression {
-
-	private final String fieldTerm;
-	private final String nullTerm;
-	private final LogicalType resultType;
-
-	public ResolvedAggLocalReference(String fieldTerm, String nullTerm, LogicalType resultType) {
-		this.fieldTerm = fieldTerm;
-		this.nullTerm = nullTerm;
-		this.resultType = resultType;
-	}
-
-	public String getFieldTerm() {
-		return fieldTerm;
-	}
-
-	public String getNullTerm() {
-		return nullTerm;
-	}
-
-	public LogicalType getResultType() {
-		return resultType;
-	}
-
-	@Override
-	public DataType getOutputDataType() {
-		return fromLogicalTypeToDataType(resultType);
-	}
-
-	@Override
-	public List<ResolvedExpression> getResolvedChildren() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public List<Expression> getChildren() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public String asSummaryString() {
-		return fieldTerm;
-	}
-
-	@Override
-	public <R> R accept(ExpressionVisitor<R> visitor) {
-		return visitor.visit(this);
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ResolvedAggLocalReference that = (ResolvedAggLocalReference) o;
-
-		return fieldTerm.equals(that.fieldTerm) && nullTerm.equals(that.nullTerm) && resultType.equals(that.resultType);
-	}
-
-	@Override
-	public int hashCode() {
-		int result = fieldTerm.hashCode();
-		result = 31 * result + nullTerm.hashCode();
-		result = 31 * result + resultType.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return asSummaryString();
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java
deleted file mode 100644
index d55905f..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.expressions;
-
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionVisitor;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
-
-/**
- * Resolved distinct key reference.
- */
-public class ResolvedDistinctKeyReference implements ResolvedExpression {
-
-	private final String name;
-	private final LogicalType resultType;
-
-	public ResolvedDistinctKeyReference(String name, LogicalType resultType) {
-		this.name = Preconditions.checkNotNull(name);
-		this.resultType = resultType;
-	}
-
-	public String getName() {
-		return name;
-	}
-
-	public LogicalType getResultType() {
-		return resultType;
-	}
-
-	@Override
-	public DataType getOutputDataType() {
-		return fromLogicalTypeToDataType(resultType);
-	}
-
-	@Override
-	public List<ResolvedExpression> getResolvedChildren() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public List<Expression> getChildren() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public String asSummaryString() {
-		return name;
-	}
-
-	@Override
-	public <R> R accept(ExpressionVisitor<R> visitor) {
-		return visitor.visit(this);
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ResolvedDistinctKeyReference that = (ResolvedDistinctKeyReference) o;
-
-		return name.equals(that.name) && resultType.equals(that.resultType);
-	}
-
-	@Override
-	public int hashCode() {
-		int result = name.hashCode();
-		result = 31 * result + resultType.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return asSummaryString();
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
index 731d550..2e3159d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
@@ -25,8 +25,8 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.ExpressionVisitor;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.ResolvedExpressionVisitor;
 import org.apache.flink.table.expressions.TableReferenceExpression;
 import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.expressions.TimePointUnit;
@@ -46,8 +46,7 @@ import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.calcite.RexAggLocalVariable;
-import org.apache.flink.table.planner.calcite.RexDistinctKeyVariable;
+import org.apache.flink.table.planner.calcite.RexFieldVariable;
 import org.apache.flink.table.planner.functions.InternalFunctionDefinitions;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
 import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction;
@@ -70,7 +69,6 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexFieldCollation;
-import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexSubQuery;
 import org.apache.calcite.rex.RexWindowBound;
@@ -118,9 +116,6 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoT
 
 /**
  * Visit expression to generator {@link RexNode}.
- *
- * <p>TODO remove blink expressions(like {@link ResolvedAggInputReference}) and use
- * {@link ResolvedExpressionVisitor}.
  */
 public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 
@@ -875,43 +870,19 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 
 	@Override
 	public RexNode visit(Expression other) {
-		if (other instanceof ResolvedAggInputReference) {
-			return visitResolvedAggInputReference((ResolvedAggInputReference) other);
-		} else if (other instanceof ResolvedAggLocalReference) {
-			return visitResolvedAggLocalReference((ResolvedAggLocalReference) other);
-		} else if (other instanceof ResolvedDistinctKeyReference) {
-			return visitResolvedDistinctKeyReference((ResolvedDistinctKeyReference) other);
-		} else if (other instanceof RexNodeExpression) {
+		if (other instanceof RexNodeExpression) {
 			return ((RexNodeExpression) other).getRexNode();
+		} else if (other instanceof LocalReferenceExpression) {
+			LocalReferenceExpression local = (LocalReferenceExpression) other;
+			return new RexFieldVariable(
+				local.getName(),
+				typeFactory.createFieldTypeFromLogicalType(
+					fromDataTypeToLogicalType(local.getOutputDataType())));
 		} else {
 			throw new UnsupportedOperationException(other.getClass().getSimpleName() + ":" + other.toString());
 		}
 	}
 
-	private RexNode visitResolvedAggInputReference(ResolvedAggInputReference reference) {
-		// using index to resolve field directly, name used in toString only
-		return new RexInputRef(
-				reference.getIndex(),
-				typeFactory.createFieldTypeFromLogicalType(reference.getResultType()));
-	}
-
-	private RexNode visitResolvedAggLocalReference(ResolvedAggLocalReference reference) {
-		LogicalType type = reference.getResultType();
-		return new RexAggLocalVariable(
-				reference.getFieldTerm(),
-				reference.getNullTerm(),
-				typeFactory.createFieldTypeFromLogicalType(type),
-				type);
-	}
-
-	private RexNode visitResolvedDistinctKeyReference(ResolvedDistinctKeyReference reference) {
-		LogicalType type = reference.getResultType();
-		return new RexDistinctKeyVariable(
-				reference.getName(),
-				typeFactory.createFieldTypeFromLogicalType(type),
-				type);
-	}
-
 	private RexNode createCollation(RexNode node, RelFieldCollation.Direction direction,
 			RelFieldCollation.NullDirection nullDirection, Set<SqlKind> kinds) {
 		switch (node.getKind()) {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLocalRef.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLocalRef.scala
index 74fe7d0..9e8f88e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLocalRef.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLocalRef.scala
@@ -17,33 +17,54 @@
  */
 package org.apache.flink.table.planner.calcite
 
+import org.apache.flink.table.planner.codegen.ExprCodeGenerator
 import org.apache.flink.table.types.logical.LogicalType
 
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.RexLocalRef
+import org.apache.calcite.rex.{RexBiVisitor, RexVariable, RexVisitor}
 
 /**
-  * Special reference which represent a local filed, such as aggregate buffers or constants.
+  * Special reference which represent a local field, such as aggregate buffers or constants.
   * We are stored as class members, so the field can be referenced directly.
   * We should use an unique name to locate the field.
-  *
-  * See [[org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLocalRef()]]
   */
-case class RexAggLocalVariable(
+case class RexFieldVariable(
     fieldTerm: String,
-    nullTerm: String,
-    dataType: RelDataType,
-    internalType: LogicalType)
-  extends RexLocalRef(0, dataType)
+    dataType: RelDataType) extends RexVariable(fieldTerm, dataType) {
+  
+  override def accept[R](visitor: RexVisitor[R]): R = {
+    visitor match {
+      case gen: ExprCodeGenerator =>
+        gen.visitRexFieldVariable(this).asInstanceOf[R]
+      case _ =>
+        throw new RuntimeException("Not support visitor: " + visitor)
+    }
+  }
+  
+  override def accept[R, P](visitor: RexBiVisitor[R, P], arg: P): R = {
+    throw new RuntimeException("Not support visitor: " + visitor)
+  }
+}
 
 /**
   * Special reference which represent a distinct key input filed,
   * We use the name to locate the distinct key field.
-  *
-  * See [[org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLocalRef()]]
   */
 case class RexDistinctKeyVariable(
     keyTerm: String,
     dataType: RelDataType,
-    internalType: LogicalType)
-  extends RexLocalRef(0, dataType)
+    internalType: LogicalType) extends RexVariable(keyTerm, dataType) {
+
+  override def accept[R](visitor: RexVisitor[R]): R = {
+    visitor match {
+      case gen: ExprCodeGenerator =>
+        gen.visitDistinctKeyVariable(this).asInstanceOf[R]
+      case _ =>
+        throw new RuntimeException("Not support visitor: " + visitor)
+    }
+  }
+
+  override def accept[R, P](visitor: RexBiVisitor[R, P], arg: P): R = {
+    throw new RuntimeException("Not support visitor: " + visitor)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 3a05fe5..34796eb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.dataformat.DataFormatConverters.{DataFormatConverter, getConverterForDataType}
 import org.apache.flink.table.dataformat._
-import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexAggLocalVariable, RexDistinctKeyVariable}
+import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexDistinctKeyVariable, RexFieldVariable}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{requireTemporal, requireTimeInterval, _}
 import org.apache.flink.table.planner.codegen.GenerateUtils._
 import org.apache.flink.table.planner.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
@@ -396,10 +396,18 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
     GeneratedExpression(input1Term, NEVER_NULL, NO_CODE, input1Type)
   }
 
-  override def visitLocalRef(localRef: RexLocalRef): GeneratedExpression = localRef match {
-    case local: RexAggLocalVariable =>
-      GeneratedExpression(local.fieldTerm, local.nullTerm, NO_CODE, local.internalType)
-    case value: RexDistinctKeyVariable =>
+  override def visitLocalRef(localRef: RexLocalRef): GeneratedExpression =
+    throw new CodeGenException("RexLocalRef are not supported yet.")
+
+  def visitRexFieldVariable(variable: RexFieldVariable): GeneratedExpression = {
+      val internalType = FlinkTypeFactory.toLogicalType(variable.dataType)
+      val nullTerm = variable.fieldTerm + "IsNull" // not use newName, keep isNull unique.
+      ctx.addReusableMember(s"${primitiveTypeTermForType(internalType)} ${variable.fieldTerm};")
+      ctx.addReusableMember(s"boolean $nullTerm;")
+      GeneratedExpression(variable.fieldTerm, nullTerm, NO_CODE, internalType)
+  }
+
+  def visitDistinctKeyVariable(value: RexDistinctKeyVariable): GeneratedExpression = {
       val inputExpr = ctx.getReusableInputUnboxingExprs(input1Term, 0) match {
         case Some(expr) => expr
         case None =>
@@ -422,7 +430,6 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       }
       // hide the generated code as it will be executed only once
       GeneratedExpression(inputExpr.resultTerm, inputExpr.nullTerm, NO_CODE, inputExpr.resultType)
-    case _ => throw new CodeGenException("Local variables are not supported yet.")
   }
 
   override def visitRangeRef(rangeRef: RexRangeRef): GeneratedExpression =
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
index 4b05525..9ab0b3b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -21,22 +21,23 @@ import org.apache.flink.table.api.TableException
 import org.apache.flink.table.dataformat.GenericRow
 import org.apache.flink.table.dataformat.util.BaseRowUtil
 import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.UserDefinedAggregateFunction
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{BASE_ROW, _}
 import org.apache.flink.table.planner.codegen.Indenter.toISC
 import org.apache.flink.table.planner.codegen._
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator._
 import org.apache.flink.table.planner.dataview.{DataViewSpec, ListViewSpec, MapViewSpec}
-import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowProperty, PlannerWindowStart, ResolvedAggInputReference}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
+import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowProperty, PlannerWindowStart}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList
 import org.apache.flink.table.runtime.dataview.{StateListView, StateMapView}
-import org.apache.flink.table.runtime.generated.{AggsHandleFunction, TableAggsHandleFunction, GeneratedAggsHandleFunction, GeneratedNamespaceAggsHandleFunction, GeneratedNamespaceTableAggsHandleFunction, GeneratedTableAggsHandleFunction, NamespaceAggsHandleFunction, NamespaceTableAggsHandleFunction}
-import org.apache.flink.table.runtime.types.PlannerTypeUtils
+import org.apache.flink.table.runtime.generated.{AggsHandleFunction, GeneratedAggsHandleFunction, GeneratedNamespaceAggsHandleFunction, GeneratedNamespaceTableAggsHandleFunction, GeneratedTableAggsHandleFunction, NamespaceAggsHandleFunction, NamespaceTableAggsHandleFunction, TableAggsHandleFunction}
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import org.apache.flink.table.runtime.types.PlannerTypeUtils
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.{BooleanType, IntType, LogicalType, RowType}
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
-import org.apache.flink.table.functions.UserDefinedAggregateFunction
 import org.apache.flink.util.Collector
 
 import org.apache.calcite.rex.RexLiteral
@@ -57,6 +58,7 @@ class AggsHandlerCodeGenerator(
   private val inputType = RowType.of(inputFieldTypes: _*)
 
   /** constant expressions that act like a second input in the parameter indices. */
+  private var constants: Seq[RexLiteral] = Seq()
   private var constantExprs: Seq[GeneratedExpression] = Seq()
 
   /** window properties like window_start and window_end, only used in window aggregates */
@@ -133,6 +135,7 @@ class AggsHandlerCodeGenerator(
     */
   def withConstants(literals: Seq[RexLiteral]): AggsHandlerCodeGenerator = {
     // create constants
+    this.constants = literals
     val exprGenerator = new ExprCodeGenerator(ctx, INPUT_NOT_NULL)
     val exprs = literals.map(exprGenerator.generateExpression)
     this.constantExprs = exprs.map(ctx.addReusableConstant(_, nullCheck = true))
@@ -222,7 +225,7 @@ class AggsHandlerCodeGenerator(
             aggBufferOffset,
             aggBufferSize,
             inputFieldTypes,
-            constantExprs,
+            constants,
             relBuilder)
         case _: UserDefinedAggregateFunction[_, _] =>
           new ImperativeAggCodeGen(
@@ -303,7 +306,7 @@ class AggsHandlerCodeGenerator(
         throw new TableException(s"filter arg must be boolean, but is $filterType, " +
             s"the aggregate is $aggName.")
       }
-      Some(new ResolvedAggInputReference(name, filterArg, inputFieldTypes(filterArg)))
+      Some(toRexInputRef(relBuilder, filterArg, inputFieldTypes(filterArg)))
     } else {
       None
     }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
index a175f64..afa4e5b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
@@ -18,15 +18,17 @@
 package org.apache.flink.table.planner.codegen.agg
 
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.planner.codegen.CodeGenUtils.primitiveTypeTermForType
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.DISTINCT_KEY_TERM
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, GeneratedExpression}
-import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, ResolvedAggInputReference, ResolvedAggLocalReference, ResolvedDistinctKeyReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.{toRexDistinctKey, toRexInputRef}
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter, RexNodeExpression}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.plan.utils.AggregateInfo
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.{fromDataTypeToLogicalType, fromLogicalTypeToDataType}
 import org.apache.flink.table.types.logical.LogicalType
 
+import org.apache.calcite.rex.RexLiteral
 import org.apache.calcite.tools.RelBuilder
 
 /**
@@ -41,7 +43,7 @@ import org.apache.calcite.tools.RelBuilder
   * @param aggBufferOffset  the offset in the buffers of this aggregate
   * @param aggBufferSize  the total size of aggregate buffers
   * @param inputTypes   the input field type infos
-  * @param constantExprs  the constant expressions
+  * @param constants  the constant literals
   * @param relBuilder  the rel builder to translate expressions to calcite rex nodes
   */
 class DeclarativeAggCodeGen(
@@ -52,7 +54,7 @@ class DeclarativeAggCodeGen(
     aggBufferOffset: Int,
     aggBufferSize: Int,
     inputTypes: Seq[LogicalType],
-    constantExprs: Seq[GeneratedExpression],
+    constants: Seq[RexLiteral],
     relBuilder: RelBuilder)
   extends AggCodeGen {
 
@@ -62,14 +64,21 @@ class DeclarativeAggCodeGen(
   private val bufferIndexes = Array.range(aggBufferOffset, aggBufferOffset + bufferTypes.length)
   private val bufferTerms = function.aggBufferAttributes
       .map(a => s"agg${aggInfo.aggIndex}_${a.getName}")
-  private val bufferNullTerms = bufferTerms.map(_ + "_isNull")
+
+  private val rexNodeGen = new RexNodeConverter(relBuilder)
+
+  private val bufferNullTerms = {
+    val exprCodegen = new ExprCodeGenerator(ctx, false)
+    bufferTerms.zip(bufferTypes).map {
+      case (name, t) => new LocalReferenceExpression(name, fromLogicalTypeToDataType(t))
+    }.map(_.accept(rexNodeGen)).map(exprCodegen.generateExpression).map(_.nullTerm)
+  }
 
   private val argIndexes = aggInfo.argIndexes
   private val argTypes = {
-    val types = inputTypes ++ constantExprs.map(_.resultType)
+    val types = inputTypes ++ constants.map(t => FlinkTypeFactory.toLogicalType(t.getType))
     argIndexes.map(types(_))
   }
-  private val rexNodeGen = new RexNodeConverter(relBuilder)
 
   def createAccumulator(generator: ExprCodeGenerator): Seq[GeneratedExpression] = {
     function.initialValuesExpressions
@@ -79,21 +88,15 @@ class DeclarativeAggCodeGen(
   def setAccumulator(generator: ExprCodeGenerator): String = {
     val aggBufferAccesses = function.aggBufferAttributes.zipWithIndex
       .map { case (attr, index) =>
-        new ResolvedAggInputReference(
-          attr.getName, bufferIndexes(index), bufferTypes(index))
+        toRexInputRef(relBuilder, bufferIndexes(index), bufferTypes(index))
       }
       .map(expr => generator.generateExpression(expr.accept(rexNodeGen)))
 
     val setters = aggBufferAccesses.zipWithIndex.map {
       case (access, index) =>
-        val typeTerm = primitiveTypeTermForType(access.resultType)
-        val memberName = bufferTerms(index)
-        val memberNullTerm = bufferNullTerms(index)
-        ctx.addReusableMember(s"private $typeTerm $memberName;")
-        ctx.addReusableMember(s"private boolean $memberNullTerm;")
         s"""
-           |${access.copyResultTermToTargetIfChanged(ctx, memberName)};
-           |$memberNullTerm = ${access.nullTerm};
+           |${access.copyResultTermToTargetIfChanged(ctx, bufferTerms(index))};
+           |${bufferNullTerms(index)} = ${access.nullTerm};
          """.stripMargin
     }
 
@@ -218,8 +221,8 @@ class DeclarativeAggCodeGen(
 
     override def toMergeInputExpr(name: String, localIndex: Int): ResolvedExpression = {
       // in merge case, the input1 is mergedAcc
-      new ResolvedAggInputReference(
-        name,
+      toRexInputRef(
+        relBuilder,
         mergedAccOffset + bufferIndexes(localIndex),
         bufferTypes(localIndex))
     }
@@ -228,37 +231,31 @@ class DeclarativeAggCodeGen(
       val inputIndex = argIndexes(localIndex)
       if (inputIndex >= inputTypes.length) { // it is a constant
         val constantIndex = inputIndex - inputTypes.length
-        val constantTerm = constantExprs(constantIndex).resultTerm
-        val nullTerm = constantExprs(constantIndex).nullTerm
-        val constantType = constantExprs(constantIndex).resultType
-        // constant is reused as member variable
-        new ResolvedAggLocalReference(
-          constantTerm,
-          nullTerm,
-          constantType)
+        val constant = constants(constantIndex)
+        new RexNodeExpression(constant,
+          fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(constant.getType)))
       } else { // it is a input field
         if (isDistinctMerge) { // this is called from distinct merge
           if (function.operandCount == 1) {
             // the distinct key is a BoxedValue
-            new ResolvedDistinctKeyReference(name, argTypes(localIndex))
+            val t = argTypes(localIndex)
+            toRexDistinctKey(relBuilder, name, t)
           } else {
             // the distinct key is a BaseRow
-            new ResolvedAggInputReference(name, localIndex, argTypes(localIndex))
+            toRexInputRef(relBuilder, localIndex, argTypes(localIndex))
           }
         } else {
           // the input is the inputRow
-          new ResolvedAggInputReference(
-            name, argIndexes(localIndex), argTypes(localIndex))
+          toRexInputRef(relBuilder, argIndexes(localIndex), argTypes(localIndex))
         }
       }
     }
 
     override def toAggBufferExpr(name: String, localIndex: Int): ResolvedExpression = {
-        // it is a agg buffer.
-        val name = bufferTerms(localIndex)
-        val nullTerm = bufferNullTerms(localIndex)
-        // buffer access is reused as member variable
-        new ResolvedAggLocalReference(name, nullTerm, bufferTypes(localIndex))
+      // name => agg${aggInfo.aggIndex}_$name"
+      new LocalReferenceExpression(
+        bufferTerms(localIndex),
+        fromLogicalTypeToDataType(bufferTypes(localIndex)))
     }
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
index 87f91c9..e5d0ea4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
@@ -25,7 +25,8 @@ import org.apache.flink.table.planner.codegen.GenerateUtils.generateFieldAccess
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator._
 import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGeneratorContext, ExprCodeGenerator, GeneratedExpression}
 import org.apache.flink.table.planner.dataview.DataViewSpec
-import org.apache.flink.table.planner.expressions.{ResolvedAggInputReference, ResolvedDistinctKeyReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter}
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{getAggFunctionUDIMethod, getAggUserDefinedInputTypes, getUserDefinedMethod, internalTypesToClasses, signatureToString}
 import org.apache.flink.table.planner.plan.utils.AggregateInfo
 import org.apache.flink.table.planner.utils.SingleElementIterator
@@ -282,14 +283,15 @@ class ImperativeAggCodeGen(
         val inputRef = if (generator.input1Term.startsWith(DISTINCT_KEY_TERM)) {
           if (argTypes.length == 1) {
             // called from distinct merge and the inputTerm is the only argument
-            new ResolvedDistinctKeyReference(generator.input1Term, inputTypes(f))
+            DeclarativeExpressionResolver.toRexDistinctKey(
+              relBuilder, generator.input1Term, inputTypes(f))
           } else {
             // called from distinct merge call and the inputTerm is BaseRow type
-            new ResolvedAggInputReference(f.toString, index, inputTypes(f))
+            toRexInputRef(relBuilder, index, inputTypes(f))
           }
         } else {
           // called from accumulate
-          new ResolvedAggInputReference(f.toString, f, inputTypes(f))
+          toRexInputRef(relBuilder, f, inputTypes(f))
         }
         var inputExpr = generator.generateExpression(inputRef.accept(rexNodeGen))
         if (inputFieldCopy) inputExpr = inputExpr.deepCopy(ctx)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
index d235f91..c02ae98 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
@@ -27,13 +27,14 @@ import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction}
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
 import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.STREAM_RECORD
 import org.apache.flink.table.planner.codegen._
-import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, ResolvedAggInputReference, ResolvedAggLocalReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{getAccumulatorTypeOfAggregateFunction, getAggUserDefinedInputTypes}
 import org.apache.flink.table.runtime.context.ExecutionContextImpl
 import org.apache.flink.table.runtime.generated.{GeneratedAggsHandleFunction, GeneratedOperator}
 import org.apache.flink.table.runtime.types.InternalSerializers
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.{fromDataTypeToLogicalType, fromLogicalTypeToDataType}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.{LogicalType, RowType}
@@ -234,14 +235,8 @@ object AggCodeGenHelper {
     auxGroupingMapping ++ aggCallMapping
   }
 
-  def newLocalReference(
-      ctx: CodeGeneratorContext,
-      resultTerm: String,
-      resultType: LogicalType): ResolvedAggLocalReference = {
-    val nullTerm = resultTerm + "IsNull"
-    ctx.addReusableMember(s"${primitiveTypeTermForType(resultType)} $resultTerm;")
-    ctx.addReusableMember(s"boolean $nullTerm;")
-    new ResolvedAggLocalReference(resultTerm, nullTerm, resultType)
+  def newLocalReference(resultTerm: String, resultType: LogicalType): LocalReferenceExpression = {
+    new LocalReferenceExpression(resultTerm, fromLogicalTypeToDataType(resultType))
   }
 
   /**
@@ -261,18 +256,17 @@ object AggCodeGenHelper {
 
     override def toMergeInputExpr(name: String, localIndex: Int): ResolvedExpression = {
       val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
-      new ResolvedAggInputReference(name, inputIndex, inputType)
+      toRexInputRef(relBuilder, inputIndex, inputType)
     }
 
     override def toAccInputExpr(name: String, localIndex: Int): ResolvedExpression = {
       val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
-      new ResolvedAggInputReference(name, inputIndex, inputType)
+      toRexInputRef(relBuilder, inputIndex, inputType)
     }
 
     override def toAggBufferExpr(name: String, localIndex: Int): ResolvedExpression = {
       val variableName = s"agg${aggIndex}_$name"
-      newLocalReference(
-        ctx, variableName, aggBufferTypes(aggIndex)(localIndex))
+      newLocalReference(variableName, aggBufferTypes(aggIndex)(localIndex))
     }
   }
 
@@ -292,7 +286,7 @@ object AggCodeGenHelper {
     val converter = new RexNodeConverter(builder)
 
     val accessAuxGroupingExprs = auxGrouping.indices.map {
-      idx => newLocalReference(ctx, aggBufferNames(idx)(0), aggBufferTypes(idx)(0))
+      idx => newLocalReference(aggBufferNames(idx)(0), aggBufferTypes(idx)(0))
     }.map(_.accept(converter)).map(exprCodegen.generateExpression)
 
     val aggCallExprs = aggregates.zipWithIndex.flatMap {
@@ -303,7 +297,7 @@ object AggCodeGenHelper {
       case (_: AggregateFunction[_, _], aggIndex: Int) =>
         val idx = auxGrouping.length + aggIndex
         val variableName = aggBufferNames(idx)(0)
-        Some(newLocalReference(ctx, variableName, aggBufferTypes(idx)(0)))
+        Some(newLocalReference(variableName, aggBufferTypes(idx)(0)))
     }.map(_.accept(converter)).map(exprCodegen.generateExpression)
 
     accessAuxGroupingExprs ++ aggCallExprs
@@ -552,7 +546,7 @@ object AggCodeGenHelper {
       // UserDefinedAggregateFunction
       case ((agg: AggregateFunction[_, _], aggIndex: Int), aggBufVar) =>
         val (inputIndex, inputType) = argsMapping(aggIndex)(0)
-        val inputRef = new ResolvedAggInputReference(inputTerm, inputIndex, inputType)
+        val inputRef = toRexInputRef(builder, inputIndex, inputType)
         val inputExpr = exprCodegen.generateExpression(
           inputRef.accept(new RexNodeConverter(builder)))
         val singleIterableClass = classOf[SingleElementIterator[_]].getCanonicalName
@@ -621,7 +615,7 @@ object AggCodeGenHelper {
 
         val inputExprs = inFields.map {
           f =>
-            val inputRef = new ResolvedAggInputReference(inputTerm, f._1, f._2)
+            val inputRef = toRexInputRef(builder, f._1, f._2)
             exprCodegen.generateExpression(inputRef.accept(new RexNodeConverter(builder)))
         }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index dec5e56..8cdaf05 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -27,7 +27,8 @@ import org.apache.flink.table.planner.codegen.CodeGenUtils.{binaryRowFieldSetAcc
 import org.apache.flink.table.planner.codegen._
 import org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper.buildAggregateArgsMapping
 import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator
-import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, ResolvedAggInputReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.plan.utils.SortUtil
 import org.apache.flink.table.runtime.generated.{NormalizedKeyComputer, RecordComparator}
@@ -286,7 +287,7 @@ object HashAggCodeGenHelper {
       val bindRefOffset = inputType.getFieldCount
       val getAuxGroupingExprs = auxGrouping.indices.map { idx =>
         val (_, resultType) = aggBuffMapping(idx)(0)
-        new ResolvedAggInputReference("aux_group", bindRefOffset + idx, resultType)
+        toRexInputRef(builder, bindRefOffset + idx, resultType)
       }.map(_.accept(new RexNodeConverter(builder))).map(exprCodegen.generateExpression)
 
       val getAggValueExprs = aggregates.zipWithIndex.map {
@@ -338,17 +339,17 @@ object HashAggCodeGenHelper {
 
     override def toMergeInputExpr(name: String, localIndex: Int): ResolvedExpression = {
       val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
-      new ResolvedAggInputReference(name, inputIndex, inputType)
+      toRexInputRef(relBuilder, inputIndex, inputType)
     }
 
     override def toAccInputExpr(name: String, localIndex: Int): ResolvedExpression = {
       val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
-      new ResolvedAggInputReference(name, inputIndex, inputType)
+      toRexInputRef(relBuilder, inputIndex, inputType)
     }
 
     override def toAggBufferExpr(name: String, localIndex: Int): ResolvedExpression = {
       val (aggBuffAttrIndex, aggBuffAttrType) = aggBuffMapping(aggIndex)(localIndex)
-      new ResolvedAggInputReference(name, offset + aggBuffAttrIndex, aggBuffAttrType)
+      toRexInputRef(relBuilder, offset + aggBuffAttrIndex, aggBuffAttrType)
     }
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index b6b6905..6ce17a2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -810,9 +810,8 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
     )
   }
 
-  override def visit(localReference: LocalReferenceExpression): PlannerExpression =
-    throw new TableException(
-      "Local reference should be handled individually by a call: " + localReference)
+  override def visit(local: LocalReferenceExpression): PlannerExpression =
+    PlannerLocalReference(local.getName, fromDataTypeToTypeInfo(local.getOutputDataType))
 
   override def visit(lookupCall: LookupCallExpression): PlannerExpression =
     throw new TableException("Unsupported function call: " + lookupCall)
@@ -821,15 +820,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
     other match {
       // already converted planner expressions will pass this visitor without modification
       case plannerExpression: PlannerExpression => plannerExpression
-      case aggInput: ResolvedAggInputReference => PlannerResolvedAggInputReference(
-        aggInput.getName, aggInput.getIndex, fromDataTypeToTypeInfo(aggInput.getOutputDataType))
-      case aggLocal: ResolvedAggLocalReference => PlannerResolvedAggLocalReference(
-        aggLocal.getFieldTerm,
-        aggLocal.getNullTerm,
-        fromDataTypeToTypeInfo(aggLocal.getOutputDataType))
-      case key: ResolvedDistinctKeyReference => PlannerResolvedDistinctKeyReference(
-        key.getName, fromDataTypeToTypeInfo(key.getOutputDataType))
-
+      case expr: RexNodeExpression => RexPlannerExpression(expr.getRexNode)
       case _ =>
         throw new TableException("Unrecognized expression: " + other)
     }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
index 2a3c621..1a0f5b3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.expressions
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api._
-import org.apache.flink.table.expressions.ResolvedFieldReference
 import org.apache.flink.table.operations.QueryOperation
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
@@ -231,55 +230,18 @@ case class StreamRecordTimestamp() extends LeafExpression {
 }
 
 /**
-  * Normally we should use [[ResolvedFieldReference]] to represent an input field.
-  * [[ResolvedFieldReference]] uses name to locate the field, in aggregate case, we want to use
-  * field index.
-  */
-case class PlannerResolvedAggInputReference(
-    name: String,
-    index: Int,
-    resultType: TypeInformation[_]) extends Attribute {
-
-  override def toString = s"'$name"
-
-  override private[flink] def withName(newName: String): Attribute = {
-    if (newName == name) this
-    else PlannerResolvedAggInputReference(newName, index, resultType)
-  }
-}
-
-/**
-  * Special reference which represent a local filed, such as aggregate buffers or constants.
+  * Special reference which represents a local field, such as aggregate buffers or constants.
   * We are stored as class members, so the field can be referenced directly.
  * We should use a unique name to locate the field.
   */
-case class PlannerResolvedAggLocalReference(
+case class PlannerLocalReference(
     name: String,
-    nullTerm: String,
-    resultType: TypeInformation[_])
-    extends Attribute {
-
-  override def toString = s"'$name"
-
-  override private[flink] def withName(newName: String): Attribute = {
-    if (newName == name) this
-    else PlannerResolvedAggLocalReference(newName, nullTerm, resultType)
-  }
-}
-
-/**
-  * Special reference which represent a distinct key input filed,
-  * [[ResolvedDistinctKeyReference]] uses name to locate the field.
-  */
-case class PlannerResolvedDistinctKeyReference(
-    name: String,
-    resultType: TypeInformation[_])
-    extends Attribute {
+    resultType: TypeInformation[_]) extends Attribute {
 
   override def toString = s"'$name"
 
   override private[flink] def withName(newName: String): Attribute = {
     if (newName == name) this
-    else PlannerResolvedDistinctKeyReference(newName, resultType)
+    else PlannerLocalReference(newName, resultType)
   }
 }


[flink] 05/08: [FLINK-13774][table] FieldComputer should return ResolvedExpression

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e9223939ad1efb839228032e43f0ca9d437d59f9
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Aug 22 12:52:28 2019 +0200

    [FLINK-13774][table] FieldComputer should return ResolvedExpression
---
 .../flink/table/sources/tsextractors/ExistingField.java   | 10 ++++++----
 .../flink/table/planner/sources/TableSourceUtil.scala     | 15 +++++++++------
 2 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
index 0e8d73f..f490949 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
@@ -22,18 +22,19 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ResolvedFieldReference;
 import org.apache.flink.table.types.DataType;
 
 import java.sql.Timestamp;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.typeLiteral;
-import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
 import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -99,10 +100,11 @@ public final class ExistingField extends TimestampExtractor {
 			case TIMESTAMP_WITHOUT_TIME_ZONE:
 				return fieldReferenceExpr;
 			case VARCHAR:
-				return unresolvedCall(
+				DataType outputType = TIMESTAMP(3).bridgedTo(Timestamp.class);
+				return new CallExpression(
 						CAST,
-						fieldReferenceExpr,
-						typeLiteral(TIMESTAMP(3).bridgedTo(Timestamp.class)));
+						Arrays.asList(fieldReferenceExpr, typeLiteral(outputType)),
+						outputType);
 			default:
 				throw new RuntimeException("Unsupport type: " + type);
 		}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
index 489e3e2..d46849f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.planner.sources
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.{DataTypes, ValidationException}
-import org.apache.flink.table.expressions.ResolvedFieldReference
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall, valueLiteral}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, valueLiteral}
+import org.apache.flink.table.expressions.{CallExpression, ResolvedExpression, ResolvedFieldReference}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.expressions.RexNodeConverter
@@ -285,11 +285,14 @@ object TableSourceUtil {
       // add cast to requested type and convert expression to RexNode
       // blink runner treats numeric types as seconds in the cast of timestamp and numerical types.
       // So we use REINTERPRET_CAST to keep the mills of numeric types.
-      val castExpression = unresolvedCall(
+      val outputType = DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])
+      val castExpression = new CallExpression(
         BuiltInFunctionDefinitions.REINTERPRET_CAST,
-        expression,
-        typeLiteral(DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])),
-        valueLiteral(false))
+        Seq(
+          expression.asInstanceOf[ResolvedExpression],
+          typeLiteral(outputType),
+          valueLiteral(false)),
+        outputType)
       val rexExpression = castExpression.accept(new RexNodeConverter(relBuilder))
       relBuilder.clear()
       rexExpression


[flink] 01/08: [FLINK-13774][table-planner-blink] Blink extended expressions should implement ResolvedExpression

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1e9232296eaef0e1f3a8ba32b416d26143a501c5
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Aug 19 16:21:51 2019 +0200

    [FLINK-13774][table-planner-blink] Blink extended expressions should implement ResolvedExpression
---
 .../expressions/ResolvedAggInputReference.java       | 20 +++++++++++++++++---
 .../expressions/ResolvedAggLocalReference.java       | 20 +++++++++++++++++---
 .../expressions/ResolvedDistinctKeyReference.java    | 20 +++++++++++++++++---
 3 files changed, 51 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java
index 5430da6..2ad8177 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java
@@ -21,18 +21,22 @@ package org.apache.flink.table.planner.expressions;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionVisitor;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
+
 /**
  * Normally we should use {@link FieldReferenceExpression} to represent an input field.
  * {@link FieldReferenceExpression} uses name to locate the field, in aggregate case, we want to use
  * field index.
  */
-public class ResolvedAggInputReference implements Expression {
+public class ResolvedAggInputReference implements ResolvedExpression {
 
 	private final String name;
 	private final int index;
@@ -57,8 +61,13 @@ public class ResolvedAggInputReference implements Expression {
 	}
 
 	@Override
-	public String asSummaryString() {
-		return name;
+	public DataType getOutputDataType() {
+		return fromLogicalTypeToDataType(resultType);
+	}
+
+	@Override
+	public List<ResolvedExpression> getResolvedChildren() {
+		return Collections.emptyList();
 	}
 
 	@Override
@@ -67,6 +76,11 @@ public class ResolvedAggInputReference implements Expression {
 	}
 
 	@Override
+	public String asSummaryString() {
+		return name;
+	}
+
+	@Override
 	public <R> R accept(ExpressionVisitor<R> visitor) {
 		return visitor.visit(this);
 	}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java
index 205c578..055ed10 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java
@@ -20,11 +20,15 @@ package org.apache.flink.table.planner.expressions;
 
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
+
 /**
  * Special reference which represents a local field, such as aggregate buffers or constants.
  * We are stored as class members, so the field can be referenced directly.
@@ -32,7 +36,7 @@ import java.util.List;
  *
  * <p>See {@link org.apache.flink.table.planner.codegen.ExprCodeGenerator#visitLocalRef}.
  */
-public class ResolvedAggLocalReference implements Expression {
+public class ResolvedAggLocalReference implements ResolvedExpression {
 
 	private final String fieldTerm;
 	private final String nullTerm;
@@ -57,8 +61,13 @@ public class ResolvedAggLocalReference implements Expression {
 	}
 
 	@Override
-	public String asSummaryString() {
-		return fieldTerm;
+	public DataType getOutputDataType() {
+		return fromLogicalTypeToDataType(resultType);
+	}
+
+	@Override
+	public List<ResolvedExpression> getResolvedChildren() {
+		return Collections.emptyList();
 	}
 
 	@Override
@@ -67,6 +76,11 @@ public class ResolvedAggLocalReference implements Expression {
 	}
 
 	@Override
+	public String asSummaryString() {
+		return fieldTerm;
+	}
+
+	@Override
 	public <R> R accept(ExpressionVisitor<R> visitor) {
 		return visitor.visit(this);
 	}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java
index 8b99601..d55905f 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java
@@ -20,16 +20,20 @@ package org.apache.flink.table.planner.expressions;
 
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
+
 /**
  * Resolved distinct key reference.
  */
-public class ResolvedDistinctKeyReference implements Expression {
+public class ResolvedDistinctKeyReference implements ResolvedExpression {
 
 	private final String name;
 	private final LogicalType resultType;
@@ -48,8 +52,13 @@ public class ResolvedDistinctKeyReference implements Expression {
 	}
 
 	@Override
-	public String asSummaryString() {
-		return name;
+	public DataType getOutputDataType() {
+		return fromLogicalTypeToDataType(resultType);
+	}
+
+	@Override
+	public List<ResolvedExpression> getResolvedChildren() {
+		return Collections.emptyList();
 	}
 
 	@Override
@@ -58,6 +67,11 @@ public class ResolvedDistinctKeyReference implements Expression {
 	}
 
 	@Override
+	public String asSummaryString() {
+		return name;
+	}
+
+	@Override
 	public <R> R accept(ExpressionVisitor<R> visitor) {
 		return visitor.visit(this);
 	}


[flink] 04/08: [FLINK-13774][table-planner-blink] Modify filterable table source accept ResolvedExpression

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit de22d7c0d5afd3233ab8e174ed4d837e08438ab3
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Aug 22 12:46:52 2019 +0200

    [FLINK-13774][table-planner-blink] Modify filterable table source accept ResolvedExpression
---
 .../planner/plan/utils/RexNodeExtractor.scala      | 52 ++++++++++++----------
 .../table/planner/utils/testTableSources.scala     | 12 ++---
 2 files changed, 35 insertions(+), 29 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
index f938c79..b4535bf 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.utils.Logging
 import org.apache.flink.table.runtime.functions.SqlDateTimeUtils.unixTimestampToLocalDateTime
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
+import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.util.Preconditions
 
@@ -294,9 +295,9 @@ class RexNodeToExpressionConverter(
     inputNames: Array[String],
     functionCatalog: FunctionCatalog,
     timeZone: TimeZone)
-  extends RexVisitor[Option[Expression]] {
+  extends RexVisitor[Option[ResolvedExpression]] {
 
-  override def visitInputRef(inputRef: RexInputRef): Option[Expression] = {
+  override def visitInputRef(inputRef: RexInputRef): Option[ResolvedExpression] = {
     Preconditions.checkArgument(inputRef.getIndex < inputNames.length)
     Some(new FieldReferenceExpression(
       inputNames(inputRef.getIndex),
@@ -306,14 +307,14 @@ class RexNodeToExpressionConverter(
     ))
   }
 
-  override def visitTableInputRef(rexTableInputRef: RexTableInputRef): Option[Expression] =
+  override def visitTableInputRef(rexTableInputRef: RexTableInputRef): Option[ResolvedExpression] =
     visitInputRef(rexTableInputRef)
 
-  override def visitLocalRef(localRef: RexLocalRef): Option[Expression] = {
+  override def visitLocalRef(localRef: RexLocalRef): Option[ResolvedExpression] = {
     throw new TableException("Bug: RexLocalRef should have been expanded")
   }
 
-  override def visitLiteral(literal: RexLiteral): Option[Expression] = {
+  override def visitLiteral(literal: RexLiteral): Option[ResolvedExpression] = {
     // TODO support SqlTrimFunction.Flag
     literal.getValue match {
       case _: SqlTrimFunction.Flag => return None
@@ -384,53 +385,58 @@ class RexNodeToExpressionConverter(
       fromLogicalTypeToDataType(literalType)))
   }
 
-  override def visitCall(rexCall: RexCall): Option[Expression] = {
+  override def visitCall(rexCall: RexCall): Option[ResolvedExpression] = {
     val operands = rexCall.getOperands.map(
       operand => operand.accept(this).orNull
     )
 
+    val outputType = fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(rexCall.getType))
+
     // return null if we cannot translate all the operands of the call
     if (operands.contains(null)) {
       None
     } else {
       rexCall.getOperator match {
         case SqlStdOperatorTable.OR =>
-          Option(operands.reduceLeft { (l, r) => unresolvedCall(OR, l, r) })
+          Option(operands.reduceLeft((l, r) => new CallExpression(OR, Seq(l, r), outputType)))
         case SqlStdOperatorTable.AND =>
-          Option(operands.reduceLeft { (l, r) => unresolvedCall(AND, l, r) })
+          Option(operands.reduceLeft((l, r) => new CallExpression(AND, Seq(l, r), outputType)))
         case SqlStdOperatorTable.CAST =>
-          Option(unresolvedCall(CAST, operands.head,
-            typeLiteral(fromLogicalTypeToDataType(
-              FlinkTypeFactory.toLogicalType(rexCall.getType)))))
+          Option(new CallExpression(CAST, Seq(operands.head, typeLiteral(outputType)), outputType))
         case function: SqlFunction =>
-          lookupFunction(replace(function.getName), operands)
+          lookupFunction(replace(function.getName), operands, outputType)
         case postfix: SqlPostfixOperator =>
-          lookupFunction(replace(postfix.getName), operands)
+          lookupFunction(replace(postfix.getName), operands, outputType)
         case operator@_ =>
-          lookupFunction(replace(s"${operator.getKind}"), operands)
+          lookupFunction(replace(s"${operator.getKind}"), operands, outputType)
       }
     }
   }
 
-  override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[Expression] = None
+  override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[ResolvedExpression] = None
 
-  override def visitCorrelVariable(correlVariable: RexCorrelVariable): Option[Expression] = None
+  override def visitCorrelVariable(
+      correlVariable: RexCorrelVariable): Option[ResolvedExpression] = None
 
-  override def visitRangeRef(rangeRef: RexRangeRef): Option[Expression] = None
+  override def visitRangeRef(rangeRef: RexRangeRef): Option[ResolvedExpression] = None
 
-  override def visitSubQuery(subQuery: RexSubQuery): Option[Expression] = None
+  override def visitSubQuery(subQuery: RexSubQuery): Option[ResolvedExpression] = None
 
-  override def visitDynamicParam(dynamicParam: RexDynamicParam): Option[Expression] = None
+  override def visitDynamicParam(dynamicParam: RexDynamicParam): Option[ResolvedExpression] = None
 
-  override def visitOver(over: RexOver): Option[Expression] = None
+  override def visitOver(over: RexOver): Option[ResolvedExpression] = None
 
-  override def visitPatternFieldRef(fieldRef: RexPatternFieldRef): Option[Expression] = None
+  override def visitPatternFieldRef(
+      fieldRef: RexPatternFieldRef): Option[ResolvedExpression] = None
 
-  private def lookupFunction(name: String, operands: Seq[Expression]): Option[Expression] = {
+  private def lookupFunction(
+      name: String,
+      operands: Seq[ResolvedExpression],
+      outputType: DataType): Option[ResolvedExpression] = {
     Try(functionCatalog.lookupFunction(name)) match {
       case Success(f: java.util.Optional[FunctionLookup.Result]) =>
         if (f.isPresent) {
-          Some(unresolvedCall(f.get().getFunctionDefinition, operands: _*))
+          Some(new CallExpression(f.get().getFunctionDefinition, operands, outputType))
         } else {
           None
         }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
index 44cb4eb..24fab42 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableSchema, Types}
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall
-import org.apache.flink.table.expressions.{Expression, FieldReferenceExpression, UnresolvedCallExpression, ValueLiteralExpression}
+import org.apache.flink.table.expressions.{CallExpression, Expression, FieldReferenceExpression, ValueLiteralExpression}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
@@ -398,12 +398,12 @@ class TestFilterableTableSource(
 
   private def shouldPushDown(expr: Expression): Boolean = {
     expr match {
-      case expr: UnresolvedCallExpression if expr.getChildren.size() == 2 => shouldPushDown(expr)
+      case expr: CallExpression if expr.getChildren.size() == 2 => shouldPushDown(expr)
       case _ => false
     }
   }
 
-  private def shouldPushDown(binExpr: UnresolvedCallExpression): Boolean = {
+  private def shouldPushDown(binExpr: CallExpression): Boolean = {
     val children = binExpr.getChildren
     require(children.size() == 2)
     (children.head, children.last) match {
@@ -419,13 +419,13 @@ class TestFilterableTableSource(
 
   private def shouldKeep(row: Row): Boolean = {
     filterPredicates.isEmpty || filterPredicates.forall {
-      case expr: UnresolvedCallExpression if expr.getChildren.size() == 2 =>
+      case expr: CallExpression if expr.getChildren.size() == 2 =>
         binaryFilterApplies(expr, row)
       case expr => throw new RuntimeException(expr + " not supported!")
     }
   }
 
-  private def binaryFilterApplies(binExpr: UnresolvedCallExpression, row: Row): Boolean = {
+  private def binaryFilterApplies(binExpr: CallExpression, row: Row): Boolean = {
     val children = binExpr.getChildren
     require(children.size() == 2)
     val (lhsValue, rhsValue) = extractValues(binExpr, row)
@@ -447,7 +447,7 @@ class TestFilterableTableSource(
   }
 
   private def extractValues(
-      binExpr: UnresolvedCallExpression,
+      binExpr: CallExpression,
       row: Row): (Comparable[Any], Comparable[Any]) = {
     val children = binExpr.getChildren
     require(children.size() == 2)


[flink] 07/08: [FLINK-13774][table-planner-blink] Remove unresolved expression in RexNodeConverter

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d2085a18169da7bf84fdff98c5f743a8e75b34a7
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Aug 22 12:58:01 2019 +0200

    [FLINK-13774][table-planner-blink] Remove unresolved expression in RexNodeConverter
---
 .../planner/expressions/RexNodeConverter.java      | 58 ++--------------------
 1 file changed, 5 insertions(+), 53 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
index 68b7b7f..731d550 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
@@ -31,8 +31,6 @@ import org.apache.flink.table.expressions.TableReferenceExpression;
 import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.expressions.TimePointUnit;
 import org.apache.flink.table.expressions.TypeLiteralExpression;
-import org.apache.flink.table.expressions.UnresolvedCallExpression;
-import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils;
 import org.apache.flink.table.functions.AggregateFunction;
@@ -121,8 +119,8 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoT
 /**
  * Visit expression to generator {@link RexNode}.
  *
- * <p>TODO actually we should use {@link ResolvedExpressionVisitor} here as it is the output of the API.
- * we will update it after introduce Expression resolve in AggCodeGen.
+ * <p>TODO remove blink expressions(like {@link ResolvedAggInputReference}) and use
+ * {@link ResolvedExpressionVisitor}.
  */
 public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 
@@ -356,7 +354,7 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 			FunctionDefinition def = call.getFunctionDefinition();
 			if (conversionsOfBuiltInFunc.containsKey(def)) {
 				RexNodeConversion conversion = conversionsOfBuiltInFunc.get(def);
-				return conversion.convert(call);
+				return conversion.convert(call.getChildren());
 			} else {
 				throw new UnsupportedOperationException(def.toString());
 			}
@@ -877,16 +875,12 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 
 	@Override
 	public RexNode visit(Expression other) {
-		if (other instanceof UnresolvedReferenceExpression) {
-			return visitUnresolvedReferenceExpression((UnresolvedReferenceExpression) other);
-		} else if (other instanceof ResolvedAggInputReference) {
+		if (other instanceof ResolvedAggInputReference) {
 			return visitResolvedAggInputReference((ResolvedAggInputReference) other);
 		} else if (other instanceof ResolvedAggLocalReference) {
 			return visitResolvedAggLocalReference((ResolvedAggLocalReference) other);
 		} else if (other instanceof ResolvedDistinctKeyReference) {
 			return visitResolvedDistinctKeyReference((ResolvedDistinctKeyReference) other);
-		} else if (other instanceof UnresolvedCallExpression) {
-			return visit((UnresolvedCallExpression) other);
 		} else if (other instanceof RexNodeExpression) {
 			return ((RexNodeExpression) other).getRexNode();
 		} else {
@@ -894,10 +888,6 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 		}
 	}
 
-	private RexNode visitUnresolvedReferenceExpression(UnresolvedReferenceExpression field) {
-		return relBuilder.field(field.getName());
-	}
-
 	private RexNode visitResolvedAggInputReference(ResolvedAggInputReference reference) {
 		// using index to resolve field directly, name used in toString only
 		return new RexInputRef(
@@ -922,34 +912,6 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 				type);
 	}
 
-	private RexNode visit(UnresolvedCallExpression call) {
-		FunctionDefinition func = call.getFunctionDefinition();
-		switch (func.getKind()) {
-			case SCALAR:
-				if (func instanceof ScalarFunctionDefinition) {
-					ScalarFunction scalaFunc = ((ScalarFunctionDefinition) func).getScalarFunction();
-					List<RexNode> child = convertCallChildren(call.getChildren());
-					SqlFunction sqlFunction = UserDefinedFunctionUtils.createScalarSqlFunction(
-							scalaFunc.functionIdentifier(),
-							scalaFunc.toString(),
-							scalaFunc,
-							typeFactory);
-					return relBuilder.call(sqlFunction, child);
-				} else {
-					FunctionDefinition def = call.getFunctionDefinition();
-					if (conversionsOfBuiltInFunc.containsKey(def)) {
-						RexNodeConversion conversion = conversionsOfBuiltInFunc.get(def);
-						return conversion.convert(call);
-					} else {
-						throw new UnsupportedOperationException(def.toString());
-					}
-				}
-
-			default:
-				throw new UnsupportedOperationException();
-		}
-	}
-
 	private RexNode createCollation(RexNode node, RelFieldCollation.Direction direction,
 			RelFieldCollation.NullDirection nullDirection, Set<SqlKind> kinds) {
 		switch (node.getKind()) {
@@ -1025,20 +987,10 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 	}
 
 	/**
-	 * RexNodeConversion to define how to convert a {@link CallExpression} or a {@link UnresolvedCallExpression} which
+	 * RexNodeConversion to define how to convert a {@link CallExpression} which
 	 * has built-in FunctionDefinition to RexNode.
 	 */
 	private interface RexNodeConversion {
-
 		RexNode convert(List<Expression> children);
-
-		default RexNode convert(CallExpression expression) {
-			return convert(expression.getChildren());
-		}
-
-		default RexNode convert(UnresolvedCallExpression unresolvedCallExpression) {
-			return convert(unresolvedCallExpression.getChildren());
-		}
 	}
-
 }