Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/29 06:43:42 UTC

[flink] 08/08: [FLINK-13774][table-planner-blink] Use LocalReferenceExpression and RexNodeExpression instead of blink expressions

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e263856e37627361feec578ec67841de70f415b6
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Aug 28 14:19:14 2019 +0800

    [FLINK-13774][table-planner-blink] Use LocalReferenceExpression and RexNodeExpression instead of blink expressions
    
    This closes #9484
---
 .../expressions/DeclarativeExpressionResolver.java |  23 +++++
 .../expressions/ResolvedAggInputReference.java     | 113 --------------------
 .../expressions/ResolvedAggLocalReference.java     | 114 ---------------------
 .../expressions/ResolvedDistinctKeyReference.java  | 104 -------------------
 .../planner/expressions/RexNodeConverter.java      |  47 ++-------
 .../table/planner/calcite/FlinkLocalRef.scala      |  47 ++++++---
 .../table/planner/codegen/ExprCodeGenerator.scala  |  19 ++--
 .../codegen/agg/AggsHandlerCodeGenerator.scala     |  15 +--
 .../codegen/agg/DeclarativeAggCodeGen.scala        |  69 ++++++-------
 .../planner/codegen/agg/ImperativeAggCodeGen.scala |  10 +-
 .../codegen/agg/batch/AggCodeGenHelper.scala       |  30 +++---
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |  11 +-
 .../expressions/PlannerExpressionConverter.scala   |  15 +--
 .../planner/expressions/fieldExpression.scala      |  46 +--------
 14 files changed, 152 insertions(+), 511 deletions(-)

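At a high level, the patch below deletes the planner-local ResolvedAggInputReference, ResolvedAggLocalReference and ResolvedDistinctKeyReference classes and expresses the same references through the shared LocalReferenceExpression plus RexNodeExpression wrappers, created via new static helpers on DeclarativeExpressionResolver. A minimal Scala sketch of the replacement pattern follows; the RelBuilder, field name, index and LogicalType are assumed to be in scope at the call site, and the value names are illustrative, not part of the patch.

    import org.apache.calcite.tools.RelBuilder
    import org.apache.flink.table.expressions.LocalReferenceExpression
    import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.{toRexDistinctKey, toRexInputRef}
    import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
    import org.apache.flink.table.types.logical.LogicalType

    def resolveAggReferences(relBuilder: RelBuilder, name: String, index: Int, t: LogicalType): Unit = {
      // before: new ResolvedAggInputReference(name, index, t)
      val inputRef = toRexInputRef(relBuilder, index, t)        // wraps a RexInputRef
      // before: new ResolvedDistinctKeyReference(name, t)
      val distinctRef = toRexDistinctKey(relBuilder, name, t)   // wraps a RexDistinctKeyVariable
      // before: new ResolvedAggLocalReference(fieldTerm, nullTerm, t)
      val localRef = new LocalReferenceExpression(name, fromLogicalTypeToDataType(t))
    }
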
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java
index 6b1b429..3b6ad2d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java
@@ -23,13 +23,20 @@ import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.RexDistinctKeyVariable;
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction;
+import org.apache.flink.table.types.logical.LogicalType;
 
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.commons.lang3.ArrayUtils;
 
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
+
 /**
  * Abstract class to resolve the expressions in {@link DeclarativeAggregateFunction}.
  */
@@ -92,4 +99,20 @@ public abstract class DeclarativeExpressionResolver extends ExpressionDefaultVis
 	 * For aggregate buffer.
 	 */
 	public abstract ResolvedExpression toAggBufferExpr(String name, int localIndex);
+
+	public static ResolvedExpression toRexInputRef(RelBuilder builder, int i, LogicalType t) {
+		RelDataType tp = ((FlinkTypeFactory) builder.getTypeFactory())
+			.createFieldTypeFromLogicalType(t);
+		return new RexNodeExpression(new RexInputRef(i, tp), fromLogicalTypeToDataType(t));
+	}
+
+	public static ResolvedExpression toRexDistinctKey(RelBuilder builder, String name, LogicalType t) {
+		return new RexNodeExpression(
+			new RexDistinctKeyVariable(
+				name,
+				((FlinkTypeFactory) builder.getTypeFactory())
+					.createFieldTypeFromLogicalType(t),
+				t),
+			fromLogicalTypeToDataType(t));
+	}
 }
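
The helpers above return ResolvedExpressions that the aggregate code generators immediately lower to RexNodes and then to generated code. A hedged sketch of that round trip, mirroring the call sites further down in this patch; the ExprCodeGenerator is assumed to be already bound to the input row, and the helper name generateInputAccess is illustrative.

    import org.apache.calcite.tools.RelBuilder
    import org.apache.flink.table.planner.codegen.{ExprCodeGenerator, GeneratedExpression}
    import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
    import org.apache.flink.table.planner.expressions.RexNodeConverter
    import org.apache.flink.table.types.logical.LogicalType

    def generateInputAccess(
        relBuilder: RelBuilder,
        exprCodegen: ExprCodeGenerator,  // assumed already bound to the input row
        inputIndex: Int,
        inputType: LogicalType): GeneratedExpression = {
      // resolve the field by index, lower it to a RexInputRef, then generate the access code
      exprCodegen.generateExpression(
        toRexInputRef(relBuilder, inputIndex, inputType).accept(new RexNodeConverter(relBuilder)))
    }
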
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java
deleted file mode 100644
index 2ad8177..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.expressions;
-
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionVisitor;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
-
-/**
- * Normally we should use {@link FieldReferenceExpression} to represent an input field.
- * {@link FieldReferenceExpression} uses name to locate the field, in aggregate case, we want to use
- * field index.
- */
-public class ResolvedAggInputReference implements ResolvedExpression {
-
-	private final String name;
-	private final int index;
-	private final LogicalType resultType;
-
-	public ResolvedAggInputReference(String name, int index, LogicalType resultType) {
-		this.name = Preconditions.checkNotNull(name);
-		this.index = index;
-		this.resultType = resultType;
-	}
-
-	public String getName() {
-		return name;
-	}
-
-	public int getIndex() {
-		return index;
-	}
-
-	public LogicalType getResultType() {
-		return resultType;
-	}
-
-	@Override
-	public DataType getOutputDataType() {
-		return fromLogicalTypeToDataType(resultType);
-	}
-
-	@Override
-	public List<ResolvedExpression> getResolvedChildren() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public List<Expression> getChildren() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public String asSummaryString() {
-		return name;
-	}
-
-	@Override
-	public <R> R accept(ExpressionVisitor<R> visitor) {
-		return visitor.visit(this);
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ResolvedAggInputReference that = (ResolvedAggInputReference) o;
-		return index == that.index && name.equals(that.name) && resultType.equals(that.resultType);
-	}
-
-	@Override
-	public int hashCode() {
-		int result = name.hashCode();
-		result = 31 * result + index;
-		result = 31 * result + resultType.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return asSummaryString();
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java
deleted file mode 100644
index 055ed10..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.expressions;
-
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionVisitor;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
-
-/**
- * Special reference which represent a local filed, such as aggregate buffers or constants.
- * We are stored as class members, so the field can be referenced directly.
- * We should use an unique name to locate the field.
- *
- * <p>See {@link org.apache.flink.table.planner.codegen.ExprCodeGenerator#visitLocalRef}.
- */
-public class ResolvedAggLocalReference implements ResolvedExpression {
-
-	private final String fieldTerm;
-	private final String nullTerm;
-	private final LogicalType resultType;
-
-	public ResolvedAggLocalReference(String fieldTerm, String nullTerm, LogicalType resultType) {
-		this.fieldTerm = fieldTerm;
-		this.nullTerm = nullTerm;
-		this.resultType = resultType;
-	}
-
-	public String getFieldTerm() {
-		return fieldTerm;
-	}
-
-	public String getNullTerm() {
-		return nullTerm;
-	}
-
-	public LogicalType getResultType() {
-		return resultType;
-	}
-
-	@Override
-	public DataType getOutputDataType() {
-		return fromLogicalTypeToDataType(resultType);
-	}
-
-	@Override
-	public List<ResolvedExpression> getResolvedChildren() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public List<Expression> getChildren() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public String asSummaryString() {
-		return fieldTerm;
-	}
-
-	@Override
-	public <R> R accept(ExpressionVisitor<R> visitor) {
-		return visitor.visit(this);
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ResolvedAggLocalReference that = (ResolvedAggLocalReference) o;
-
-		return fieldTerm.equals(that.fieldTerm) && nullTerm.equals(that.nullTerm) && resultType.equals(that.resultType);
-	}
-
-	@Override
-	public int hashCode() {
-		int result = fieldTerm.hashCode();
-		result = 31 * result + nullTerm.hashCode();
-		result = 31 * result + resultType.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return asSummaryString();
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java
deleted file mode 100644
index d55905f..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.expressions;
-
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionVisitor;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
-
-/**
- * Resolved distinct key reference.
- */
-public class ResolvedDistinctKeyReference implements ResolvedExpression {
-
-	private final String name;
-	private final LogicalType resultType;
-
-	public ResolvedDistinctKeyReference(String name, LogicalType resultType) {
-		this.name = Preconditions.checkNotNull(name);
-		this.resultType = resultType;
-	}
-
-	public String getName() {
-		return name;
-	}
-
-	public LogicalType getResultType() {
-		return resultType;
-	}
-
-	@Override
-	public DataType getOutputDataType() {
-		return fromLogicalTypeToDataType(resultType);
-	}
-
-	@Override
-	public List<ResolvedExpression> getResolvedChildren() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public List<Expression> getChildren() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public String asSummaryString() {
-		return name;
-	}
-
-	@Override
-	public <R> R accept(ExpressionVisitor<R> visitor) {
-		return visitor.visit(this);
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ResolvedDistinctKeyReference that = (ResolvedDistinctKeyReference) o;
-
-		return name.equals(that.name) && resultType.equals(that.resultType);
-	}
-
-	@Override
-	public int hashCode() {
-		int result = name.hashCode();
-		result = 31 * result + resultType.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return asSummaryString();
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
index 731d550..2e3159d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
@@ -25,8 +25,8 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.ExpressionVisitor;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.ResolvedExpressionVisitor;
 import org.apache.flink.table.expressions.TableReferenceExpression;
 import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.expressions.TimePointUnit;
@@ -46,8 +46,7 @@ import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.calcite.RexAggLocalVariable;
-import org.apache.flink.table.planner.calcite.RexDistinctKeyVariable;
+import org.apache.flink.table.planner.calcite.RexFieldVariable;
 import org.apache.flink.table.planner.functions.InternalFunctionDefinitions;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
 import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction;
@@ -70,7 +69,6 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexFieldCollation;
-import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexSubQuery;
 import org.apache.calcite.rex.RexWindowBound;
@@ -118,9 +116,6 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoT
 
 /**
  * Visit expression to generator {@link RexNode}.
- *
- * <p>TODO remove blink expressions(like {@link ResolvedAggInputReference}) and use
- * {@link ResolvedExpressionVisitor}.
  */
 public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 
@@ -875,43 +870,19 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 
 	@Override
 	public RexNode visit(Expression other) {
-		if (other instanceof ResolvedAggInputReference) {
-			return visitResolvedAggInputReference((ResolvedAggInputReference) other);
-		} else if (other instanceof ResolvedAggLocalReference) {
-			return visitResolvedAggLocalReference((ResolvedAggLocalReference) other);
-		} else if (other instanceof ResolvedDistinctKeyReference) {
-			return visitResolvedDistinctKeyReference((ResolvedDistinctKeyReference) other);
-		} else if (other instanceof RexNodeExpression) {
+		if (other instanceof RexNodeExpression) {
 			return ((RexNodeExpression) other).getRexNode();
+		} else if (other instanceof LocalReferenceExpression) {
+			LocalReferenceExpression local = (LocalReferenceExpression) other;
+			return new RexFieldVariable(
+				local.getName(),
+				typeFactory.createFieldTypeFromLogicalType(
+					fromDataTypeToLogicalType(local.getOutputDataType())));
 		} else {
 			throw new UnsupportedOperationException(other.getClass().getSimpleName() + ":" + other.toString());
 		}
 	}
 
-	private RexNode visitResolvedAggInputReference(ResolvedAggInputReference reference) {
-		// using index to resolve field directly, name used in toString only
-		return new RexInputRef(
-				reference.getIndex(),
-				typeFactory.createFieldTypeFromLogicalType(reference.getResultType()));
-	}
-
-	private RexNode visitResolvedAggLocalReference(ResolvedAggLocalReference reference) {
-		LogicalType type = reference.getResultType();
-		return new RexAggLocalVariable(
-				reference.getFieldTerm(),
-				reference.getNullTerm(),
-				typeFactory.createFieldTypeFromLogicalType(type),
-				type);
-	}
-
-	private RexNode visitResolvedDistinctKeyReference(ResolvedDistinctKeyReference reference) {
-		LogicalType type = reference.getResultType();
-		return new RexDistinctKeyVariable(
-				reference.getName(),
-				typeFactory.createFieldTypeFromLogicalType(type),
-				type);
-	}
-
 	private RexNode createCollation(RexNode node, RelFieldCollation.Direction direction,
 			RelFieldCollation.NullDirection nullDirection, Set<SqlKind> kinds) {
 		switch (node.getKind()) {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLocalRef.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLocalRef.scala
index 74fe7d0..9e8f88e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLocalRef.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLocalRef.scala
@@ -17,33 +17,54 @@
  */
 package org.apache.flink.table.planner.calcite
 
+import org.apache.flink.table.planner.codegen.ExprCodeGenerator
 import org.apache.flink.table.types.logical.LogicalType
 
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.RexLocalRef
+import org.apache.calcite.rex.{RexBiVisitor, RexVariable, RexVisitor}
 
 /**
-  * Special reference which represent a local filed, such as aggregate buffers or constants.
+  * Special reference which represents a local field, such as aggregate buffers or constants.
   * We are stored as class members, so the field can be referenced directly.
   * We should use an unique name to locate the field.
-  *
-  * See [[org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLocalRef()]]
   */
-case class RexAggLocalVariable(
+case class RexFieldVariable(
     fieldTerm: String,
-    nullTerm: String,
-    dataType: RelDataType,
-    internalType: LogicalType)
-  extends RexLocalRef(0, dataType)
+    dataType: RelDataType) extends RexVariable(fieldTerm, dataType) {
+  
+  override def accept[R](visitor: RexVisitor[R]): R = {
+    visitor match {
+      case gen: ExprCodeGenerator =>
+        gen.visitRexFieldVariable(this).asInstanceOf[R]
+      case _ =>
+        throw new RuntimeException("Not support visitor: " + visitor)
+    }
+  }
+  
+  override def accept[R, P](visitor: RexBiVisitor[R, P], arg: P): R = {
+    throw new RuntimeException("Not support visitor: " + visitor)
+  }
+}
 
 /**
   * Special reference which represent a distinct key input filed,
   * We use the name to locate the distinct key field.
-  *
-  * See [[org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLocalRef()]]
   */
 case class RexDistinctKeyVariable(
     keyTerm: String,
     dataType: RelDataType,
-    internalType: LogicalType)
-  extends RexLocalRef(0, dataType)
+    internalType: LogicalType) extends RexVariable(keyTerm, dataType) {
+
+  override def accept[R](visitor: RexVisitor[R]): R = {
+    visitor match {
+      case gen: ExprCodeGenerator =>
+        gen.visitDistinctKeyVariable(this).asInstanceOf[R]
+      case _ =>
+        throw new RuntimeException("Not support visitor: " + visitor)
+    }
+  }
+
+  override def accept[R, P](visitor: RexBiVisitor[R, P], arg: P): R = {
+    throw new RuntimeException("Not support visitor: " + visitor)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 3a05fe5..34796eb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.dataformat.DataFormatConverters.{DataFormatConverter, getConverterForDataType}
 import org.apache.flink.table.dataformat._
-import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexAggLocalVariable, RexDistinctKeyVariable}
+import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexDistinctKeyVariable, RexFieldVariable}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{requireTemporal, requireTimeInterval, _}
 import org.apache.flink.table.planner.codegen.GenerateUtils._
 import org.apache.flink.table.planner.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
@@ -396,10 +396,18 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
     GeneratedExpression(input1Term, NEVER_NULL, NO_CODE, input1Type)
   }
 
-  override def visitLocalRef(localRef: RexLocalRef): GeneratedExpression = localRef match {
-    case local: RexAggLocalVariable =>
-      GeneratedExpression(local.fieldTerm, local.nullTerm, NO_CODE, local.internalType)
-    case value: RexDistinctKeyVariable =>
+  override def visitLocalRef(localRef: RexLocalRef): GeneratedExpression =
+    throw new CodeGenException("RexLocalRef are not supported yet.")
+
+  def visitRexFieldVariable(variable: RexFieldVariable): GeneratedExpression = {
+      val internalType = FlinkTypeFactory.toLogicalType(variable.dataType)
+      val nullTerm = variable.fieldTerm + "IsNull" // not use newName, keep isNull unique.
+      ctx.addReusableMember(s"${primitiveTypeTermForType(internalType)} ${variable.fieldTerm};")
+      ctx.addReusableMember(s"boolean $nullTerm;")
+      GeneratedExpression(variable.fieldTerm, nullTerm, NO_CODE, internalType)
+  }
+
+  def visitDistinctKeyVariable(value: RexDistinctKeyVariable): GeneratedExpression = {
       val inputExpr = ctx.getReusableInputUnboxingExprs(input1Term, 0) match {
         case Some(expr) => expr
         case None =>
@@ -422,7 +430,6 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       }
       // hide the generated code as it will be executed only once
       GeneratedExpression(inputExpr.resultTerm, inputExpr.nullTerm, NO_CODE, inputExpr.resultType)
-    case _ => throw new CodeGenException("Local variables are not supported yet.")
   }
 
   override def visitRangeRef(rangeRef: RexRangeRef): GeneratedExpression =
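
The replacement for visitLocalRef splits into two explicit entry points: visitRexFieldVariable declares the field term plus a derived "<fieldTerm>IsNull" flag once as reusable members of the generated class, while visitDistinctKeyVariable reuses the distinct key input unboxing. A rough sketch of how a caller obtains such an expression; typeFactory and exprCodegen are assumed codegen context objects, and the buffer name and type are illustrative, not taken from the patch.

    import org.apache.flink.table.planner.calcite.RexFieldVariable
    import org.apache.flink.table.types.logical.BigIntType

    // access a local agg buffer that is stored as a class member of the generated function
    val fieldType = typeFactory.createFieldTypeFromLogicalType(new BigIntType())
    val expr = exprCodegen.generateExpression(RexFieldVariable("agg0_count", fieldType))
    // expr.resultTerm == "agg0_count", expr.nullTerm == "agg0_countIsNull";
    // both are registered exactly once through ctx.addReusableMember(...)
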
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
index 4b05525..9ab0b3b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -21,22 +21,23 @@ import org.apache.flink.table.api.TableException
 import org.apache.flink.table.dataformat.GenericRow
 import org.apache.flink.table.dataformat.util.BaseRowUtil
 import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.UserDefinedAggregateFunction
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{BASE_ROW, _}
 import org.apache.flink.table.planner.codegen.Indenter.toISC
 import org.apache.flink.table.planner.codegen._
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator._
 import org.apache.flink.table.planner.dataview.{DataViewSpec, ListViewSpec, MapViewSpec}
-import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowProperty, PlannerWindowStart, ResolvedAggInputReference}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
+import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowProperty, PlannerWindowStart}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList
 import org.apache.flink.table.runtime.dataview.{StateListView, StateMapView}
-import org.apache.flink.table.runtime.generated.{AggsHandleFunction, TableAggsHandleFunction, GeneratedAggsHandleFunction, GeneratedNamespaceAggsHandleFunction, GeneratedNamespaceTableAggsHandleFunction, GeneratedTableAggsHandleFunction, NamespaceAggsHandleFunction, NamespaceTableAggsHandleFunction}
-import org.apache.flink.table.runtime.types.PlannerTypeUtils
+import org.apache.flink.table.runtime.generated.{AggsHandleFunction, GeneratedAggsHandleFunction, GeneratedNamespaceAggsHandleFunction, GeneratedNamespaceTableAggsHandleFunction, GeneratedTableAggsHandleFunction, NamespaceAggsHandleFunction, NamespaceTableAggsHandleFunction, TableAggsHandleFunction}
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import org.apache.flink.table.runtime.types.PlannerTypeUtils
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.{BooleanType, IntType, LogicalType, RowType}
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
-import org.apache.flink.table.functions.UserDefinedAggregateFunction
 import org.apache.flink.util.Collector
 
 import org.apache.calcite.rex.RexLiteral
@@ -57,6 +58,7 @@ class AggsHandlerCodeGenerator(
   private val inputType = RowType.of(inputFieldTypes: _*)
 
   /** constant expressions that act like a second input in the parameter indices. */
+  private var constants: Seq[RexLiteral] = Seq()
   private var constantExprs: Seq[GeneratedExpression] = Seq()
 
   /** window properties like window_start and window_end, only used in window aggregates */
@@ -133,6 +135,7 @@ class AggsHandlerCodeGenerator(
     */
   def withConstants(literals: Seq[RexLiteral]): AggsHandlerCodeGenerator = {
     // create constants
+    this.constants = literals
     val exprGenerator = new ExprCodeGenerator(ctx, INPUT_NOT_NULL)
     val exprs = literals.map(exprGenerator.generateExpression)
     this.constantExprs = exprs.map(ctx.addReusableConstant(_, nullCheck = true))
@@ -222,7 +225,7 @@ class AggsHandlerCodeGenerator(
             aggBufferOffset,
             aggBufferSize,
             inputFieldTypes,
-            constantExprs,
+            constants,
             relBuilder)
         case _: UserDefinedAggregateFunction[_, _] =>
           new ImperativeAggCodeGen(
@@ -303,7 +306,7 @@ class AggsHandlerCodeGenerator(
         throw new TableException(s"filter arg must be boolean, but is $filterType, " +
             s"the aggregate is $aggName.")
       }
-      Some(new ResolvedAggInputReference(name, filterArg, inputFieldTypes(filterArg)))
+      Some(toRexInputRef(relBuilder, filterArg, inputFieldTypes(filterArg)))
     } else {
       None
     }
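
withConstants now also keeps the raw RexLiterals (this.constants) and hands them to DeclarativeAggCodeGen in the next file, which wraps a constant argument on demand instead of reusing a pre-generated expression. A hedged sketch of that wrapping, as it appears further down in toAccInputExpr:

    import org.apache.calcite.rex.RexLiteral
    import org.apache.flink.table.planner.calcite.FlinkTypeFactory
    import org.apache.flink.table.planner.expressions.RexNodeExpression
    import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType

    // wrap a constant aggregate argument as a resolved expression backed by its RexLiteral
    def constantToExpr(constant: RexLiteral): RexNodeExpression =
      new RexNodeExpression(
        constant,
        fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(constant.getType)))
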
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
index a175f64..afa4e5b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
@@ -18,15 +18,17 @@
 package org.apache.flink.table.planner.codegen.agg
 
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.planner.codegen.CodeGenUtils.primitiveTypeTermForType
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.DISTINCT_KEY_TERM
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, GeneratedExpression}
-import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, ResolvedAggInputReference, ResolvedAggLocalReference, ResolvedDistinctKeyReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.{toRexDistinctKey, toRexInputRef}
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter, RexNodeExpression}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.plan.utils.AggregateInfo
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.{fromDataTypeToLogicalType, fromLogicalTypeToDataType}
 import org.apache.flink.table.types.logical.LogicalType
 
+import org.apache.calcite.rex.RexLiteral
 import org.apache.calcite.tools.RelBuilder
 
 /**
@@ -41,7 +43,7 @@ import org.apache.calcite.tools.RelBuilder
   * @param aggBufferOffset  the offset in the buffers of this aggregate
   * @param aggBufferSize  the total size of aggregate buffers
   * @param inputTypes   the input field type infos
-  * @param constantExprs  the constant expressions
+  * @param constants  the constant literals
   * @param relBuilder  the rel builder to translate expressions to calcite rex nodes
   */
 class DeclarativeAggCodeGen(
@@ -52,7 +54,7 @@ class DeclarativeAggCodeGen(
     aggBufferOffset: Int,
     aggBufferSize: Int,
     inputTypes: Seq[LogicalType],
-    constantExprs: Seq[GeneratedExpression],
+    constants: Seq[RexLiteral],
     relBuilder: RelBuilder)
   extends AggCodeGen {
 
@@ -62,14 +64,21 @@ class DeclarativeAggCodeGen(
   private val bufferIndexes = Array.range(aggBufferOffset, aggBufferOffset + bufferTypes.length)
   private val bufferTerms = function.aggBufferAttributes
       .map(a => s"agg${aggInfo.aggIndex}_${a.getName}")
-  private val bufferNullTerms = bufferTerms.map(_ + "_isNull")
+
+  private val rexNodeGen = new RexNodeConverter(relBuilder)
+
+  private val bufferNullTerms = {
+    val exprCodegen = new ExprCodeGenerator(ctx, false)
+    bufferTerms.zip(bufferTypes).map {
+      case (name, t) => new LocalReferenceExpression(name, fromLogicalTypeToDataType(t))
+    }.map(_.accept(rexNodeGen)).map(exprCodegen.generateExpression).map(_.nullTerm)
+  }
 
   private val argIndexes = aggInfo.argIndexes
   private val argTypes = {
-    val types = inputTypes ++ constantExprs.map(_.resultType)
+    val types = inputTypes ++ constants.map(t => FlinkTypeFactory.toLogicalType(t.getType))
     argIndexes.map(types(_))
   }
-  private val rexNodeGen = new RexNodeConverter(relBuilder)
 
   def createAccumulator(generator: ExprCodeGenerator): Seq[GeneratedExpression] = {
     function.initialValuesExpressions
@@ -79,21 +88,15 @@ class DeclarativeAggCodeGen(
   def setAccumulator(generator: ExprCodeGenerator): String = {
     val aggBufferAccesses = function.aggBufferAttributes.zipWithIndex
       .map { case (attr, index) =>
-        new ResolvedAggInputReference(
-          attr.getName, bufferIndexes(index), bufferTypes(index))
+        toRexInputRef(relBuilder, bufferIndexes(index), bufferTypes(index))
       }
       .map(expr => generator.generateExpression(expr.accept(rexNodeGen)))
 
     val setters = aggBufferAccesses.zipWithIndex.map {
       case (access, index) =>
-        val typeTerm = primitiveTypeTermForType(access.resultType)
-        val memberName = bufferTerms(index)
-        val memberNullTerm = bufferNullTerms(index)
-        ctx.addReusableMember(s"private $typeTerm $memberName;")
-        ctx.addReusableMember(s"private boolean $memberNullTerm;")
         s"""
-           |${access.copyResultTermToTargetIfChanged(ctx, memberName)};
-           |$memberNullTerm = ${access.nullTerm};
+           |${access.copyResultTermToTargetIfChanged(ctx, bufferTerms(index))};
+           |${bufferNullTerms(index)} = ${access.nullTerm};
          """.stripMargin
     }
 
@@ -218,8 +221,8 @@ class DeclarativeAggCodeGen(
 
     override def toMergeInputExpr(name: String, localIndex: Int): ResolvedExpression = {
       // in merge case, the input1 is mergedAcc
-      new ResolvedAggInputReference(
-        name,
+      toRexInputRef(
+        relBuilder,
         mergedAccOffset + bufferIndexes(localIndex),
         bufferTypes(localIndex))
     }
@@ -228,37 +231,31 @@ class DeclarativeAggCodeGen(
       val inputIndex = argIndexes(localIndex)
       if (inputIndex >= inputTypes.length) { // it is a constant
         val constantIndex = inputIndex - inputTypes.length
-        val constantTerm = constantExprs(constantIndex).resultTerm
-        val nullTerm = constantExprs(constantIndex).nullTerm
-        val constantType = constantExprs(constantIndex).resultType
-        // constant is reused as member variable
-        new ResolvedAggLocalReference(
-          constantTerm,
-          nullTerm,
-          constantType)
+        val constant = constants(constantIndex)
+        new RexNodeExpression(constant,
+          fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(constant.getType)))
       } else { // it is a input field
         if (isDistinctMerge) { // this is called from distinct merge
           if (function.operandCount == 1) {
             // the distinct key is a BoxedValue
-            new ResolvedDistinctKeyReference(name, argTypes(localIndex))
+            val t = argTypes(localIndex)
+            toRexDistinctKey(relBuilder, name, t)
           } else {
             // the distinct key is a BaseRow
-            new ResolvedAggInputReference(name, localIndex, argTypes(localIndex))
+            toRexInputRef(relBuilder, localIndex, argTypes(localIndex))
           }
         } else {
           // the input is the inputRow
-          new ResolvedAggInputReference(
-            name, argIndexes(localIndex), argTypes(localIndex))
+          toRexInputRef(relBuilder, argIndexes(localIndex), argTypes(localIndex))
         }
       }
     }
 
     override def toAggBufferExpr(name: String, localIndex: Int): ResolvedExpression = {
-        // it is a agg buffer.
-        val name = bufferTerms(localIndex)
-        val nullTerm = bufferNullTerms(localIndex)
-        // buffer access is reused as member variable
-        new ResolvedAggLocalReference(name, nullTerm, bufferTypes(localIndex))
+      // name => agg${aggInfo.aggIndex}_$name"
+      new LocalReferenceExpression(
+        bufferTerms(localIndex),
+        fromLogicalTypeToDataType(bufferTypes(localIndex)))
     }
   }
 
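toAggBufferExpr now returns a plain LocalReferenceExpression named "agg<aggIndex>_<attrName>", and the matching null terms are no longer hand-built "_isNull" strings; they fall out of the code generator when each local is first visited. A minimal sketch of that derivation for one buffer attribute, assuming ctx, relBuilder, a buffer term and its LogicalType are in scope; the method name bufferNullTerm is illustrative.

    import org.apache.calcite.tools.RelBuilder
    import org.apache.flink.table.expressions.LocalReferenceExpression
    import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator}
    import org.apache.flink.table.planner.expressions.RexNodeConverter
    import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
    import org.apache.flink.table.types.logical.LogicalType

    def bufferNullTerm(
        ctx: CodeGeneratorContext,
        relBuilder: RelBuilder,
        bufferTerm: String,       // e.g. "agg0_count"
        bufferType: LogicalType): String = {
      val local = new LocalReferenceExpression(bufferTerm, fromLogicalTypeToDataType(bufferType))
      val expr = new ExprCodeGenerator(ctx, false)
        .generateExpression(local.accept(new RexNodeConverter(relBuilder)))
      expr.nullTerm  // "agg0_countIsNull", declared once as a reusable member
    }
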
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
index 87f91c9..e5d0ea4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
@@ -25,7 +25,8 @@ import org.apache.flink.table.planner.codegen.GenerateUtils.generateFieldAccess
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator._
 import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGeneratorContext, ExprCodeGenerator, GeneratedExpression}
 import org.apache.flink.table.planner.dataview.DataViewSpec
-import org.apache.flink.table.planner.expressions.{ResolvedAggInputReference, ResolvedDistinctKeyReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter}
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{getAggFunctionUDIMethod, getAggUserDefinedInputTypes, getUserDefinedMethod, internalTypesToClasses, signatureToString}
 import org.apache.flink.table.planner.plan.utils.AggregateInfo
 import org.apache.flink.table.planner.utils.SingleElementIterator
@@ -282,14 +283,15 @@ class ImperativeAggCodeGen(
         val inputRef = if (generator.input1Term.startsWith(DISTINCT_KEY_TERM)) {
           if (argTypes.length == 1) {
             // called from distinct merge and the inputTerm is the only argument
-            new ResolvedDistinctKeyReference(generator.input1Term, inputTypes(f))
+            DeclarativeExpressionResolver.toRexDistinctKey(
+              relBuilder, generator.input1Term, inputTypes(f))
           } else {
             // called from distinct merge call and the inputTerm is BaseRow type
-            new ResolvedAggInputReference(f.toString, index, inputTypes(f))
+            toRexInputRef(relBuilder, index, inputTypes(f))
           }
         } else {
           // called from accumulate
-          new ResolvedAggInputReference(f.toString, f, inputTypes(f))
+          toRexInputRef(relBuilder, f, inputTypes(f))
         }
         var inputExpr = generator.generateExpression(inputRef.accept(rexNodeGen))
         if (inputFieldCopy) inputExpr = inputExpr.deepCopy(ctx)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
index d235f91..c02ae98 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
@@ -27,13 +27,14 @@ import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction}
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
 import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.STREAM_RECORD
 import org.apache.flink.table.planner.codegen._
-import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, ResolvedAggInputReference, ResolvedAggLocalReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{getAccumulatorTypeOfAggregateFunction, getAggUserDefinedInputTypes}
 import org.apache.flink.table.runtime.context.ExecutionContextImpl
 import org.apache.flink.table.runtime.generated.{GeneratedAggsHandleFunction, GeneratedOperator}
 import org.apache.flink.table.runtime.types.InternalSerializers
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.{fromDataTypeToLogicalType, fromLogicalTypeToDataType}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.{LogicalType, RowType}
@@ -234,14 +235,8 @@ object AggCodeGenHelper {
     auxGroupingMapping ++ aggCallMapping
   }
 
-  def newLocalReference(
-      ctx: CodeGeneratorContext,
-      resultTerm: String,
-      resultType: LogicalType): ResolvedAggLocalReference = {
-    val nullTerm = resultTerm + "IsNull"
-    ctx.addReusableMember(s"${primitiveTypeTermForType(resultType)} $resultTerm;")
-    ctx.addReusableMember(s"boolean $nullTerm;")
-    new ResolvedAggLocalReference(resultTerm, nullTerm, resultType)
+  def newLocalReference(resultTerm: String, resultType: LogicalType): LocalReferenceExpression = {
+    new LocalReferenceExpression(resultTerm, fromLogicalTypeToDataType(resultType))
   }
 
   /**
@@ -261,18 +256,17 @@ object AggCodeGenHelper {
 
     override def toMergeInputExpr(name: String, localIndex: Int): ResolvedExpression = {
       val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
-      new ResolvedAggInputReference(name, inputIndex, inputType)
+      toRexInputRef(relBuilder, inputIndex, inputType)
     }
 
     override def toAccInputExpr(name: String, localIndex: Int): ResolvedExpression = {
       val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
-      new ResolvedAggInputReference(name, inputIndex, inputType)
+      toRexInputRef(relBuilder, inputIndex, inputType)
     }
 
     override def toAggBufferExpr(name: String, localIndex: Int): ResolvedExpression = {
       val variableName = s"agg${aggIndex}_$name"
-      newLocalReference(
-        ctx, variableName, aggBufferTypes(aggIndex)(localIndex))
+      newLocalReference(variableName, aggBufferTypes(aggIndex)(localIndex))
     }
   }
 
@@ -292,7 +286,7 @@ object AggCodeGenHelper {
     val converter = new RexNodeConverter(builder)
 
     val accessAuxGroupingExprs = auxGrouping.indices.map {
-      idx => newLocalReference(ctx, aggBufferNames(idx)(0), aggBufferTypes(idx)(0))
+      idx => newLocalReference(aggBufferNames(idx)(0), aggBufferTypes(idx)(0))
     }.map(_.accept(converter)).map(exprCodegen.generateExpression)
 
     val aggCallExprs = aggregates.zipWithIndex.flatMap {
@@ -303,7 +297,7 @@ object AggCodeGenHelper {
       case (_: AggregateFunction[_, _], aggIndex: Int) =>
         val idx = auxGrouping.length + aggIndex
         val variableName = aggBufferNames(idx)(0)
-        Some(newLocalReference(ctx, variableName, aggBufferTypes(idx)(0)))
+        Some(newLocalReference(variableName, aggBufferTypes(idx)(0)))
     }.map(_.accept(converter)).map(exprCodegen.generateExpression)
 
     accessAuxGroupingExprs ++ aggCallExprs
@@ -552,7 +546,7 @@ object AggCodeGenHelper {
       // UserDefinedAggregateFunction
       case ((agg: AggregateFunction[_, _], aggIndex: Int), aggBufVar) =>
         val (inputIndex, inputType) = argsMapping(aggIndex)(0)
-        val inputRef = new ResolvedAggInputReference(inputTerm, inputIndex, inputType)
+        val inputRef = toRexInputRef(builder, inputIndex, inputType)
         val inputExpr = exprCodegen.generateExpression(
           inputRef.accept(new RexNodeConverter(builder)))
         val singleIterableClass = classOf[SingleElementIterator[_]].getCanonicalName
@@ -621,7 +615,7 @@ object AggCodeGenHelper {
 
         val inputExprs = inFields.map {
           f =>
-            val inputRef = new ResolvedAggInputReference(inputTerm, f._1, f._2)
+            val inputRef = toRexInputRef(builder, f._1, f._2)
             exprCodegen.generateExpression(inputRef.accept(new RexNodeConverter(builder)))
         }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index dec5e56..8cdaf05 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -27,7 +27,8 @@ import org.apache.flink.table.planner.codegen.CodeGenUtils.{binaryRowFieldSetAcc
 import org.apache.flink.table.planner.codegen._
 import org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper.buildAggregateArgsMapping
 import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator
-import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, ResolvedAggInputReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.plan.utils.SortUtil
 import org.apache.flink.table.runtime.generated.{NormalizedKeyComputer, RecordComparator}
@@ -286,7 +287,7 @@ object HashAggCodeGenHelper {
       val bindRefOffset = inputType.getFieldCount
       val getAuxGroupingExprs = auxGrouping.indices.map { idx =>
         val (_, resultType) = aggBuffMapping(idx)(0)
-        new ResolvedAggInputReference("aux_group", bindRefOffset + idx, resultType)
+        toRexInputRef(builder, bindRefOffset + idx, resultType)
       }.map(_.accept(new RexNodeConverter(builder))).map(exprCodegen.generateExpression)
 
       val getAggValueExprs = aggregates.zipWithIndex.map {
@@ -338,17 +339,17 @@ object HashAggCodeGenHelper {
 
     override def toMergeInputExpr(name: String, localIndex: Int): ResolvedExpression = {
       val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
-      new ResolvedAggInputReference(name, inputIndex, inputType)
+      toRexInputRef(relBuilder, inputIndex, inputType)
     }
 
     override def toAccInputExpr(name: String, localIndex: Int): ResolvedExpression = {
       val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
-      new ResolvedAggInputReference(name, inputIndex, inputType)
+      toRexInputRef(relBuilder, inputIndex, inputType)
     }
 
     override def toAggBufferExpr(name: String, localIndex: Int): ResolvedExpression = {
       val (aggBuffAttrIndex, aggBuffAttrType) = aggBuffMapping(aggIndex)(localIndex)
-      new ResolvedAggInputReference(name, offset + aggBuffAttrIndex, aggBuffAttrType)
+      toRexInputRef(relBuilder, offset + aggBuffAttrIndex, aggBuffAttrType)
     }
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index b6b6905..6ce17a2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -810,9 +810,8 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
     )
   }
 
-  override def visit(localReference: LocalReferenceExpression): PlannerExpression =
-    throw new TableException(
-      "Local reference should be handled individually by a call: " + localReference)
+  override def visit(local: LocalReferenceExpression): PlannerExpression =
+    PlannerLocalReference(local.getName, fromDataTypeToTypeInfo(local.getOutputDataType))
 
   override def visit(lookupCall: LookupCallExpression): PlannerExpression =
     throw new TableException("Unsupported function call: " + lookupCall)
@@ -821,15 +820,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
     other match {
       // already converted planner expressions will pass this visitor without modification
       case plannerExpression: PlannerExpression => plannerExpression
-      case aggInput: ResolvedAggInputReference => PlannerResolvedAggInputReference(
-        aggInput.getName, aggInput.getIndex, fromDataTypeToTypeInfo(aggInput.getOutputDataType))
-      case aggLocal: ResolvedAggLocalReference => PlannerResolvedAggLocalReference(
-        aggLocal.getFieldTerm,
-        aggLocal.getNullTerm,
-        fromDataTypeToTypeInfo(aggLocal.getOutputDataType))
-      case key: ResolvedDistinctKeyReference => PlannerResolvedDistinctKeyReference(
-        key.getName, fromDataTypeToTypeInfo(key.getOutputDataType))
-
+      case expr: RexNodeExpression => RexPlannerExpression(expr.getRexNode)
       case _ =>
         throw new TableException("Unrecognized expression: " + other)
     }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
index 2a3c621..1a0f5b3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.expressions
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api._
-import org.apache.flink.table.expressions.ResolvedFieldReference
 import org.apache.flink.table.operations.QueryOperation
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
@@ -231,55 +230,18 @@ case class StreamRecordTimestamp() extends LeafExpression {
 }
 
 /**
-  * Normally we should use [[ResolvedFieldReference]] to represent an input field.
-  * [[ResolvedFieldReference]] uses name to locate the field, in aggregate case, we want to use
-  * field index.
-  */
-case class PlannerResolvedAggInputReference(
-    name: String,
-    index: Int,
-    resultType: TypeInformation[_]) extends Attribute {
-
-  override def toString = s"'$name"
-
-  override private[flink] def withName(newName: String): Attribute = {
-    if (newName == name) this
-    else PlannerResolvedAggInputReference(newName, index, resultType)
-  }
-}
-
-/**
-  * Special reference which represent a local filed, such as aggregate buffers or constants.
+  * Special reference which represents a local field, such as aggregate buffers or constants.
   * We are stored as class members, so the field can be referenced directly.
   * We should use an unique name to locate the field.
   */
-case class PlannerResolvedAggLocalReference(
+case class PlannerLocalReference(
     name: String,
-    nullTerm: String,
-    resultType: TypeInformation[_])
-    extends Attribute {
-
-  override def toString = s"'$name"
-
-  override private[flink] def withName(newName: String): Attribute = {
-    if (newName == name) this
-    else PlannerResolvedAggLocalReference(newName, nullTerm, resultType)
-  }
-}
-
-/**
-  * Special reference which represent a distinct key input filed,
-  * [[ResolvedDistinctKeyReference]] uses name to locate the field.
-  */
-case class PlannerResolvedDistinctKeyReference(
-    name: String,
-    resultType: TypeInformation[_])
-    extends Attribute {
+    resultType: TypeInformation[_]) extends Attribute {
 
   override def toString = s"'$name"
 
   override private[flink] def withName(newName: String): Attribute = {
     if (newName == name) this
-    else PlannerResolvedDistinctKeyReference(newName, resultType)
+    else PlannerLocalReference(newName, resultType)
   }
 }