You are viewing a plain-text version of this content; the canonical (hyperlinked) version is available in the Apache mailing-list archive.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/24 12:26:23 UTC
[flink] branch release-1.9 updated:
[FLINK-13378][table-planner-blink] Fix SINGLE_VALUE is not correctly
supported in blink planner
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 353dd8d [FLINK-13378][table-planner-blink] Fix SINGLE_VALUE is not correctly supported in blink planner
353dd8d is described below
commit 353dd8d8087f12fe155537acc8887831cc776c9b
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Jul 23 21:28:26 2019 +0800
[FLINK-13378][table-planner-blink] Fix SINGLE_VALUE is not correctly supported in blink planner
This closes #9208
---
.../planner/expressions/ExpressionBuilder.java | 2 +-
.../planner/expressions/RexNodeConverter.java | 11 ++++++++++
.../aggfunctions/SingleValueAggFunction.java | 2 +-
.../functions/sql/FlinkSqlOperatorTable.java | 5 -----
.../functions/sql/SqlThrowExceptionFunction.java | 9 ++++----
.../table/planner/codegen/ExprCodeGenerator.scala | 9 +++++---
.../expressions/PlannerExpressionConverter.scala | 9 ++++++++
.../flink/table/planner/expressions/call.scala | 20 ++++++++++++++++-
.../runtime/batch/sql/join/ScalarQueryITCase.scala | 25 +++++++++++++++++++++-
9 files changed, 75 insertions(+), 17 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java
index 512b775..d96e698 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java
@@ -140,6 +140,6 @@ public class ExpressionBuilder {
}
public static Expression throwException(String msg, DataType type) {
- return call(THROW_EXCEPTION, typeLiteral(type));
+ return call(THROW_EXCEPTION, literal(msg), typeLiteral(type));
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
index e562fed..a3d85e7 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
@@ -49,8 +49,11 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.RexAggLocalVariable;
import org.apache.flink.table.planner.calcite.RexDistinctKeyVariable;
+import org.apache.flink.table.planner.functions.InternalFunctionDefinitions;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction;
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -305,6 +308,14 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SHA1, exprs -> convert(FlinkSqlOperatorTable.SHA1, exprs));
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP,
exprs -> convert(FlinkSqlOperatorTable.STREAMRECORD_TIMESTAMP, exprs));
+
+ // blink expression
+ conversionsOfBuiltInFunc.put(InternalFunctionDefinitions.THROW_EXCEPTION, exprs -> {
+ DataType type = ((TypeLiteralExpression) exprs.get(1)).getOutputDataType();
+ return convert(new SqlThrowExceptionFunction(
+ typeFactory.createFieldTypeFromLogicalType(fromDataTypeToLogicalType(type))),
+ exprs.subList(0, 1));
+ });
}
@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
index 110f9d9..865c0c2 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
@@ -76,7 +76,7 @@ public abstract class SingleValueAggFunction extends DeclarativeAggregateFunctio
@Override
public Expression[] accumulateExpressions() {
return new Expression[] {
- /* value = count == 0 ? exception : operand(0) */
+ /* value = count > 0 ? exception : operand(0) */
ifThenElse(greaterThan(count, ZERO),
throwException(ERROR_MSG, getResultType()),
operand(0)),
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index b2fd438..50ff193 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -955,11 +955,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
*/
public static final SqlIncrSumAggFunction INCR_SUM = new SqlIncrSumAggFunction();
- /**
- * <code>THROW_EXCEPTION</code> scalar function. Only internal used.
- */
- public static final SqlFunction THROW_EXCEPTION = new SqlThrowExceptionFunction();
-
// -----------------------------------------------------------------------------
// Window SQL functions
// -----------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlThrowExceptionFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlThrowExceptionFunction.java
index c767b82..4547a36 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlThrowExceptionFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlThrowExceptionFunction.java
@@ -18,24 +18,23 @@
package org.apache.flink.table.planner.functions.sql;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeFamily;
/**
* Function used to throw an exception, only used internally.
*/
public class SqlThrowExceptionFunction extends SqlFunction {
- public SqlThrowExceptionFunction() {
+ public SqlThrowExceptionFunction(RelDataType returnType) {
super(
"THROW_EXCEPTION",
SqlKind.OTHER_FUNCTION,
- ReturnTypes.ARG0_NULLABLE,
+ opBinding -> returnType,
null,
- OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.ANY),
+ OperandTypes.STRING,
SqlFunctionCategory.USER_DEFINED_FUNCTION);
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 3e1a5d3..e641708 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.planner.codegen.GeneratedExpression.{NEVER_NULL, N
import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens._
import org.apache.flink.table.planner.codegen.calls.{FunctionGenerator, ScalarFunctionCallGen, StringCallGen, TableFunctionCallGen}
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._
+import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction
import org.apache.flink.table.planner.functions.utils.{ScalarSqlFunction, TableSqlFunction}
import org.apache.flink.table.runtime.types.PlannerTypeUtils.isInteroperable
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils
@@ -717,14 +718,16 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
case STREAMRECORD_TIMESTAMP =>
generateRowtimeAccess(ctx, contextTerm)
- case THROW_EXCEPTION =>
+ case _: SqlThrowExceptionFunction =>
+ val nullValue = generateNullLiteral(resultType, nullCheck = true)
val code =
s"""
|${operands.map(_.code).mkString("\n")}
+ |${nullValue.code}
|org.apache.flink.util.ExceptionUtils.rethrow(
- | new RuntimeException(${operands(1).resultTerm}.toString()));
+ | new RuntimeException(${operands.head.resultTerm}.toString()));
|""".stripMargin
- GeneratedExpression(operands.head.resultTerm, operands.head.nullTerm, code, resultType)
+ GeneratedExpression(nullValue.resultTerm, nullValue.nullTerm, code, resultType)
case ssf: ScalarSqlFunction =>
new ScalarFunctionCallGen(ssf.getScalarFunction).generate(ctx, operands, resultType)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 7a6c3da..020e5cd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -24,9 +24,11 @@ import org.apache.flink.table.expressions.{ApiExpressionVisitor, CallExpression,
import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
import org.apache.flink.table.functions._
import org.apache.flink.table.planner.expressions.{E => PlannerE, UUID => PlannerUUID}
+import org.apache.flink.table.planner.functions.InternalFunctionDefinitions.THROW_EXCEPTION
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL, TIMESTAMP_WITHOUT_TIME_ZONE}
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
+import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
import _root_.scala.collection.JavaConverters._
@@ -85,6 +87,13 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
val windowReference = translateWindowReference(children.head)
return RowtimeAttribute(windowReference)
+ case THROW_EXCEPTION =>
+ assert(children.size == 2)
+ return ThrowException(
+ children.head.accept(this),
+ fromDataTypeToLegacyInfo(
+ children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType))
+
case _ =>
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
index 5b56b77..8629339 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.table.planner.expressions
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, Types}
import org.apache.flink.table.functions._
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
@@ -221,3 +221,21 @@ case class PlannerTableFunctionCall(
override def toString =
s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
}
+
+case class ThrowException(msg: PlannerExpression, tp: TypeInformation[_]) extends UnaryExpression {
+
+ override private[flink] def resultType: TypeInformation[_] = tp
+
+ override private[flink] def child: PlannerExpression = msg
+
+ override private[flink] def validateInput(): ValidationResult = {
+ if (child.resultType == Types.STRING) {
+ ValidationSuccess
+ } else {
+ ValidationFailure(s"ThrowException operator requires String input, " +
+ s"but $child is of type ${child.resultType}")
+ }
+ }
+
+ override def toString: String = s"ThrowException($msg)"
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/ScalarQueryITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/ScalarQueryITCase.scala
index 1a948b0..c8db3d3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/ScalarQueryITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/ScalarQueryITCase.scala
@@ -18,8 +18,12 @@
package org.apache.flink.table.planner.runtime.batch.sql.join
+import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.TestData._
+
+import org.junit.{Before, Test}
import scala.collection.Seq
@@ -46,6 +50,25 @@ class ScalarQueryITCase extends BatchTestBase {
row(6, null)
)
-}
+ @Before
+ override def before(): Unit = {
+ super.before()
+ registerCollection("l", l, INT_DOUBLE, "a, b")
+ registerCollection("r", r, INT_DOUBLE, "c, d")
+ }
+ @Test
+ def testScalarSubQuery(): Unit = {
+ checkResult(
+ "SELECT * FROM l WHERE a = (SELECT c FROM r where c = 3)",
+ Seq(row(3, 3.0)))
+ }
+
+ @Test(expected = classOf[JobExecutionException])
+ def testScalarSubQueryException(): Unit = {
+ checkResult(
+ "SELECT * FROM l WHERE a = (SELECT c FROM r)",
+ Seq(row(3, 3.0)))
+ }
+}