You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/24 12:26:23 UTC

[flink] branch release-1.9 updated: [FLINK-13378][table-planner-blink] Fix SINGLE_VALUE not being correctly supported in blink planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 353dd8d  [FLINK-13378][table-planner-blink] Fix SINGLE_VALUE not being correctly supported in blink planner
353dd8d is described below

commit 353dd8d8087f12fe155537acc8887831cc776c9b
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Jul 23 21:28:26 2019 +0800

    [FLINK-13378][table-planner-blink] Fix SINGLE_VALUE not being correctly supported in blink planner
    
    This closes #9208
---
 .../planner/expressions/ExpressionBuilder.java     |  2 +-
 .../planner/expressions/RexNodeConverter.java      | 11 ++++++++++
 .../aggfunctions/SingleValueAggFunction.java       |  2 +-
 .../functions/sql/FlinkSqlOperatorTable.java       |  5 -----
 .../functions/sql/SqlThrowExceptionFunction.java   |  9 ++++----
 .../table/planner/codegen/ExprCodeGenerator.scala  |  9 +++++---
 .../expressions/PlannerExpressionConverter.scala   |  9 ++++++++
 .../flink/table/planner/expressions/call.scala     | 20 ++++++++++++++++-
 .../runtime/batch/sql/join/ScalarQueryITCase.scala | 25 +++++++++++++++++++++-
 9 files changed, 75 insertions(+), 17 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java
index 512b775..d96e698 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java
@@ -140,6 +140,6 @@ public class ExpressionBuilder {
 	}
 
 	public static Expression throwException(String msg, DataType type) {
-		return call(THROW_EXCEPTION, typeLiteral(type));
+		return call(THROW_EXCEPTION, literal(msg), typeLiteral(type));
 	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
index e562fed..a3d85e7 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
@@ -49,8 +49,11 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.RexAggLocalVariable;
 import org.apache.flink.table.planner.calcite.RexDistinctKeyVariable;
+import org.apache.flink.table.planner.functions.InternalFunctionDefinitions;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction;
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -305,6 +308,14 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 		conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SHA1, exprs -> convert(FlinkSqlOperatorTable.SHA1, exprs));
 		conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP,
 				exprs -> convert(FlinkSqlOperatorTable.STREAMRECORD_TIMESTAMP, exprs));
+
+		// blink expression
+		conversionsOfBuiltInFunc.put(InternalFunctionDefinitions.THROW_EXCEPTION, exprs -> {
+			DataType type = ((TypeLiteralExpression) exprs.get(1)).getOutputDataType();
+			return convert(new SqlThrowExceptionFunction(
+					typeFactory.createFieldTypeFromLogicalType(fromDataTypeToLogicalType(type))),
+					exprs.subList(0, 1));
+		});
 	}
 
 	@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
index 110f9d9..865c0c2 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
@@ -76,7 +76,7 @@ public abstract class SingleValueAggFunction extends DeclarativeAggregateFunctio
 	@Override
 	public Expression[] accumulateExpressions() {
 		return new Expression[] {
-			/* value = count == 0 ? exception : operand(0) */
+			/* value = count > 0 ? exception : operand(0) */
 			ifThenElse(greaterThan(count, ZERO),
 				throwException(ERROR_MSG, getResultType()),
 				operand(0)),
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index b2fd438..50ff193 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -955,11 +955,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	 */
 	public static final SqlIncrSumAggFunction INCR_SUM = new SqlIncrSumAggFunction();
 
-	/**
-	 * <code>THROW_EXCEPTION</code> scalar function. Only internal used.
-	 */
-	public static final SqlFunction THROW_EXCEPTION = new SqlThrowExceptionFunction();
-
 	// -----------------------------------------------------------------------------
 	// Window SQL functions
 	// -----------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlThrowExceptionFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlThrowExceptionFunction.java
index c767b82..4547a36 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlThrowExceptionFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlThrowExceptionFunction.java
@@ -18,24 +18,23 @@
 
 package org.apache.flink.table.planner.functions.sql;
 
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeFamily;
 
 /**
  * Function used to throw an exception, only used internally.
  */
 public class SqlThrowExceptionFunction extends SqlFunction {
-	public SqlThrowExceptionFunction() {
+	public SqlThrowExceptionFunction(RelDataType returnType) {
 		super(
 			"THROW_EXCEPTION",
 			SqlKind.OTHER_FUNCTION,
-			ReturnTypes.ARG0_NULLABLE,
+			opBinding -> returnType,
 			null,
-			OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.ANY),
+			OperandTypes.STRING,
 			SqlFunctionCategory.USER_DEFINED_FUNCTION);
 	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 3e1a5d3..e641708 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.planner.codegen.GeneratedExpression.{NEVER_NULL, N
 import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens._
 import org.apache.flink.table.planner.codegen.calls.{FunctionGenerator, ScalarFunctionCallGen, StringCallGen, TableFunctionCallGen}
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._
+import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction
 import org.apache.flink.table.planner.functions.utils.{ScalarSqlFunction, TableSqlFunction}
 import org.apache.flink.table.runtime.types.PlannerTypeUtils.isInteroperable
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils
@@ -717,14 +718,16 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       case STREAMRECORD_TIMESTAMP =>
         generateRowtimeAccess(ctx, contextTerm)
 
-      case THROW_EXCEPTION =>
+      case _: SqlThrowExceptionFunction =>
+        val nullValue = generateNullLiteral(resultType, nullCheck = true)
         val code =
           s"""
              |${operands.map(_.code).mkString("\n")}
+             |${nullValue.code}
              |org.apache.flink.util.ExceptionUtils.rethrow(
-             |  new RuntimeException(${operands(1).resultTerm}.toString()));
+             |  new RuntimeException(${operands.head.resultTerm}.toString()));
              |""".stripMargin
-        GeneratedExpression(operands.head.resultTerm, operands.head.nullTerm, code, resultType)
+        GeneratedExpression(nullValue.resultTerm, nullValue.nullTerm, code, resultType)
 
       case ssf: ScalarSqlFunction =>
         new ScalarFunctionCallGen(ssf.getScalarFunction).generate(ctx, operands, resultType)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 7a6c3da..020e5cd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -24,9 +24,11 @@ import org.apache.flink.table.expressions.{ApiExpressionVisitor, CallExpression,
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
 import org.apache.flink.table.functions._
 import org.apache.flink.table.planner.expressions.{E => PlannerE, UUID => PlannerUUID}
+import org.apache.flink.table.planner.functions.InternalFunctionDefinitions.THROW_EXCEPTION
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
 import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL, TIMESTAMP_WITHOUT_TIME_ZONE}
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
+import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
 
 import _root_.scala.collection.JavaConverters._
 
@@ -85,6 +87,13 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
         val windowReference = translateWindowReference(children.head)
         return RowtimeAttribute(windowReference)
 
+      case THROW_EXCEPTION =>
+        assert(children.size == 2)
+        return ThrowException(
+          children.head.accept(this),
+          fromDataTypeToLegacyInfo(
+            children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType))
+
       case _ =>
     }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
index 5b56b77..8629339 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.planner.expressions
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, Types}
 import org.apache.flink.table.functions._
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
@@ -221,3 +221,21 @@ case class PlannerTableFunctionCall(
   override def toString =
     s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
 }
+
+case class ThrowException(msg: PlannerExpression, tp: TypeInformation[_]) extends UnaryExpression {
+
+  override private[flink] def resultType: TypeInformation[_] = tp
+
+  override private[flink] def child: PlannerExpression = msg
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == Types.STRING) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"ThrowException operator requires String input, " +
+          s"but $child is of type ${child.resultType}")
+    }
+  }
+
+  override def toString: String = s"ThrowException($msg)"
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/ScalarQueryITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/ScalarQueryITCase.scala
index 1a948b0..c8db3d3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/ScalarQueryITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/ScalarQueryITCase.scala
@@ -18,8 +18,12 @@
 
 package org.apache.flink.table.planner.runtime.batch.sql.join
 
+import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.TestData._
+
+import org.junit.{Before, Test}
 
 import scala.collection.Seq
 
@@ -46,6 +50,25 @@ class ScalarQueryITCase extends BatchTestBase {
     row(6, null)
   )
 
-}
+  @Before
+  override def before(): Unit = {
+    super.before()
+    registerCollection("l", l, INT_DOUBLE, "a, b")
+    registerCollection("r", r, INT_DOUBLE, "c, d")
+  }
 
+  @Test
+  def testScalarSubQuery(): Unit = {
+    checkResult(
+      "SELECT * FROM l WHERE a = (SELECT c FROM r where c = 3)",
+      Seq(row(3, 3.0)))
+  }
+
+  @Test(expected = classOf[JobExecutionException])
+  def testScalarSubQueryException(): Unit = {
+    checkResult(
+      "SELECT * FROM l WHERE a = (SELECT c FROM r)",
+      Seq(row(3, 3.0)))
+  }
+}