You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/11/22 07:05:52 UTC

[flink] 01/03: [FLINK-24781][table-planner] Added CastRule#canFail and make sure ScalarOperatorGens wraps the cast invocation in a try-catch

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 92c02fc747f7794f2c20ac161ad5d7b9c0f2c0f8
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Mon Nov 15 13:39:51 2021 +0100

    [FLINK-24781][table-planner] Added CastRule#canFail and make sure ScalarOperatorGens wraps the cast invocation in a try-catch
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
 .../functions/casting/AbstractCastRule.java        |  5 ++
 .../AbstractExpressionCodeGeneratorCastRule.java   |  2 +
 .../table/planner/functions/casting/CastRule.java  |  2 +
 .../CodeGeneratedExpressionCastExecutor.java       |  7 ++-
 .../planner/codegen/calls/ScalarOperatorGens.scala | 60 +++++++++++++++++-----
 5 files changed, 63 insertions(+), 13 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java
index c193139..840c8df 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java
@@ -31,4 +31,9 @@ abstract class AbstractCastRule<IN, OUT> implements CastRule<IN, OUT> {
     public CastRulePredicate getPredicateDefinition() {
         return predicate;
     }
+
+    @Override
+    public boolean canFail() {
+        return false;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
index 0b14ddc..aa0a50b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
@@ -25,7 +25,9 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 
 import java.util.Collections;
 
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.boxedTypeTermForType;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.box;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.cast;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.unbox;
 
 /**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
index e93effb..58217e4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
@@ -45,6 +45,8 @@ public interface CastRule<IN, OUT> {
     CastExecutor<IN, OUT> create(
             Context context, LogicalType inputLogicalType, LogicalType targetLogicalType);
 
+    boolean canFail();
+
     /** Casting context. */
     interface Context {
         ZoneId getSessionZoneId();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
index c94db8d..f39089a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
@@ -53,7 +53,12 @@ class CodeGeneratedExpressionCastExecutor<IN, OUT> implements CastExecutor<IN, O
             inputArray[0] = value;
             return (OUT) expressionEvaluator.evaluate(inputArray);
         } catch (InvocationTargetException e) {
-            throw new FlinkRuntimeException("Cannot execute the compiled expression", e);
+            if (e.getCause() instanceof TableException) {
+                // Expected exception created by the rule, so no need to wrap it
+                throw (TableException) e.getCause();
+            }
+            throw new TableException(
+                    "Cannot execute the compiled expression for an unknown cause", e);
         }
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 6a7fbb2..0cb0bea 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.data.binary.BinaryArrayData
 import org.apache.flink.table.planner.functions.casting.{CastRuleProvider, CodeGeneratorCastRule, ExpressionCodeGeneratorCastRule}
 import org.apache.flink.table.data.util.MapDataUtil
@@ -953,17 +953,53 @@ object ScalarOperatorGens {
           targetType
         )
 
-        val castCode = s"\n" +
-          s"// --- Cast section generated by ${className(codeGeneratorCastRule.getClass)}\n" +
-          s"${castCodeBlock.getCode}" +
-          s"// --- End cast section\n"
-
-        return GeneratedExpression(
-          castCodeBlock.getReturnTerm,
-          castCodeBlock.getIsNullTerm,
-          operand.code + castCode,
-          targetType
-        )
+        if (codeGeneratorCastRule.canFail) {
+          val resultTerm = ctx.addReusableLocalVariable(
+            primitiveTypeTermForType(targetType),
+            "castRuleResult"
+          )
+          val nullTerm = ctx.addReusableLocalVariable(
+            "boolean",
+            "castRuleResultIsNull"
+          )
+
+          // TODO this code belongs to TRY_CAST, more than to CAST.
+          //  See https://issues.apache.org/jira/browse/FLINK-24385 for more details
+          val castCode =
+            s"""
+               | // --- Cast section generated by ${className(codeGeneratorCastRule.getClass)}
+               | try {
+               |   ${castCodeBlock.getCode}
+               |   $resultTerm = ${castCodeBlock.getReturnTerm};
+               |   $nullTerm = ${castCodeBlock.getIsNullTerm};
+               | } catch (${className[Throwable]} e) {
+               |   $resultTerm = ${primitiveDefaultValue(targetType)};
+               |   $nullTerm = true;
+               | }
+               | // --- End cast section
+               """.stripMargin
+
+          return GeneratedExpression(
+            resultTerm,
+            nullTerm,
+            operand.code + "\n" + castCode,
+            targetType
+          )
+        } else {
+          val castCode =
+            s"""
+               | // --- Cast section generated by ${className(codeGeneratorCastRule.getClass)}
+               | ${castCodeBlock.getCode}
+               | // --- End cast section
+               """.stripMargin
+
+          return GeneratedExpression(
+            castCodeBlock.getReturnTerm,
+            castCodeBlock.getIsNullTerm,
+            operand.code + castCode,
+            targetType
+          )
+        }
       case _ =>
     }