You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/29 06:43:42 UTC
[flink] 08/08: [FLINK-13774][table-planner-blink] Use LocalReferenceExpression and RexNodeExpression instead of blink expressions
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e263856e37627361feec578ec67841de70f415b6
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Aug 28 14:19:14 2019 +0800
[FLINK-13774][table-planner-blink] Use LocalReferenceExpression and RexNodeExpression instead of blink expressions
This closes #9484
---
.../expressions/DeclarativeExpressionResolver.java | 23 +++++
.../expressions/ResolvedAggInputReference.java | 113 --------------------
.../expressions/ResolvedAggLocalReference.java | 114 ---------------------
.../expressions/ResolvedDistinctKeyReference.java | 104 -------------------
.../planner/expressions/RexNodeConverter.java | 47 ++-------
.../table/planner/calcite/FlinkLocalRef.scala | 47 ++++++---
.../table/planner/codegen/ExprCodeGenerator.scala | 19 ++--
.../codegen/agg/AggsHandlerCodeGenerator.scala | 15 +--
.../codegen/agg/DeclarativeAggCodeGen.scala | 69 ++++++-------
.../planner/codegen/agg/ImperativeAggCodeGen.scala | 10 +-
.../codegen/agg/batch/AggCodeGenHelper.scala | 30 +++---
.../codegen/agg/batch/HashAggCodeGenHelper.scala | 11 +-
.../expressions/PlannerExpressionConverter.scala | 15 +--
.../planner/expressions/fieldExpression.scala | 46 +--------
14 files changed, 152 insertions(+), 511 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java
index 6b1b429..3b6ad2d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java
@@ -23,13 +23,20 @@ import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.UnresolvedCallExpression;
import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.RexDistinctKeyVariable;
import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.lang3.ArrayUtils;
import java.util.stream.Collectors;
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
+
/**
* Abstract class to resolve the expressions in {@link DeclarativeAggregateFunction}.
*/
@@ -92,4 +99,20 @@ public abstract class DeclarativeExpressionResolver extends ExpressionDefaultVis
* For aggregate buffer.
*/
public abstract ResolvedExpression toAggBufferExpr(String name, int localIndex);
+
+ public static ResolvedExpression toRexInputRef(RelBuilder builder, int i, LogicalType t) {
+ RelDataType tp = ((FlinkTypeFactory) builder.getTypeFactory())
+ .createFieldTypeFromLogicalType(t);
+ return new RexNodeExpression(new RexInputRef(i, tp), fromLogicalTypeToDataType(t));
+ }
+
+ public static ResolvedExpression toRexDistinctKey(RelBuilder builder, String name, LogicalType t) {
+ return new RexNodeExpression(
+ new RexDistinctKeyVariable(
+ name,
+ ((FlinkTypeFactory) builder.getTypeFactory())
+ .createFieldTypeFromLogicalType(t),
+ t),
+ fromLogicalTypeToDataType(t));
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java
deleted file mode 100644
index 2ad8177..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggInputReference.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.expressions;
-
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionVisitor;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
-
-/**
- * Normally we should use {@link FieldReferenceExpression} to represent an input field.
- * {@link FieldReferenceExpression} uses name to locate the field, in aggregate case, we want to use
- * field index.
- */
-public class ResolvedAggInputReference implements ResolvedExpression {
-
- private final String name;
- private final int index;
- private final LogicalType resultType;
-
- public ResolvedAggInputReference(String name, int index, LogicalType resultType) {
- this.name = Preconditions.checkNotNull(name);
- this.index = index;
- this.resultType = resultType;
- }
-
- public String getName() {
- return name;
- }
-
- public int getIndex() {
- return index;
- }
-
- public LogicalType getResultType() {
- return resultType;
- }
-
- @Override
- public DataType getOutputDataType() {
- return fromLogicalTypeToDataType(resultType);
- }
-
- @Override
- public List<ResolvedExpression> getResolvedChildren() {
- return Collections.emptyList();
- }
-
- @Override
- public List<Expression> getChildren() {
- return Collections.emptyList();
- }
-
- @Override
- public String asSummaryString() {
- return name;
- }
-
- @Override
- public <R> R accept(ExpressionVisitor<R> visitor) {
- return visitor.visit(this);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- ResolvedAggInputReference that = (ResolvedAggInputReference) o;
- return index == that.index && name.equals(that.name) && resultType.equals(that.resultType);
- }
-
- @Override
- public int hashCode() {
- int result = name.hashCode();
- result = 31 * result + index;
- result = 31 * result + resultType.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return asSummaryString();
- }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java
deleted file mode 100644
index 055ed10..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedAggLocalReference.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.expressions;
-
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionVisitor;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
-
-/**
- * Special reference which represent a local filed, such as aggregate buffers or constants.
- * We are stored as class members, so the field can be referenced directly.
- * We should use an unique name to locate the field.
- *
- * <p>See {@link org.apache.flink.table.planner.codegen.ExprCodeGenerator#visitLocalRef}.
- */
-public class ResolvedAggLocalReference implements ResolvedExpression {
-
- private final String fieldTerm;
- private final String nullTerm;
- private final LogicalType resultType;
-
- public ResolvedAggLocalReference(String fieldTerm, String nullTerm, LogicalType resultType) {
- this.fieldTerm = fieldTerm;
- this.nullTerm = nullTerm;
- this.resultType = resultType;
- }
-
- public String getFieldTerm() {
- return fieldTerm;
- }
-
- public String getNullTerm() {
- return nullTerm;
- }
-
- public LogicalType getResultType() {
- return resultType;
- }
-
- @Override
- public DataType getOutputDataType() {
- return fromLogicalTypeToDataType(resultType);
- }
-
- @Override
- public List<ResolvedExpression> getResolvedChildren() {
- return Collections.emptyList();
- }
-
- @Override
- public List<Expression> getChildren() {
- return Collections.emptyList();
- }
-
- @Override
- public String asSummaryString() {
- return fieldTerm;
- }
-
- @Override
- public <R> R accept(ExpressionVisitor<R> visitor) {
- return visitor.visit(this);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- ResolvedAggLocalReference that = (ResolvedAggLocalReference) o;
-
- return fieldTerm.equals(that.fieldTerm) && nullTerm.equals(that.nullTerm) && resultType.equals(that.resultType);
- }
-
- @Override
- public int hashCode() {
- int result = fieldTerm.hashCode();
- result = 31 * result + nullTerm.hashCode();
- result = 31 * result + resultType.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return asSummaryString();
- }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java
deleted file mode 100644
index d55905f..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ResolvedDistinctKeyReference.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.expressions;
-
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionVisitor;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
-
-/**
- * Resolved distinct key reference.
- */
-public class ResolvedDistinctKeyReference implements ResolvedExpression {
-
- private final String name;
- private final LogicalType resultType;
-
- public ResolvedDistinctKeyReference(String name, LogicalType resultType) {
- this.name = Preconditions.checkNotNull(name);
- this.resultType = resultType;
- }
-
- public String getName() {
- return name;
- }
-
- public LogicalType getResultType() {
- return resultType;
- }
-
- @Override
- public DataType getOutputDataType() {
- return fromLogicalTypeToDataType(resultType);
- }
-
- @Override
- public List<ResolvedExpression> getResolvedChildren() {
- return Collections.emptyList();
- }
-
- @Override
- public List<Expression> getChildren() {
- return Collections.emptyList();
- }
-
- @Override
- public String asSummaryString() {
- return name;
- }
-
- @Override
- public <R> R accept(ExpressionVisitor<R> visitor) {
- return visitor.visit(this);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- ResolvedDistinctKeyReference that = (ResolvedDistinctKeyReference) o;
-
- return name.equals(that.name) && resultType.equals(that.resultType);
- }
-
- @Override
- public int hashCode() {
- int result = name.hashCode();
- result = 31 * result + resultType.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return asSummaryString();
- }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
index 731d550..2e3159d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
@@ -25,8 +25,8 @@ import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionUtils;
import org.apache.flink.table.expressions.ExpressionVisitor;
import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.ResolvedExpressionVisitor;
import org.apache.flink.table.expressions.TableReferenceExpression;
import org.apache.flink.table.expressions.TimeIntervalUnit;
import org.apache.flink.table.expressions.TimePointUnit;
@@ -46,8 +46,7 @@ import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.calcite.RexAggLocalVariable;
-import org.apache.flink.table.planner.calcite.RexDistinctKeyVariable;
+import org.apache.flink.table.planner.calcite.RexFieldVariable;
import org.apache.flink.table.planner.functions.InternalFunctionDefinitions;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction;
@@ -70,7 +69,6 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexFieldCollation;
-import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexSubQuery;
import org.apache.calcite.rex.RexWindowBound;
@@ -118,9 +116,6 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoT
/**
* Visit expression to generator {@link RexNode}.
- *
- * <p>TODO remove blink expressions(like {@link ResolvedAggInputReference}) and use
- * {@link ResolvedExpressionVisitor}.
*/
public class RexNodeConverter implements ExpressionVisitor<RexNode> {
@@ -875,43 +870,19 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
@Override
public RexNode visit(Expression other) {
- if (other instanceof ResolvedAggInputReference) {
- return visitResolvedAggInputReference((ResolvedAggInputReference) other);
- } else if (other instanceof ResolvedAggLocalReference) {
- return visitResolvedAggLocalReference((ResolvedAggLocalReference) other);
- } else if (other instanceof ResolvedDistinctKeyReference) {
- return visitResolvedDistinctKeyReference((ResolvedDistinctKeyReference) other);
- } else if (other instanceof RexNodeExpression) {
+ if (other instanceof RexNodeExpression) {
return ((RexNodeExpression) other).getRexNode();
+ } else if (other instanceof LocalReferenceExpression) {
+ LocalReferenceExpression local = (LocalReferenceExpression) other;
+ return new RexFieldVariable(
+ local.getName(),
+ typeFactory.createFieldTypeFromLogicalType(
+ fromDataTypeToLogicalType(local.getOutputDataType())));
} else {
throw new UnsupportedOperationException(other.getClass().getSimpleName() + ":" + other.toString());
}
}
- private RexNode visitResolvedAggInputReference(ResolvedAggInputReference reference) {
- // using index to resolve field directly, name used in toString only
- return new RexInputRef(
- reference.getIndex(),
- typeFactory.createFieldTypeFromLogicalType(reference.getResultType()));
- }
-
- private RexNode visitResolvedAggLocalReference(ResolvedAggLocalReference reference) {
- LogicalType type = reference.getResultType();
- return new RexAggLocalVariable(
- reference.getFieldTerm(),
- reference.getNullTerm(),
- typeFactory.createFieldTypeFromLogicalType(type),
- type);
- }
-
- private RexNode visitResolvedDistinctKeyReference(ResolvedDistinctKeyReference reference) {
- LogicalType type = reference.getResultType();
- return new RexDistinctKeyVariable(
- reference.getName(),
- typeFactory.createFieldTypeFromLogicalType(type),
- type);
- }
-
private RexNode createCollation(RexNode node, RelFieldCollation.Direction direction,
RelFieldCollation.NullDirection nullDirection, Set<SqlKind> kinds) {
switch (node.getKind()) {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLocalRef.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLocalRef.scala
index 74fe7d0..9e8f88e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLocalRef.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLocalRef.scala
@@ -17,33 +17,54 @@
*/
package org.apache.flink.table.planner.calcite
+import org.apache.flink.table.planner.codegen.ExprCodeGenerator
import org.apache.flink.table.types.logical.LogicalType
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.RexLocalRef
+import org.apache.calcite.rex.{RexBiVisitor, RexVariable, RexVisitor}
/**
- * Special reference which represent a local filed, such as aggregate buffers or constants.
+ * Special reference which represent a local field, such as aggregate buffers or constants.
* We are stored as class members, so the field can be referenced directly.
* We should use an unique name to locate the field.
- *
- * See [[org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLocalRef()]]
*/
-case class RexAggLocalVariable(
+case class RexFieldVariable(
fieldTerm: String,
- nullTerm: String,
- dataType: RelDataType,
- internalType: LogicalType)
- extends RexLocalRef(0, dataType)
+ dataType: RelDataType) extends RexVariable(fieldTerm, dataType) {
+
+ override def accept[R](visitor: RexVisitor[R]): R = {
+ visitor match {
+ case gen: ExprCodeGenerator =>
+ gen.visitRexFieldVariable(this).asInstanceOf[R]
+ case _ =>
+ throw new RuntimeException("Not support visitor: " + visitor)
+ }
+ }
+
+ override def accept[R, P](visitor: RexBiVisitor[R, P], arg: P): R = {
+ throw new RuntimeException("Not support visitor: " + visitor)
+ }
+}
/**
* Special reference which represent a distinct key input filed,
* We use the name to locate the distinct key field.
- *
- * See [[org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLocalRef()]]
*/
case class RexDistinctKeyVariable(
keyTerm: String,
dataType: RelDataType,
- internalType: LogicalType)
- extends RexLocalRef(0, dataType)
+ internalType: LogicalType) extends RexVariable(keyTerm, dataType) {
+
+ override def accept[R](visitor: RexVisitor[R]): R = {
+ visitor match {
+ case gen: ExprCodeGenerator =>
+ gen.visitDistinctKeyVariable(this).asInstanceOf[R]
+ case _ =>
+ throw new RuntimeException("Not support visitor: " + visitor)
+ }
+ }
+
+ override def accept[R, P](visitor: RexBiVisitor[R, P], arg: P): R = {
+ throw new RuntimeException("Not support visitor: " + visitor)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 3a05fe5..34796eb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.TableException
import org.apache.flink.table.dataformat.DataFormatConverters.{DataFormatConverter, getConverterForDataType}
import org.apache.flink.table.dataformat._
-import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexAggLocalVariable, RexDistinctKeyVariable}
+import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexDistinctKeyVariable, RexFieldVariable}
import org.apache.flink.table.planner.codegen.CodeGenUtils.{requireTemporal, requireTimeInterval, _}
import org.apache.flink.table.planner.codegen.GenerateUtils._
import org.apache.flink.table.planner.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
@@ -396,10 +396,18 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
GeneratedExpression(input1Term, NEVER_NULL, NO_CODE, input1Type)
}
- override def visitLocalRef(localRef: RexLocalRef): GeneratedExpression = localRef match {
- case local: RexAggLocalVariable =>
- GeneratedExpression(local.fieldTerm, local.nullTerm, NO_CODE, local.internalType)
- case value: RexDistinctKeyVariable =>
+ override def visitLocalRef(localRef: RexLocalRef): GeneratedExpression =
+ throw new CodeGenException("RexLocalRef are not supported yet.")
+
+ def visitRexFieldVariable(variable: RexFieldVariable): GeneratedExpression = {
+ val internalType = FlinkTypeFactory.toLogicalType(variable.dataType)
+ val nullTerm = variable.fieldTerm + "IsNull" // not use newName, keep isNull unique.
+ ctx.addReusableMember(s"${primitiveTypeTermForType(internalType)} ${variable.fieldTerm};")
+ ctx.addReusableMember(s"boolean $nullTerm;")
+ GeneratedExpression(variable.fieldTerm, nullTerm, NO_CODE, internalType)
+ }
+
+ def visitDistinctKeyVariable(value: RexDistinctKeyVariable): GeneratedExpression = {
val inputExpr = ctx.getReusableInputUnboxingExprs(input1Term, 0) match {
case Some(expr) => expr
case None =>
@@ -422,7 +430,6 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
}
// hide the generated code as it will be executed only once
GeneratedExpression(inputExpr.resultTerm, inputExpr.nullTerm, NO_CODE, inputExpr.resultType)
- case _ => throw new CodeGenException("Local variables are not supported yet.")
}
override def visitRangeRef(rangeRef: RexRangeRef): GeneratedExpression =
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
index 4b05525..9ab0b3b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -21,22 +21,23 @@ import org.apache.flink.table.api.TableException
import org.apache.flink.table.dataformat.GenericRow
import org.apache.flink.table.dataformat.util.BaseRowUtil
import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.UserDefinedAggregateFunction
import org.apache.flink.table.planner.codegen.CodeGenUtils.{BASE_ROW, _}
import org.apache.flink.table.planner.codegen.Indenter.toISC
import org.apache.flink.table.planner.codegen._
import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator._
import org.apache.flink.table.planner.dataview.{DataViewSpec, ListViewSpec, MapViewSpec}
-import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowProperty, PlannerWindowStart, ResolvedAggInputReference}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
+import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowProperty, PlannerWindowStart}
import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
import org.apache.flink.table.planner.plan.utils.AggregateInfoList
import org.apache.flink.table.runtime.dataview.{StateListView, StateMapView}
-import org.apache.flink.table.runtime.generated.{AggsHandleFunction, TableAggsHandleFunction, GeneratedAggsHandleFunction, GeneratedNamespaceAggsHandleFunction, GeneratedNamespaceTableAggsHandleFunction, GeneratedTableAggsHandleFunction, NamespaceAggsHandleFunction, NamespaceTableAggsHandleFunction}
-import org.apache.flink.table.runtime.types.PlannerTypeUtils
+import org.apache.flink.table.runtime.generated.{AggsHandleFunction, GeneratedAggsHandleFunction, GeneratedNamespaceAggsHandleFunction, GeneratedNamespaceTableAggsHandleFunction, GeneratedTableAggsHandleFunction, NamespaceAggsHandleFunction, NamespaceTableAggsHandleFunction, TableAggsHandleFunction}
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import org.apache.flink.table.runtime.types.PlannerTypeUtils
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.logical.{BooleanType, IntType, LogicalType, RowType}
import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
-import org.apache.flink.table.functions.UserDefinedAggregateFunction
import org.apache.flink.util.Collector
import org.apache.calcite.rex.RexLiteral
@@ -57,6 +58,7 @@ class AggsHandlerCodeGenerator(
private val inputType = RowType.of(inputFieldTypes: _*)
/** constant expressions that act like a second input in the parameter indices. */
+ private var constants: Seq[RexLiteral] = Seq()
private var constantExprs: Seq[GeneratedExpression] = Seq()
/** window properties like window_start and window_end, only used in window aggregates */
@@ -133,6 +135,7 @@ class AggsHandlerCodeGenerator(
*/
def withConstants(literals: Seq[RexLiteral]): AggsHandlerCodeGenerator = {
// create constants
+ this.constants = literals
val exprGenerator = new ExprCodeGenerator(ctx, INPUT_NOT_NULL)
val exprs = literals.map(exprGenerator.generateExpression)
this.constantExprs = exprs.map(ctx.addReusableConstant(_, nullCheck = true))
@@ -222,7 +225,7 @@ class AggsHandlerCodeGenerator(
aggBufferOffset,
aggBufferSize,
inputFieldTypes,
- constantExprs,
+ constants,
relBuilder)
case _: UserDefinedAggregateFunction[_, _] =>
new ImperativeAggCodeGen(
@@ -303,7 +306,7 @@ class AggsHandlerCodeGenerator(
throw new TableException(s"filter arg must be boolean, but is $filterType, " +
s"the aggregate is $aggName.")
}
- Some(new ResolvedAggInputReference(name, filterArg, inputFieldTypes(filterArg)))
+ Some(toRexInputRef(relBuilder, filterArg, inputFieldTypes(filterArg)))
} else {
None
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
index a175f64..afa4e5b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
@@ -18,15 +18,17 @@
package org.apache.flink.table.planner.codegen.agg
import org.apache.flink.table.expressions._
-import org.apache.flink.table.planner.codegen.CodeGenUtils.primitiveTypeTermForType
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.DISTINCT_KEY_TERM
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, GeneratedExpression}
-import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, ResolvedAggInputReference, ResolvedAggLocalReference, ResolvedDistinctKeyReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.{toRexDistinctKey, toRexInputRef}
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter, RexNodeExpression}
import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
import org.apache.flink.table.planner.plan.utils.AggregateInfo
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.{fromDataTypeToLogicalType, fromLogicalTypeToDataType}
import org.apache.flink.table.types.logical.LogicalType
+import org.apache.calcite.rex.RexLiteral
import org.apache.calcite.tools.RelBuilder
/**
@@ -41,7 +43,7 @@ import org.apache.calcite.tools.RelBuilder
* @param aggBufferOffset the offset in the buffers of this aggregate
* @param aggBufferSize the total size of aggregate buffers
* @param inputTypes the input field type infos
- * @param constantExprs the constant expressions
+ * @param constants the constant literals
* @param relBuilder the rel builder to translate expressions to calcite rex nodes
*/
class DeclarativeAggCodeGen(
@@ -52,7 +54,7 @@ class DeclarativeAggCodeGen(
aggBufferOffset: Int,
aggBufferSize: Int,
inputTypes: Seq[LogicalType],
- constantExprs: Seq[GeneratedExpression],
+ constants: Seq[RexLiteral],
relBuilder: RelBuilder)
extends AggCodeGen {
@@ -62,14 +64,21 @@ class DeclarativeAggCodeGen(
private val bufferIndexes = Array.range(aggBufferOffset, aggBufferOffset + bufferTypes.length)
private val bufferTerms = function.aggBufferAttributes
.map(a => s"agg${aggInfo.aggIndex}_${a.getName}")
- private val bufferNullTerms = bufferTerms.map(_ + "_isNull")
+
+ private val rexNodeGen = new RexNodeConverter(relBuilder)
+
+ private val bufferNullTerms = {
+ val exprCodegen = new ExprCodeGenerator(ctx, false)
+ bufferTerms.zip(bufferTypes).map {
+ case (name, t) => new LocalReferenceExpression(name, fromLogicalTypeToDataType(t))
+ }.map(_.accept(rexNodeGen)).map(exprCodegen.generateExpression).map(_.nullTerm)
+ }
private val argIndexes = aggInfo.argIndexes
private val argTypes = {
- val types = inputTypes ++ constantExprs.map(_.resultType)
+ val types = inputTypes ++ constants.map(t => FlinkTypeFactory.toLogicalType(t.getType))
argIndexes.map(types(_))
}
- private val rexNodeGen = new RexNodeConverter(relBuilder)
def createAccumulator(generator: ExprCodeGenerator): Seq[GeneratedExpression] = {
function.initialValuesExpressions
@@ -79,21 +88,15 @@ class DeclarativeAggCodeGen(
def setAccumulator(generator: ExprCodeGenerator): String = {
val aggBufferAccesses = function.aggBufferAttributes.zipWithIndex
.map { case (attr, index) =>
- new ResolvedAggInputReference(
- attr.getName, bufferIndexes(index), bufferTypes(index))
+ toRexInputRef(relBuilder, bufferIndexes(index), bufferTypes(index))
}
.map(expr => generator.generateExpression(expr.accept(rexNodeGen)))
val setters = aggBufferAccesses.zipWithIndex.map {
case (access, index) =>
- val typeTerm = primitiveTypeTermForType(access.resultType)
- val memberName = bufferTerms(index)
- val memberNullTerm = bufferNullTerms(index)
- ctx.addReusableMember(s"private $typeTerm $memberName;")
- ctx.addReusableMember(s"private boolean $memberNullTerm;")
s"""
- |${access.copyResultTermToTargetIfChanged(ctx, memberName)};
- |$memberNullTerm = ${access.nullTerm};
+ |${access.copyResultTermToTargetIfChanged(ctx, bufferTerms(index))};
+ |${bufferNullTerms(index)} = ${access.nullTerm};
""".stripMargin
}
@@ -218,8 +221,8 @@ class DeclarativeAggCodeGen(
override def toMergeInputExpr(name: String, localIndex: Int): ResolvedExpression = {
// in merge case, the input1 is mergedAcc
- new ResolvedAggInputReference(
- name,
+ toRexInputRef(
+ relBuilder,
mergedAccOffset + bufferIndexes(localIndex),
bufferTypes(localIndex))
}
@@ -228,37 +231,31 @@ class DeclarativeAggCodeGen(
val inputIndex = argIndexes(localIndex)
if (inputIndex >= inputTypes.length) { // it is a constant
val constantIndex = inputIndex - inputTypes.length
- val constantTerm = constantExprs(constantIndex).resultTerm
- val nullTerm = constantExprs(constantIndex).nullTerm
- val constantType = constantExprs(constantIndex).resultType
- // constant is reused as member variable
- new ResolvedAggLocalReference(
- constantTerm,
- nullTerm,
- constantType)
+ val constant = constants(constantIndex)
+ new RexNodeExpression(constant,
+ fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(constant.getType)))
} else { // it is a input field
if (isDistinctMerge) { // this is called from distinct merge
if (function.operandCount == 1) {
// the distinct key is a BoxedValue
- new ResolvedDistinctKeyReference(name, argTypes(localIndex))
+ val t = argTypes(localIndex)
+ toRexDistinctKey(relBuilder, name, t)
} else {
// the distinct key is a BaseRow
- new ResolvedAggInputReference(name, localIndex, argTypes(localIndex))
+ toRexInputRef(relBuilder, localIndex, argTypes(localIndex))
}
} else {
// the input is the inputRow
- new ResolvedAggInputReference(
- name, argIndexes(localIndex), argTypes(localIndex))
+ toRexInputRef(relBuilder, argIndexes(localIndex), argTypes(localIndex))
}
}
}
override def toAggBufferExpr(name: String, localIndex: Int): ResolvedExpression = {
- // it is a agg buffer.
- val name = bufferTerms(localIndex)
- val nullTerm = bufferNullTerms(localIndex)
- // buffer access is reused as member variable
- new ResolvedAggLocalReference(name, nullTerm, bufferTypes(localIndex))
+ // name => s"agg${aggInfo.aggIndex}_$name"
+ new LocalReferenceExpression(
+ bufferTerms(localIndex),
+ fromLogicalTypeToDataType(bufferTypes(localIndex)))
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
index 87f91c9..e5d0ea4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
@@ -25,7 +25,8 @@ import org.apache.flink.table.planner.codegen.GenerateUtils.generateFieldAccess
import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator._
import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGeneratorContext, ExprCodeGenerator, GeneratedExpression}
import org.apache.flink.table.planner.dataview.DataViewSpec
-import org.apache.flink.table.planner.expressions.{ResolvedAggInputReference, ResolvedDistinctKeyReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter}
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{getAggFunctionUDIMethod, getAggUserDefinedInputTypes, getUserDefinedMethod, internalTypesToClasses, signatureToString}
import org.apache.flink.table.planner.plan.utils.AggregateInfo
import org.apache.flink.table.planner.utils.SingleElementIterator
@@ -282,14 +283,15 @@ class ImperativeAggCodeGen(
val inputRef = if (generator.input1Term.startsWith(DISTINCT_KEY_TERM)) {
if (argTypes.length == 1) {
// called from distinct merge and the inputTerm is the only argument
- new ResolvedDistinctKeyReference(generator.input1Term, inputTypes(f))
+ DeclarativeExpressionResolver.toRexDistinctKey(
+ relBuilder, generator.input1Term, inputTypes(f))
} else {
// called from distinct merge call and the inputTerm is BaseRow type
- new ResolvedAggInputReference(f.toString, index, inputTypes(f))
+ toRexInputRef(relBuilder, index, inputTypes(f))
}
} else {
// called from accumulate
- new ResolvedAggInputReference(f.toString, f, inputTypes(f))
+ toRexInputRef(relBuilder, f, inputTypes(f))
}
var inputExpr = generator.generateExpression(inputRef.accept(rexNodeGen))
if (inputFieldCopy) inputExpr = inputExpr.deepCopy(ctx)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
index d235f91..c02ae98 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
@@ -27,13 +27,14 @@ import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction}
import org.apache.flink.table.planner.codegen.CodeGenUtils._
import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.STREAM_RECORD
import org.apache.flink.table.planner.codegen._
-import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, ResolvedAggInputReference, ResolvedAggLocalReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter}
import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{getAccumulatorTypeOfAggregateFunction, getAggUserDefinedInputTypes}
import org.apache.flink.table.runtime.context.ExecutionContextImpl
import org.apache.flink.table.runtime.generated.{GeneratedAggsHandleFunction, GeneratedOperator}
import org.apache.flink.table.runtime.types.InternalSerializers
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.{fromDataTypeToLogicalType, fromLogicalTypeToDataType}
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.logical.LogicalTypeRoot._
import org.apache.flink.table.types.logical.{LogicalType, RowType}
@@ -234,14 +235,8 @@ object AggCodeGenHelper {
auxGroupingMapping ++ aggCallMapping
}
- def newLocalReference(
- ctx: CodeGeneratorContext,
- resultTerm: String,
- resultType: LogicalType): ResolvedAggLocalReference = {
- val nullTerm = resultTerm + "IsNull"
- ctx.addReusableMember(s"${primitiveTypeTermForType(resultType)} $resultTerm;")
- ctx.addReusableMember(s"boolean $nullTerm;")
- new ResolvedAggLocalReference(resultTerm, nullTerm, resultType)
+ def newLocalReference(resultTerm: String, resultType: LogicalType): LocalReferenceExpression = {
+ new LocalReferenceExpression(resultTerm, fromLogicalTypeToDataType(resultType))
}
/**
@@ -261,18 +256,17 @@ object AggCodeGenHelper {
override def toMergeInputExpr(name: String, localIndex: Int): ResolvedExpression = {
val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
- new ResolvedAggInputReference(name, inputIndex, inputType)
+ toRexInputRef(relBuilder, inputIndex, inputType)
}
override def toAccInputExpr(name: String, localIndex: Int): ResolvedExpression = {
val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
- new ResolvedAggInputReference(name, inputIndex, inputType)
+ toRexInputRef(relBuilder, inputIndex, inputType)
}
override def toAggBufferExpr(name: String, localIndex: Int): ResolvedExpression = {
val variableName = s"agg${aggIndex}_$name"
- newLocalReference(
- ctx, variableName, aggBufferTypes(aggIndex)(localIndex))
+ newLocalReference(variableName, aggBufferTypes(aggIndex)(localIndex))
}
}
@@ -292,7 +286,7 @@ object AggCodeGenHelper {
val converter = new RexNodeConverter(builder)
val accessAuxGroupingExprs = auxGrouping.indices.map {
- idx => newLocalReference(ctx, aggBufferNames(idx)(0), aggBufferTypes(idx)(0))
+ idx => newLocalReference(aggBufferNames(idx)(0), aggBufferTypes(idx)(0))
}.map(_.accept(converter)).map(exprCodegen.generateExpression)
val aggCallExprs = aggregates.zipWithIndex.flatMap {
@@ -303,7 +297,7 @@ object AggCodeGenHelper {
case (_: AggregateFunction[_, _], aggIndex: Int) =>
val idx = auxGrouping.length + aggIndex
val variableName = aggBufferNames(idx)(0)
- Some(newLocalReference(ctx, variableName, aggBufferTypes(idx)(0)))
+ Some(newLocalReference(variableName, aggBufferTypes(idx)(0)))
}.map(_.accept(converter)).map(exprCodegen.generateExpression)
accessAuxGroupingExprs ++ aggCallExprs
@@ -552,7 +546,7 @@ object AggCodeGenHelper {
// UserDefinedAggregateFunction
case ((agg: AggregateFunction[_, _], aggIndex: Int), aggBufVar) =>
val (inputIndex, inputType) = argsMapping(aggIndex)(0)
- val inputRef = new ResolvedAggInputReference(inputTerm, inputIndex, inputType)
+ val inputRef = toRexInputRef(builder, inputIndex, inputType)
val inputExpr = exprCodegen.generateExpression(
inputRef.accept(new RexNodeConverter(builder)))
val singleIterableClass = classOf[SingleElementIterator[_]].getCanonicalName
@@ -621,7 +615,7 @@ object AggCodeGenHelper {
val inputExprs = inFields.map {
f =>
- val inputRef = new ResolvedAggInputReference(inputTerm, f._1, f._2)
+ val inputRef = toRexInputRef(builder, f._1, f._2)
exprCodegen.generateExpression(inputRef.accept(new RexNodeConverter(builder)))
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index dec5e56..8cdaf05 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -27,7 +27,8 @@ import org.apache.flink.table.planner.codegen.CodeGenUtils.{binaryRowFieldSetAcc
import org.apache.flink.table.planner.codegen._
import org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper.buildAggregateArgsMapping
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator
-import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, ResolvedAggInputReference, RexNodeConverter}
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
+import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter}
import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
import org.apache.flink.table.planner.plan.utils.SortUtil
import org.apache.flink.table.runtime.generated.{NormalizedKeyComputer, RecordComparator}
@@ -286,7 +287,7 @@ object HashAggCodeGenHelper {
val bindRefOffset = inputType.getFieldCount
val getAuxGroupingExprs = auxGrouping.indices.map { idx =>
val (_, resultType) = aggBuffMapping(idx)(0)
- new ResolvedAggInputReference("aux_group", bindRefOffset + idx, resultType)
+ toRexInputRef(builder, bindRefOffset + idx, resultType)
}.map(_.accept(new RexNodeConverter(builder))).map(exprCodegen.generateExpression)
val getAggValueExprs = aggregates.zipWithIndex.map {
@@ -338,17 +339,17 @@ object HashAggCodeGenHelper {
override def toMergeInputExpr(name: String, localIndex: Int): ResolvedExpression = {
val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
- new ResolvedAggInputReference(name, inputIndex, inputType)
+ toRexInputRef(relBuilder, inputIndex, inputType)
}
override def toAccInputExpr(name: String, localIndex: Int): ResolvedExpression = {
val (inputIndex, inputType) = argsMapping(aggIndex)(localIndex)
- new ResolvedAggInputReference(name, inputIndex, inputType)
+ toRexInputRef(relBuilder, inputIndex, inputType)
}
override def toAggBufferExpr(name: String, localIndex: Int): ResolvedExpression = {
val (aggBuffAttrIndex, aggBuffAttrType) = aggBuffMapping(aggIndex)(localIndex)
- new ResolvedAggInputReference(name, offset + aggBuffAttrIndex, aggBuffAttrType)
+ toRexInputRef(relBuilder, offset + aggBuffAttrIndex, aggBuffAttrType)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index b6b6905..6ce17a2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -810,9 +810,8 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
)
}
- override def visit(localReference: LocalReferenceExpression): PlannerExpression =
- throw new TableException(
- "Local reference should be handled individually by a call: " + localReference)
+ override def visit(local: LocalReferenceExpression): PlannerExpression =
+ PlannerLocalReference(local.getName, fromDataTypeToTypeInfo(local.getOutputDataType))
override def visit(lookupCall: LookupCallExpression): PlannerExpression =
throw new TableException("Unsupported function call: " + lookupCall)
@@ -821,15 +820,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
other match {
// already converted planner expressions will pass this visitor without modification
case plannerExpression: PlannerExpression => plannerExpression
- case aggInput: ResolvedAggInputReference => PlannerResolvedAggInputReference(
- aggInput.getName, aggInput.getIndex, fromDataTypeToTypeInfo(aggInput.getOutputDataType))
- case aggLocal: ResolvedAggLocalReference => PlannerResolvedAggLocalReference(
- aggLocal.getFieldTerm,
- aggLocal.getNullTerm,
- fromDataTypeToTypeInfo(aggLocal.getOutputDataType))
- case key: ResolvedDistinctKeyReference => PlannerResolvedDistinctKeyReference(
- key.getName, fromDataTypeToTypeInfo(key.getOutputDataType))
-
+ case expr: RexNodeExpression => RexPlannerExpression(expr.getRexNode)
case _ =>
throw new TableException("Unrecognized expression: " + other)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
index 2a3c621..1a0f5b3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.expressions
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api._
-import org.apache.flink.table.expressions.ResolvedFieldReference
import org.apache.flink.table.operations.QueryOperation
import org.apache.flink.table.planner.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
@@ -231,55 +230,18 @@ case class StreamRecordTimestamp() extends LeafExpression {
}
/**
- * Normally we should use [[ResolvedFieldReference]] to represent an input field.
- * [[ResolvedFieldReference]] uses name to locate the field, in aggregate case, we want to use
- * field index.
- */
-case class PlannerResolvedAggInputReference(
- name: String,
- index: Int,
- resultType: TypeInformation[_]) extends Attribute {
-
- override def toString = s"'$name"
-
- override private[flink] def withName(newName: String): Attribute = {
- if (newName == name) this
- else PlannerResolvedAggInputReference(newName, index, resultType)
- }
-}
-
-/**
- * Special reference which represent a local filed, such as aggregate buffers or constants.
+ * Special reference which represents a local field, such as aggregate buffers or constants.
* We are stored as class members, so the field can be referenced directly.
* We should use an unique name to locate the field.
*/
-case class PlannerResolvedAggLocalReference(
+case class PlannerLocalReference(
name: String,
- nullTerm: String,
- resultType: TypeInformation[_])
- extends Attribute {
-
- override def toString = s"'$name"
-
- override private[flink] def withName(newName: String): Attribute = {
- if (newName == name) this
- else PlannerResolvedAggLocalReference(newName, nullTerm, resultType)
- }
-}
-
-/**
- * Special reference which represent a distinct key input filed,
- * [[ResolvedDistinctKeyReference]] uses name to locate the field.
- */
-case class PlannerResolvedDistinctKeyReference(
- name: String,
- resultType: TypeInformation[_])
- extends Attribute {
+ resultType: TypeInformation[_]) extends Attribute {
override def toString = s"'$name"
override private[flink] def withName(newName: String): Attribute = {
if (newName == name) this
- else PlannerResolvedDistinctKeyReference(newName, resultType)
+ else PlannerLocalReference(newName, resultType)
}
}