You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/11/22 07:05:52 UTC
[flink] 01/03: [FLINK-24781][table-planner] Added CastRule#canFail and make sure ScalarOperatorGens wraps the cast invocation in a try-catch
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 92c02fc747f7794f2c20ac161ad5d7b9c0f2c0f8
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Mon Nov 15 13:39:51 2021 +0100
[FLINK-24781][table-planner] Added CastRule#canFail and make sure ScalarOperatorGens wraps the cast invocation in a try-catch
Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
.../functions/casting/AbstractCastRule.java | 5 ++
.../AbstractExpressionCodeGeneratorCastRule.java | 2 +
.../table/planner/functions/casting/CastRule.java | 2 +
.../CodeGeneratedExpressionCastExecutor.java | 7 ++-
.../planner/codegen/calls/ScalarOperatorGens.scala | 60 +++++++++++++++++-----
5 files changed, 63 insertions(+), 13 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java
index c193139..840c8df 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java
@@ -31,4 +31,9 @@ abstract class AbstractCastRule<IN, OUT> implements CastRule<IN, OUT> {
public CastRulePredicate getPredicateDefinition() {
return predicate;
}
+
+ @Override
+ public boolean canFail() {
+ return false;
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
index 0b14ddc..aa0a50b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
@@ -25,7 +25,9 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
import java.util.Collections;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.boxedTypeTermForType;
import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.box;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.cast;
import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.unbox;
/**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
index e93effb..58217e4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
@@ -45,6 +45,8 @@ public interface CastRule<IN, OUT> {
CastExecutor<IN, OUT> create(
Context context, LogicalType inputLogicalType, LogicalType targetLogicalType);
+ boolean canFail();
+
/** Casting context. */
interface Context {
ZoneId getSessionZoneId();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
index c94db8d..f39089a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
@@ -53,7 +53,12 @@ class CodeGeneratedExpressionCastExecutor<IN, OUT> implements CastExecutor<IN, O
inputArray[0] = value;
return (OUT) expressionEvaluator.evaluate(inputArray);
} catch (InvocationTargetException e) {
- throw new FlinkRuntimeException("Cannot execute the compiled expression", e);
+ if (e.getCause() instanceof TableException) {
+ // Expected exception created by the rule, so no need to wrap it
+ throw (TableException) e.getCause();
+ }
+ throw new TableException(
+ "Cannot execute the compiled expression for an unknown cause", e);
}
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 6a7fbb2..0cb0bea 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.planner.codegen.calls
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.data.binary.BinaryArrayData
import org.apache.flink.table.planner.functions.casting.{CastRuleProvider, CodeGeneratorCastRule, ExpressionCodeGeneratorCastRule}
import org.apache.flink.table.data.util.MapDataUtil
@@ -953,17 +953,53 @@ object ScalarOperatorGens {
targetType
)
- val castCode = s"\n" +
- s"// --- Cast section generated by ${className(codeGeneratorCastRule.getClass)}\n" +
- s"${castCodeBlock.getCode}" +
- s"// --- End cast section\n"
-
- return GeneratedExpression(
- castCodeBlock.getReturnTerm,
- castCodeBlock.getIsNullTerm,
- operand.code + castCode,
- targetType
- )
+ if (codeGeneratorCastRule.canFail) {
+ val resultTerm = ctx.addReusableLocalVariable(
+ primitiveTypeTermForType(targetType),
+ "castRuleResult"
+ )
+ val nullTerm = ctx.addReusableLocalVariable(
+ "boolean",
+ "castRuleResultIsNull"
+ )
+
+ // TODO this try-catch wrapping belongs to TRY_CAST rather than to CAST.
+ // See https://issues.apache.org/jira/browse/FLINK-24385 for more details
+ val castCode =
+ s"""
+ | // --- Cast section generated by ${className(codeGeneratorCastRule.getClass)}
+ | try {
+ | ${castCodeBlock.getCode}
+ | $resultTerm = ${castCodeBlock.getReturnTerm};
+ | $nullTerm = ${castCodeBlock.getIsNullTerm};
+ | } catch (${className[Throwable]} e) {
+ | $resultTerm = ${primitiveDefaultValue(targetType)};
+ | $nullTerm = true;
+ | }
+ | // --- End cast section
+ """.stripMargin
+
+ return GeneratedExpression(
+ resultTerm,
+ nullTerm,
+ operand.code + "\n" + castCode,
+ targetType
+ )
+ } else {
+ val castCode =
+ s"""
+ | // --- Cast section generated by ${className(codeGeneratorCastRule.getClass)}
+ | ${castCodeBlock.getCode}
+ | // --- End cast section
+ """.stripMargin
+
+ return GeneratedExpression(
+ castCodeBlock.getReturnTerm,
+ castCodeBlock.getIsNullTerm,
+ operand.code + castCode,
+ targetType
+ )
+ }
case _ =>
}