Posted to commits@flink.apache.org by tw...@apache.org on 2021/11/22 07:05:51 UTC

[flink] branch master updated (cd988b6 -> 9f7eef2)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from cd988b6  [FLINK-24918][state] Support to specify the data dir for state benchmark
     new 92c02fc  [FLINK-24781][table-planner] Added CastRule#canFail and make sure ScalarOperatorGens wraps the cast invocation in a try-catch
     new 0426d8c  [FLINK-24781][table-planner] Add string parsing methods to BinaryStringDataUtil and add from string cast rules
     new 9f7eef2  [FLINK-24781][table-planner] Refactor cast of literals to use CastExecutor

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
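
For context, the first revision (92c02fc) adds CastRule#canFail and makes ScalarOperatorGens wrap the generated cast invocation in a try-catch. A minimal sketch of what that guard amounts to for a failable string-to-INT cast follows; the class and method names are illustrative only, and the rethrow-as-TableException behaviour is what CastRulesTest asserts for the .fail(...) cases further below.

    import org.apache.flink.table.api.TableException;
    import org.apache.flink.table.data.binary.BinaryStringData;
    import org.apache.flink.table.data.binary.BinaryStringDataUtil;

    class CanFailCastSketch {
        // When a rule reports canFail() == true, the generated code guards the
        // cast expression and rethrows any failure as a TableException.
        static int castStringToInt(BinaryStringData input) {
            try {
                // expression emitted by the string-to-INT rule
                return BinaryStringDataUtil.toInt(input);
            } catch (Throwable t) {
                throw new TableException("Cannot cast " + input + " to INT", t);
            }
        }
    }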


Summary of changes:
 .../apache/flink/table/utils/DateTimeUtils.java    |   5 +-
 .../functions/casting/AbstractCastRule.java        |   5 +
 .../AbstractExpressionCodeGeneratorCastRule.java   |  11 +-
 .../table/planner/functions/casting/CastRule.java  |   2 +
 .../functions/casting/CastRuleProvider.java        |   8 +
 .../CodeGeneratedExpressionCastExecutor.java       |   9 +-
 ...ngCastRule.java => StringToBinaryCastRule.java} |  22 +-
 ...gCastRule.java => StringToBooleanCastRule.java} |  24 +-
 ...ringCastRule.java => StringToDateCastRule.java} |  24 +-
 ...lCastRule.java => StringToDecimalCastRule.java} |  23 +-
 .../casting/StringToNumericPrimitiveCastRule.java  |  78 +++++++
 ...ringCastRule.java => StringToTimeCastRule.java} |  27 ++-
 ...astRule.java => StringToTimestampCastRule.java} |  39 ++--
 .../flink/table/planner/codegen/CodeGenUtils.scala |  26 ++-
 .../table/planner/codegen/GenerateUtils.scala      |  16 --
 .../planner/codegen/calls/BuiltInMethods.scala     |  67 +++++-
 .../table/planner/codegen/calls/IfCallGen.scala    |   7 +-
 .../planner/codegen/calls/ScalarOperatorGens.scala | 254 ++++++++-------------
 .../planner/functions/casting/CastRulesTest.java   | 211 ++++++++++++++++-
 .../validation/ScalarOperatorsValidationTest.scala |  12 +-
 .../table/data/binary/BinaryStringDataUtil.java    | 169 ++++++++------
 .../flink/table/data/BinaryStringDataTest.java     |  41 ++--
 22 files changed, 726 insertions(+), 354 deletions(-)
 copy flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{NumericToStringCastRule.java => StringToBinaryCastRule.java} (67%)
 copy flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{DateToStringCastRule.java => StringToBooleanCastRule.java} (68%)
 copy flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{DateToStringCastRule.java => StringToDateCastRule.java} (68%)
 copy flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{DecimalToDecimalCastRule.java => StringToDecimalCastRule.java} (71%)
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToNumericPrimitiveCastRule.java
 copy flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{DateToStringCastRule.java => StringToTimeCastRule.java} (67%)
 copy flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{IntervalToStringCastRule.java => StringToTimestampCastRule.java} (59%)

[flink] 02/03: [FLINK-24781][table-planner] Add string parsing methods to BinaryStringDataUtil and add from string cast rules

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0426d8c7af0191f75e6aaa4696b3358de059dc67
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Mon Nov 15 13:40:34 2021 +0100

    [FLINK-24781][table-planner] Add string parsing methods to BinaryStringDataUtil and add from string cast rules
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
 .../apache/flink/table/utils/DateTimeUtils.java    |   5 +-
 .../AbstractExpressionCodeGeneratorCastRule.java   |   9 +-
 .../functions/casting/CastRuleProvider.java        |   8 +
 .../CodeGeneratedExpressionCastExecutor.java       |   1 -
 .../functions/casting/StringToBinaryCastRule.java  |  50 +++++
 .../functions/casting/StringToBooleanCastRule.java |  55 ++++++
 .../functions/casting/StringToDateCastRule.java    |  55 ++++++
 .../functions/casting/StringToDecimalCastRule.java |  63 ++++++
 .../casting/StringToNumericPrimitiveCastRule.java  |  78 ++++++++
 .../functions/casting/StringToTimeCastRule.java    |  58 ++++++
 .../casting/StringToTimestampCastRule.java         |  63 ++++++
 .../planner/codegen/calls/BuiltInMethods.scala     |  68 +++++--
 .../planner/codegen/calls/ScalarOperatorGens.scala | 105 ----------
 .../planner/functions/casting/CastRulesTest.java   | 211 ++++++++++++++++++++-
 .../table/data/binary/BinaryStringDataUtil.java    | 169 ++++++++++-------
 .../flink/table/data/BinaryStringDataTest.java     |  41 ++--
 16 files changed, 826 insertions(+), 213 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
index fdd715b..06fc883 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
@@ -317,6 +317,7 @@ public class DateTimeUtils {
     // --------------------------------------------------------------------------------------------
     // String --> Timestamp conversion
     // --------------------------------------------------------------------------------------------
+
     public static TimestampData toTimestampData(String dateStr) {
         int length = dateStr.length();
         String format;
@@ -411,7 +412,7 @@ public class DateTimeUtils {
      * @param format date time string format
      * @param tz the time zone
      */
-    public static Long toTimestamp(String dateStr, String format, TimeZone tz) {
+    private static Long toTimestamp(String dateStr, String format, TimeZone tz) {
         SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
         formatter.setTimeZone(tz);
         try {
@@ -1717,7 +1718,7 @@ public class DateTimeUtils {
         return timeStringToUnixDate(v, 0);
     }
 
-    public static Integer timeStringToUnixDate(String v, int start) {
+    private static Integer timeStringToUnixDate(String v, int start) {
         final int colon1 = v.indexOf(':', start);
         // timezone hh:mm:ss[.ssssss][[+|-]hh:mm:ss]
         // refer https://www.w3.org/TR/NOTE-datetime
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
index aa0a50b..6700a7e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
@@ -73,7 +73,14 @@ abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT>
                         box(
                                 generateExpression(
                                         createCodeGeneratorCastRuleContext(context),
-                                        unbox(inputArgumentName, inputLogicalType),
+                                        unbox(
+                                                // We need the casting because the rules use the
+                                                // concrete classes (e.g. StringData and
+                                                // BinaryStringData)
+                                                cast(
+                                                        boxedTypeTermForType(inputLogicalType),
+                                                        inputArgumentName),
+                                                inputLogicalType),
                                         inputLogicalType,
                                         targetLogicalType),
                                 targetLogicalType));
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
index b098191..cb5dfa2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
@@ -58,6 +58,14 @@ public class CastRuleProvider {
                 .addRule(MapAndMultisetToStringCastRule.INSTANCE)
                 .addRule(RowToStringCastRule.INSTANCE)
                 .addRule(RawToStringCastRule.INSTANCE)
+                // From string rules
+                .addRule(StringToBooleanCastRule.INSTANCE)
+                .addRule(StringToDecimalCastRule.INSTANCE)
+                .addRule(StringToNumericPrimitiveCastRule.INSTANCE)
+                .addRule(StringToDateCastRule.INSTANCE)
+                .addRule(StringToTimeCastRule.INSTANCE)
+                .addRule(StringToTimestampCastRule.INSTANCE)
+                .addRule(StringToBinaryCastRule.INSTANCE)
                 // Collection rules
                 .addRule(ArrayToArrayCastRule.INSTANCE)
                 // Special rules
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
index f39089a..7c361ac 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.planner.functions.casting;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.utils.CastExecutor;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import org.codehaus.janino.ExpressionEvaluator;
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java
new file mode 100644
index 0000000..2a70de4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+
+/**
+ * {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeFamily#BINARY_STRING} cast rule.
+ */
+class StringToBinaryCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, byte[]> {
+
+    static final StringToBinaryCastRule INSTANCE = new StringToBinaryCastRule();
+
+    private StringToBinaryCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeFamily.BINARY_STRING)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return methodCall(inputTerm, "toBytes");
+    }
+}
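
The binary rule emits nothing more than a toBytes() call on the input string term. A hand-written equivalent (a sketch; values taken from CastRulesTest, class name illustrative):

    import org.apache.flink.table.data.binary.BinaryStringData;

    class StringToBinarySketch {
        public static void main(String[] args) {
            // CHAR/VARCHAR/STRING -> BINARY/VARBINARY/BYTES boils down to toBytes()
            byte[] bytes = BinaryStringData.fromString("Apache").toBytes();
            // per FLINK-24419 (referenced in CastRulesTest) the result is currently
            // not trimmed to the declared BINARY/VARBINARY length
            System.out.println(bytes.length); // 6
        }
    }
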
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBooleanCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBooleanCastRule.java
new file mode 100644
index 0000000..b8730eb
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBooleanCastRule.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_BOOLEAN;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#BOOLEAN} cast rule. */
+class StringToBooleanCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, Boolean> {
+
+    static final StringToBooleanCastRule INSTANCE = new StringToBooleanCastRule();
+
+    private StringToBooleanCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeRoot.BOOLEAN)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return staticCall(STRING_DATA_TO_BOOLEAN(), inputTerm);
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
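
The boolean rule delegates to BinaryStringDataUtil#toBoolean and declares canFail(), so unparsable input surfaces as a TableException at runtime. A sketch using inputs from CastRulesTest (primitive return type assumed, class name illustrative):

    import org.apache.flink.table.data.binary.BinaryStringData;
    import org.apache.flink.table.data.binary.BinaryStringDataUtil;

    class StringToBooleanSketch {
        public static void main(String[] args) {
            // case-insensitive parsing, as exercised by CastRulesTest
            boolean t = BinaryStringDataUtil.toBoolean(BinaryStringData.fromString("TRUE"));  // true
            boolean f = BinaryStringDataUtil.toBoolean(BinaryStringData.fromString("FalsE")); // false
            // "Apache Flink" or "" fails to parse; the generated try-catch turns
            // that failure into a TableException
            System.out.println(t + " " + f);
        }
    }
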
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDateCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDateCastRule.java
new file mode 100644
index 0000000..2a31f22
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDateCastRule.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_DATE;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#DATE} cast rule. */
+class StringToDateCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, Integer> {
+
+    static final StringToDateCastRule INSTANCE = new StringToDateCastRule();
+
+    private StringToDateCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeRoot.DATE)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return staticCall(STRING_DATA_TO_DATE(), inputTerm);
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
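
The date rule parses to Flink's internal DATE representation, an int counting days since 1970-01-01. A sketch with inputs from CastRulesTest (class name illustrative):

    import org.apache.flink.table.data.binary.BinaryStringData;
    import org.apache.flink.table.data.binary.BinaryStringDataUtil;
    import org.apache.flink.table.utils.DateTimeUtils;

    import java.time.LocalDate;

    class StringToDateSketch {
        public static void main(String[] args) {
            int days = BinaryStringDataUtil.toDate(BinaryStringData.fromString("2021-09-27"));
            // same value the test computes via DateTimeUtils
            System.out.println(days == DateTimeUtils.localDateToUnixDate(LocalDate.of(2021, 9, 27))); // true
            // an unsupported format such as "2021/09/27" fails and is rethrown as a TableException
        }
    }
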
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDecimalCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDecimalCastRule.java
new file mode 100644
index 0000000..77108d8
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDecimalCastRule.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_DECIMAL;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#DECIMAL} cast rule. */
+class StringToDecimalCastRule
+        extends AbstractExpressionCodeGeneratorCastRule<StringData, DecimalData> {
+
+    static final StringToDecimalCastRule INSTANCE = new StringToDecimalCastRule();
+
+    private StringToDecimalCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeRoot.DECIMAL)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final DecimalType targetDecimalType = (DecimalType) targetLogicalType;
+        return staticCall(
+                STRING_DATA_TO_DECIMAL(),
+                inputTerm,
+                targetDecimalType.getPrecision(),
+                targetDecimalType.getScale());
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
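
The decimal rule forwards the target precision and scale to BinaryStringDataUtil#toDecimal. A sketch matching the DECIMAL(5, 3) cases added to CastRulesTest (class name illustrative):

    import org.apache.flink.table.data.DecimalData;
    import org.apache.flink.table.data.binary.BinaryStringData;
    import org.apache.flink.table.data.binary.BinaryStringDataUtil;

    class StringToDecimalSketch {
        public static void main(String[] args) {
            // "1.2" cast to DECIMAL(5, 3) is rescaled to 1.200
            DecimalData d = BinaryStringDataUtil.toDecimal(
                    BinaryStringData.fromString("1.2"), 5, 3);
            System.out.println(d); // 1.200
        }
    }
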
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToNumericPrimitiveCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToNumericPrimitiveCastRule.java
new file mode 100644
index 0000000..cec9ca8
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToNumericPrimitiveCastRule.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_BYTE;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_DOUBLE;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_FLOAT;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_INT;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_LONG;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_SHORT;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/**
+ * {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeFamily#INTEGER_NUMERIC} and
+ * {@link LogicalTypeFamily#APPROXIMATE_NUMERIC} cast rule.
+ */
+class StringToNumericPrimitiveCastRule
+        extends AbstractExpressionCodeGeneratorCastRule<StringData, Number> {
+
+    static final StringToNumericPrimitiveCastRule INSTANCE = new StringToNumericPrimitiveCastRule();
+
+    private StringToNumericPrimitiveCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeFamily.INTEGER_NUMERIC)
+                        .target(LogicalTypeFamily.APPROXIMATE_NUMERIC)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        switch (targetLogicalType.getTypeRoot()) {
+            case TINYINT:
+                return staticCall(STRING_DATA_TO_BYTE(), inputTerm);
+            case SMALLINT:
+                return staticCall(STRING_DATA_TO_SHORT(), inputTerm);
+            case INTEGER:
+                return staticCall(STRING_DATA_TO_INT(), inputTerm);
+            case BIGINT:
+                return staticCall(STRING_DATA_TO_LONG(), inputTerm);
+            case FLOAT:
+                return staticCall(STRING_DATA_TO_FLOAT(), inputTerm);
+            case DOUBLE:
+                return staticCall(STRING_DATA_TO_DOUBLE(), inputTerm);
+        }
+        throw new IllegalArgumentException("This is a bug. Please file an issue.");
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
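
A single rule covers all integer and approximate numeric targets by switching on the type root, with each branch mapped to the matching BinaryStringDataUtil parser. A sketch of that mapping, using inputs from CastRulesTest (primitive return types assumed, class name illustrative):

    import org.apache.flink.table.data.binary.BinaryStringData;
    import org.apache.flink.table.data.binary.BinaryStringDataUtil;

    class StringToNumericSketch {
        public static void main(String[] args) {
            byte b   = BinaryStringDataUtil.toByte(BinaryStringData.fromString("123"));            // TINYINT
            short s  = BinaryStringDataUtil.toShort(BinaryStringData.fromString("123"));           // SMALLINT
            int i    = BinaryStringDataUtil.toInt(BinaryStringData.fromString("123"));             // INTEGER
            long l   = BinaryStringDataUtil.toLong(BinaryStringData.fromString("-3276913443134")); // BIGINT
            float f  = BinaryStringDataUtil.toFloat(BinaryStringData.fromString("1.234"));         // FLOAT
            double d = BinaryStringDataUtil.toDouble(BinaryStringData.fromString("1.234"));        // DOUBLE
            // out-of-range values such as "-130" for TINYINT fail and are rethrown
            // as a TableException by the generated try-catch
            System.out.println(b + " " + s + " " + i + " " + l + " " + f + " " + d);
        }
    }
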
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimeCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimeCastRule.java
new file mode 100644
index 0000000..4c14b78
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimeCastRule.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_TIME;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/**
+ * {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#TIME_WITHOUT_TIME_ZONE} cast
+ * rule.
+ */
+class StringToTimeCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, Integer> {
+
+    static final StringToTimeCastRule INSTANCE = new StringToTimeCastRule();
+
+    private StringToTimeCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return staticCall(STRING_DATA_TO_TIME(), inputTerm);
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimestampCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimestampCastRule.java
new file mode 100644
index 0000000..96e40b3
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimestampCastRule.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_TIMESTAMP;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_TIMESTAMP_WITH_ZONE;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeFamily#TIMESTAMP} cast rule. */
+class StringToTimestampCastRule
+        extends AbstractExpressionCodeGeneratorCastRule<StringData, TimestampData> {
+
+    static final StringToTimestampCastRule INSTANCE = new StringToTimestampCastRule();
+
+    private StringToTimestampCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeFamily.TIMESTAMP)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        if (targetLogicalType.is(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
+            return staticCall(STRING_DATA_TO_TIMESTAMP(), inputTerm);
+        }
+
+        return staticCall(
+                STRING_DATA_TO_TIMESTAMP_WITH_ZONE(), inputTerm, context.getSessionTimeZoneTerm());
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
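
The timestamp rule dispatches on the type root: TIMESTAMP_WITHOUT_TIME_ZONE parses the string directly, while the time-zone variants additionally pass the session time zone term. A sketch using the values and the CET zone from CastRulesTest (class name illustrative):

    import org.apache.flink.table.data.TimestampData;
    import org.apache.flink.table.data.binary.BinaryStringData;
    import org.apache.flink.table.data.binary.BinaryStringDataUtil;

    import java.util.TimeZone;

    class StringToTimestampSketch {
        public static void main(String[] args) {
            // TIMESTAMP(9): parsed as a local date-time
            TimestampData ts = BinaryStringDataUtil.toTimestamp(
                    BinaryStringData.fromString("2021-09-27 12:34:56.123456789"));
            // TIMESTAMP_LTZ(9): interpreted in the session time zone
            TimestampData ltz = BinaryStringDataUtil.toTimestamp(
                    BinaryStringData.fromString("2021-09-27 12:34:56.123"),
                    TimeZone.getTimeZone("CET"));
            // note FLINK-24446 (referenced in the test): fractional seconds are
            // currently lost on the time-zone path
            System.out.println(ts + " / " + ltz);
        }
    }
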
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 15132b0..308826d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -22,14 +22,14 @@ import org.apache.flink.table.data.{DecimalData, DecimalDataUtils, TimestampData
 import org.apache.flink.table.runtime.functions._
 import org.apache.flink.table.utils.DateTimeUtils
 import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange
-
 import org.apache.calcite.linq4j.tree.Types
 import org.apache.calcite.runtime.{JsonFunctions, SqlFunctions}
 import org.apache.calcite.sql.{SqlJsonExistsErrorBehavior, SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior, SqlJsonValueEmptyOrErrorBehavior}
-import org.apache.flink.table.data.binary.BinaryStringData
+import org.apache.flink.table.data.binary.{BinaryStringData, BinaryStringDataUtil}
 
 import java.lang.reflect.Method
 import java.lang.{Byte => JByte, Integer => JInteger, Long => JLong, Short => JShort}
+import java.time.ZoneId
 import java.util.TimeZone
 
 object BuiltInMethods {
@@ -402,16 +402,6 @@ object BuiltInMethods {
     "toTimestampData",
     classOf[String], classOf[String])
 
-  val STRING_TO_TIMESTAMP_TIME_ZONE = Types.lookupMethod(
-    classOf[DateTimeUtils],
-    "toTimestamp",
-    classOf[String], classOf[TimeZone])
-
-  val STRING_TO_TIMESTAMP_WITH_FORMAT_TIME_ZONE = Types.lookupMethod(
-    classOf[DateTimeUtils],
-    "toTimestamp",
-    classOf[String], classOf[String], classOf[TimeZone])
-
   val TIMESTAMP_WITH_LOCAL_TIME_ZONE_TO_DATE = Types.lookupMethod(
     classOf[DateTimeUtils],
     "timestampWithLocalZoneToDate",
@@ -547,6 +537,60 @@ object BuiltInMethods {
   val BINARY_STRING_DATA_FROM_STRING = Types.lookupMethod(classOf[BinaryStringData], "fromString",
     classOf[String])
 
+  val STRING_DATA_TO_BOOLEAN = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toBoolean",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_DECIMAL = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toDecimal",
+    classOf[BinaryStringData],
+    classOf[Int],
+    classOf[Int])
+
+  val STRING_DATA_TO_LONG = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toLong",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_INT = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toInt",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_SHORT = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toShort",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_BYTE = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toByte",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_FLOAT = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toFloat",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_DOUBLE = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toDouble",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_DATE = Types.lookupMethod(
+    classOf[BinaryStringDataUtil], "toDate", classOf[BinaryStringData])
+
+  val STRING_DATA_TO_TIME = Types.lookupMethod(
+    classOf[BinaryStringDataUtil], "toTime", classOf[BinaryStringData])
+
+  val STRING_DATA_TO_TIMESTAMP = Types.lookupMethod(
+    classOf[BinaryStringDataUtil], "toTimestamp", classOf[BinaryStringData])
+
+  val STRING_DATA_TO_TIMESTAMP_WITH_ZONE = Types.lookupMethod(
+    classOf[BinaryStringDataUtil], "toTimestamp", classOf[BinaryStringData], classOf[TimeZone])
+
   // DecimalData functions
 
   val DECIMAL_TO_DECIMAL = Types.lookupMethod(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 0cb0bea..8554e4f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -1094,111 +1094,6 @@ object ScalarOperatorGens {
           operandTerm => s"$operandTerm.toBytes($serTerm)"
         }
 
-      // String -> Boolean
-      case (VARCHAR | CHAR, BOOLEAN) =>
-        val castedExpression = generateUnaryOperatorIfNotNull(
-          ctx,
-          targetType,
-          operand,
-          resultNullable = true) {
-          operandTerm => s"$BINARY_STRING_UTIL.toBooleanSQL($operandTerm)"
-        }
-        val resultTerm = newName("primitiveCastResult")
-        castedExpression.copy(
-          resultTerm = resultTerm,
-          code =
-            s"""
-               |${castedExpression.code}
-               |boolean $resultTerm = Boolean.TRUE.equals(${castedExpression.resultTerm});
-               |""".stripMargin
-        )
-
-      // String -> NUMERIC TYPE (not Character)
-      case (VARCHAR | CHAR, _)
-        if isNumeric(targetType) =>
-        targetType match {
-          case dt: DecimalType =>
-            generateUnaryOperatorIfNotNull(ctx, targetType, operand) { operandTerm =>
-              s"$BINARY_STRING_UTIL.toDecimal($operandTerm, ${dt.getPrecision}, ${dt.getScale})"
-            }
-          case _ =>
-            val methodName = targetType.getTypeRoot match {
-              case TINYINT => "toByte"
-              case SMALLINT => "toShort"
-              case INTEGER => "toInt"
-              case BIGINT => "toLong"
-              case DOUBLE => "toDouble"
-              case FLOAT => "toFloat"
-              case _ => null
-            }
-            assert(methodName != null, "Unexpected data type.")
-            generateUnaryOperatorIfNotNull(
-              ctx,
-              targetType,
-              operand,
-              resultNullable = true) {
-              operandTerm => s"($BINARY_STRING_UTIL.$methodName($operandTerm.trim()))"
-            }
-        }
-
-      // String -> Date
-      case (VARCHAR | CHAR, DATE) =>
-        generateUnaryOperatorIfNotNull(
-          ctx,
-          targetType,
-          operand,
-          resultNullable = true) {
-          operandTerm =>
-            s"${qualifyMethod(BuiltInMethods.STRING_TO_DATE)}($operandTerm.toString())"
-        }
-
-      // String -> Time
-      case (VARCHAR | CHAR, TIME_WITHOUT_TIME_ZONE) =>
-        generateUnaryOperatorIfNotNull(
-          ctx,
-          targetType,
-          operand,
-          resultNullable = true) {
-          operandTerm =>
-            s"${qualifyMethod(BuiltInMethods.STRING_TO_TIME)}($operandTerm.toString())"
-        }
-
-      // String -> Timestamp
-      case (VARCHAR | CHAR, TIMESTAMP_WITHOUT_TIME_ZONE) =>
-        generateUnaryOperatorIfNotNull(
-          ctx,
-          targetType,
-          operand,
-          resultNullable = true) {
-          operandTerm =>
-            s"""
-               |${qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP)}($operandTerm.toString())
-           """.stripMargin
-        }
-
-      case (VARCHAR | CHAR, TIMESTAMP_WITH_LOCAL_TIME_ZONE) =>
-        generateCallWithStmtIfArgsNotNull(
-          ctx, targetType, Seq(operand), resultNullable = true) { operands =>
-          val zone = ctx.addReusableSessionTimeZone()
-          val method = qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP_TIME_ZONE)
-          val toTimestampResultName = newName("toTimestampResult")
-          // this method call might return null
-          val stmt = s"Long $toTimestampResultName = $method(${operands.head}.toString(), $zone);"
-          val result =
-            s"""
-               |($toTimestampResultName == null ?
-               |  null :
-               |  $TIMESTAMP_DATA.fromEpochMillis($toTimestampResultName))
-               |""".stripMargin
-          (stmt, result)
-        }
-
-      // String -> binary
-      case (VARCHAR | CHAR, VARBINARY | BINARY) =>
-        generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-          operandTerm => s"$operandTerm.toBytes()"
-        }
-
       // Note: SQL2003 $6.12 - casting is not allowed between boolean and numeric types.
       //       Calcite does not allow it either.
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index 3652787..127ece8 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.functions.casting;
 
 import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
@@ -80,6 +81,7 @@ import static org.apache.flink.table.api.DataTypes.TINYINT;
 import static org.apache.flink.table.api.DataTypes.VARBINARY;
 import static org.apache.flink.table.api.DataTypes.VARCHAR;
 import static org.apache.flink.table.api.DataTypes.YEAR;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -90,9 +92,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
  */
 class CastRulesTest {
 
+    private static final ZoneId CET = ZoneId.of("CET");
+
     private static final CastRule.Context CET_CONTEXT =
-            CastRule.Context.create(
-                    ZoneId.of("CET"), Thread.currentThread().getContextClassLoader());
+            CastRule.Context.create(CET, Thread.currentThread().getContextClassLoader());
 
     private static final byte DEFAULT_POSITIVE_TINY_INT = (byte) 5;
     private static final byte DEFAULT_NEGATIVE_TINY_INT = (byte) -5;
@@ -125,6 +128,12 @@ class CastRulesTest {
         return Stream.of(
                 CastTestSpecBuilder.testCastTo(TINYINT())
                         .fromCase(TINYINT(), null, null)
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("1.234"), (byte) 1)
+                        .fromCase(STRING(), StringData.fromString("123"), (byte) 123)
+                        .fail(STRING(), StringData.fromString("-130"), TableException.class)
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -151,6 +160,12 @@ class CastRulesTest {
                         .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, (byte) -123),
                 CastTestSpecBuilder.testCastTo(SMALLINT())
                         .fromCase(SMALLINT(), null, null)
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("1.234"), (short) 1)
+                        .fromCase(STRING(), StringData.fromString("123"), (short) 123)
+                        .fail(STRING(), StringData.fromString("-32769"), TableException.class)
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -187,6 +202,15 @@ class CastRulesTest {
                         .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, (short) -123)
                         .fromCase(DOUBLE(), 123456.7890d, (short) -7616),
                 CastTestSpecBuilder.testCastTo(INT())
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("1.234"), 1)
+                        .fromCase(STRING(), StringData.fromString("123"), 123)
+                        .fail(
+                                STRING(),
+                                StringData.fromString("-3276913443134"),
+                                TableException.class)
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -229,6 +253,13 @@ class CastRulesTest {
                         .fromCase(INTERVAL(DAY(), SECOND()), 123L, 123),
                 CastTestSpecBuilder.testCastTo(BIGINT())
                         .fromCase(BIGINT(), null, null)
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("1.234"), 1L)
+                        .fromCase(STRING(), StringData.fromString("123"), 123L)
+                        .fromCase(
+                                STRING(), StringData.fromString("-3276913443134"), -3276913443134L)
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -266,6 +297,13 @@ class CastRulesTest {
                         .fromCase(DOUBLE(), 9234567891.12345d, 9234567891L),
                 CastTestSpecBuilder.testCastTo(FLOAT())
                         .fromCase(FLOAT(), null, null)
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("1.234"), 1.234f)
+                        .fromCase(STRING(), StringData.fromString("123"), 123.0f)
+                        .fromCase(
+                                STRING(), StringData.fromString("-3276913443134"), -3.27691351E12f)
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -307,6 +345,15 @@ class CastRulesTest {
                         .fromCase(DOUBLE(), 1239234567891.1234567891234d, 1.23923451E12f),
                 CastTestSpecBuilder.testCastTo(DOUBLE())
                         .fromCase(DOUBLE(), null, null)
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("1.234"), 1.234d)
+                        .fromCase(STRING(), StringData.fromString("123"), 123.0d)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("-3276913443134"),
+                                -3.276913443134E12d)
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -349,6 +396,92 @@ class CastRulesTest {
                         .fromCase(DOUBLE(), DEFAULT_POSITIVE_DOUBLE, DEFAULT_POSITIVE_DOUBLE)
                         .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, DEFAULT_NEGATIVE_DOUBLE)
                         .fromCase(DOUBLE(), 1239234567891.1234567891234d, 1.2392345678911235E12d),
+                CastTestSpecBuilder.testCastTo(DATE())
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("123"),
+                                DateTimeUtils.localDateToUnixDate(LocalDate.of(123, 1, 1)))
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("2021-09-27"),
+                                DateTimeUtils.localDateToUnixDate(LocalDate.of(2021, 9, 27)))
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("2021-09-27 12:34:56.123456789"),
+                                DateTimeUtils.localDateToUnixDate(LocalDate.of(2021, 9, 27)))
+                        .fail(STRING(), StringData.fromString("2021/09/27"), TableException.class),
+                CastTestSpecBuilder.testCastTo(TIME())
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("23"),
+                                DateTimeUtils.localTimeToUnixDate(LocalTime.of(23, 0, 0)))
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("23:45"),
+                                DateTimeUtils.localTimeToUnixDate(LocalTime.of(23, 45, 0)))
+                        .fail(STRING(), StringData.fromString("2021-09-27"), TableException.class)
+                        .fail(
+                                STRING(),
+                                StringData.fromString("2021-09-27 12:34:56"),
+                                TableException.class)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("12:34:56.123456789"),
+                                DateTimeUtils.localTimeToUnixDate(
+                                        LocalTime.of(12, 34, 56, 123_000_000)))
+                        .fail(
+                                STRING(),
+                                StringData.fromString("2021-09-27 12:34:56.123456789"),
+                                TableException.class),
+                CastTestSpecBuilder.testCastTo(TIMESTAMP(9))
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("123"), TableException.class)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("2021-09-27"),
+                                TimestampData.fromLocalDateTime(
+                                        LocalDateTime.of(2021, 9, 27, 0, 0, 0, 0)))
+                        .fail(STRING(), StringData.fromString("2021/09/27"), TableException.class)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("2021-09-27 12:34:56.123456789"),
+                                TimestampData.fromLocalDateTime(
+                                        LocalDateTime.of(2021, 9, 27, 12, 34, 56, 123456789))),
+                CastTestSpecBuilder.testCastTo(TIMESTAMP_LTZ(9))
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("123"), TableException.class)
+                        .fromCase(
+                                STRING(),
+                                CET_CONTEXT,
+                                StringData.fromString("2021-09-27"),
+                                TimestampData.fromInstant(
+                                        LocalDateTime.of(2021, 9, 27, 0, 0, 0, 0)
+                                                .atZone(CET)
+                                                .toInstant()))
+                        .fromCase(
+                                STRING(),
+                                CET_CONTEXT,
+                                StringData.fromString("2021-09-27 12:34:56.123"),
+                                TimestampData.fromInstant(
+                                        LocalDateTime.of(2021, 9, 27, 12, 34, 56, 123000000)
+                                                .atZone(CET)
+                                                .toInstant()))
+                        // https://issues.apache.org/jira/browse/FLINK-24446 Fractional seconds are
+                        // lost
+                        .fromCase(
+                                STRING(),
+                                CET_CONTEXT,
+                                StringData.fromString("2021-09-27 12:34:56.123456789"),
+                                TimestampData.fromInstant(
+                                        LocalDateTime.of(2021, 9, 27, 12, 34, 56, 0)
+                                                .atZone(CET)
+                                                .toInstant())),
                 CastTestSpecBuilder.testCastTo(STRING())
                         .fromCase(STRING(), null, null)
                         .fromCase(
@@ -478,7 +611,60 @@ class CastRulesTest {
                                 RawValueData.fromObject(
                                         LocalDateTime.parse("2020-11-11T18:08:01.123")),
                                 StringData.fromString("2020-11-11T18:08:01.123")),
+                CastTestSpecBuilder.testCastTo(BOOLEAN())
+                        .fromCase(BOOLEAN(), null, null)
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fromCase(CHAR(4), StringData.fromString("true"), true)
+                        .fromCase(VARCHAR(5), StringData.fromString("FalsE"), false)
+                        .fail(STRING(), StringData.fromString("Apache Flink"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("TRUE"), true)
+                        .fail(STRING(), StringData.fromString(""), TableException.class),
+                CastTestSpecBuilder.testCastTo(BINARY(2))
+                        .fromCase(CHAR(3), StringData.fromString("foo"), new byte[] {102, 111, 111})
+                        .fromCase(
+                                VARCHAR(5),
+                                StringData.fromString("Flink"),
+                                new byte[] {70, 108, 105, 110, 107})
+                        // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+                        // bytes
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("Apache"),
+                                new byte[] {65, 112, 97, 99, 104, 101}),
+                CastTestSpecBuilder.testCastTo(VARBINARY(4))
+                        .fromCase(CHAR(3), StringData.fromString("foo"), new byte[] {102, 111, 111})
+                        .fromCase(
+                                VARCHAR(5),
+                                StringData.fromString("Flink"),
+                                new byte[] {70, 108, 105, 110, 107})
+                        // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 4
+                        // bytes
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("Apache"),
+                                new byte[] {65, 112, 97, 99, 104, 101}),
+                CastTestSpecBuilder.testCastTo(BYTES())
+                        .fromCase(CHAR(3), StringData.fromString("foo"), new byte[] {102, 111, 111})
+                        .fromCase(
+                                VARCHAR(5),
+                                StringData.fromString("Flink"),
+                                new byte[] {70, 108, 105, 110, 107})
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("Apache"),
+                                new byte[] {65, 112, 97, 99, 104, 101}),
                 CastTestSpecBuilder.testCastTo(DECIMAL(5, 3))
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("1.234"),
+                                DecimalData.fromBigDecimal(new BigDecimal("1.234"), 5, 3))
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("1.2"),
+                                DecimalData.fromBigDecimal(new BigDecimal("1.200"), 5, 3))
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -641,12 +827,21 @@ class CastRulesTest {
             this.inputTypes.add(dataType);
             this.assertionExecutors.add(
                     executor -> {
-                        assertEquals(target, executor.cast(src));
-                        // Run twice to make sure rules are reusable without causing issues
-                        assertEquals(
-                                target,
-                                executor.cast(src),
-                                "Error when reusing the rule. Perhaps there is some state that needs to be reset");
+                        if (target instanceof byte[]) {
+                            assertArrayEquals((byte[]) target, (byte[]) executor.cast(src));
+                            // Run twice to make sure rules are reusable without causing issues
+                            assertArrayEquals(
+                                    (byte[]) target,
+                                    (byte[]) executor.cast(src),
+                                    "Error when reusing the rule. Perhaps there is some state that needs to be reset");
+                        } else {
+                            assertEquals(target, executor.cast(src));
+                            // Run twice to make sure rules are reusable without causing issues
+                            assertEquals(
+                                    target,
+                                    executor.cast(src),
+                                    "Error when reusing the rule. Perhaps there is some state that needs to be reset");
+                        }
                     });
             this.descriptions.add("{" + src + " => " + target + "}");
             this.castContexts.add(castContext);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java
index 2aebca5..e534337 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java
@@ -18,18 +18,24 @@
 package org.apache.flink.table.data.binary;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.runtime.util.SegmentsUtil;
 import org.apache.flink.table.runtime.util.StringUtf8Utils;
+import org.apache.flink.table.utils.DateTimeUtils;
 import org.apache.flink.table.utils.EncodingUtils;
 
 import java.math.BigDecimal;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.time.DateTimeException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.TimeZone;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -144,12 +150,16 @@ public class BinaryStringDataUtil {
         return substrings.toArray(new BinaryStringData[0]);
     }
 
-    /** Decide boolean representation of a string. */
-    public static Boolean toBooleanSQL(BinaryStringData str) {
+    /** Parses a {@link BinaryStringData} to a boolean. */
+    public static boolean toBoolean(BinaryStringData str) throws TableException {
         BinaryStringData lowerCase = str.toLowerCase();
-        return TRUE_STRINGS.contains(lowerCase)
-                ? Boolean.TRUE
-                : (FALSE_STRINGS.contains(lowerCase) ? Boolean.FALSE : null);
+        if (TRUE_STRINGS.contains(lowerCase)) {
+            return true;
+        }
+        if (FALSE_STRINGS.contains(lowerCase)) {
+            return false;
+        }
+        throw new TableException("Cannot parse '" + str + "' as BOOLEAN.");
     }
 
     /** Calculate the hash value of the given bytes use {@link MessageDigest}. */
@@ -168,21 +178,30 @@ public class BinaryStringDataUtil {
     }
 
     /**
-     * Parses this BinaryStringData to DecimalData.
+     * Parses a {@link BinaryStringData} to {@link DecimalData}.
      *
-     * @return DecimalData value if the parsing was successful, or null if overflow
-     * @throws NumberFormatException if the parsing failed.
+     * @return DecimalData value if the parsing was successful.
      */
-    public static DecimalData toDecimal(BinaryStringData str, int precision, int scale) {
+    public static DecimalData toDecimal(BinaryStringData str, int precision, int scale)
+            throws NumberFormatException {
         str.ensureMaterialized();
 
+        DecimalData data;
+
         if (DecimalDataUtils.isByteArrayDecimal(precision)
                 || DecimalDataUtils.isByteArrayDecimal(str.getSizeInBytes())) {
-            return toBigPrecisionDecimal(str, precision, scale);
+            data = toBigPrecisionDecimal(str, precision, scale);
+        } else {
+            int sizeInBytes = str.getSizeInBytes();
+            data =
+                    toDecimalFromBytes(
+                            precision, scale, getTmpBytes(str, sizeInBytes), 0, sizeInBytes);
         }
 
-        int sizeInBytes = str.getSizeInBytes();
-        return toDecimalFromBytes(precision, scale, getTmpBytes(str, sizeInBytes), 0, sizeInBytes);
+        if (data == null) {
+            throw numberFormatExceptionFor(str, "Overflow.");
+        }
+        return data;
     }
 
     private static DecimalData toDecimalFromBytes(
@@ -353,12 +372,9 @@ public class BinaryStringDataUtil {
                     break;
                 }
             }
-            try {
-                BigDecimal bd = new BigDecimal(chars, start, end - start);
-                return DecimalData.fromBigDecimal(bd, precision, scale);
-            } catch (NumberFormatException nfe) {
-                return null;
-            }
+
+            BigDecimal bd = new BigDecimal(chars, start, end - start);
+            return DecimalData.fromBigDecimal(bd, precision, scale);
         }
     }
 
@@ -371,14 +387,12 @@ public class BinaryStringDataUtil {
      * Long.MIN_VALUE is '-9223372036854775808'.
      *
      * <p>This code is mostly copied from LazyLong.parseLong in Hive.
-     *
-     * @return Long value if the parsing was successful else null.
      */
-    public static Long toLong(BinaryStringData str) {
+    public static long toLong(BinaryStringData str) throws NumberFormatException {
         int sizeInBytes = str.getSizeInBytes();
         byte[] tmpBytes = getTmpBytes(str, sizeInBytes);
         if (sizeInBytes == 0) {
-            return null;
+            throw numberFormatExceptionFor(str, "Input is empty.");
         }
         int i = 0;
 
@@ -387,7 +401,7 @@ public class BinaryStringDataUtil {
         if (negative || b == '+') {
             i++;
             if (sizeInBytes == 1) {
-                return null;
+                throw numberFormatExceptionFor(str, "Input has only positive or negative symbol.");
             }
         }
 
@@ -409,7 +423,7 @@ public class BinaryStringDataUtil {
             if (b >= '0' && b <= '9') {
                 digit = b - '0';
             } else {
-                return null;
+                throw numberFormatExceptionFor(str, "Invalid character found.");
             }
 
             // We are going to process the new digit and accumulate the result. However, before
@@ -417,7 +431,7 @@ public class BinaryStringDataUtil {
             // stopValue(Long.MIN_VALUE / radix), then result * 10 will definitely be smaller
             // than minValue, and we can stop.
             if (result < stopValue) {
-                return null;
+                throw numberFormatExceptionFor(str, "Overflow.");
             }
 
             result = result * radix - digit;
@@ -425,7 +439,7 @@ public class BinaryStringDataUtil {
             // stopValue(Long.MIN_VALUE / radix), we can just use `result > 0` to check overflow.
             // If result overflows, we should stop.
             if (result > 0) {
-                return null;
+                throw numberFormatExceptionFor(str, "Overflow.");
             }
         }
 
@@ -435,7 +449,7 @@ public class BinaryStringDataUtil {
         while (i < sizeInBytes) {
             byte currentByte = tmpBytes[i];
             if (currentByte < '0' || currentByte > '9') {
-                return null;
+                throw numberFormatExceptionFor(str, "Invalid character found.");
             }
             i++;
         }
@@ -443,7 +457,7 @@ public class BinaryStringDataUtil {
         if (!negative) {
             result = -result;
             if (result < 0) {
-                return null;
+                throw numberFormatExceptionFor(str, "Overflow.");
             }
         }
         return result;
@@ -461,14 +475,12 @@ public class BinaryStringDataUtil {
      *
      * <p>Note that, this method is almost same as `toLong`, but we leave it duplicated for
      * performance reasons, like Hive does.
-     *
-     * @return Integer value if the parsing was successful else null.
      */
-    public static Integer toInt(BinaryStringData str) {
+    public static int toInt(BinaryStringData str) throws NumberFormatException {
         int sizeInBytes = str.getSizeInBytes();
         byte[] tmpBytes = getTmpBytes(str, sizeInBytes);
         if (sizeInBytes == 0) {
-            return null;
+            throw numberFormatExceptionFor(str, "Input is empty.");
         }
         int i = 0;
 
@@ -477,7 +489,7 @@ public class BinaryStringDataUtil {
         if (negative || b == '+') {
             i++;
             if (sizeInBytes == 1) {
-                return null;
+                throw numberFormatExceptionFor(str, "Input has only positive or negative symbol.");
             }
         }
 
@@ -499,7 +511,7 @@ public class BinaryStringDataUtil {
             if (b >= '0' && b <= '9') {
                 digit = b - '0';
             } else {
-                return null;
+                throw numberFormatExceptionFor(str, "Invalid character found.");
             }
 
             // We are going to process the new digit and accumulate the result. However, before
@@ -507,7 +519,7 @@ public class BinaryStringDataUtil {
             // stopValue(Long.MIN_VALUE / radix), then result * 10 will definitely be smaller
             // than minValue, and we can stop.
             if (result < stopValue) {
-                return null;
+                throw numberFormatExceptionFor(str, "Overflow.");
             }
 
             result = result * radix - digit;
@@ -515,7 +527,7 @@ public class BinaryStringDataUtil {
             // stopValue(Long.MIN_VALUE / radix), we can just use `result > 0` to check overflow.
             // If result overflows, we should stop.
             if (result > 0) {
-                return null;
+                throw numberFormatExceptionFor(str, "Overflow.");
             }
         }
 
@@ -525,7 +537,7 @@ public class BinaryStringDataUtil {
         while (i < sizeInBytes) {
             byte currentByte = tmpBytes[i];
             if (currentByte < '0' || currentByte > '9') {
-                return null;
+                throw numberFormatExceptionFor(str, "Invalid character found.");
             }
             i++;
         }
@@ -533,48 +545,77 @@ public class BinaryStringDataUtil {
         if (!negative) {
             result = -result;
             if (result < 0) {
-                return null;
+                throw numberFormatExceptionFor(str, "Overflow.");
             }
         }
         return result;
     }
 
-    public static Short toShort(BinaryStringData str) {
-        Integer intValue = toInt(str);
-        if (intValue != null) {
-            short result = intValue.shortValue();
-            if (result == intValue) {
-                return result;
-            }
+    public static short toShort(BinaryStringData str) throws NumberFormatException {
+        int intValue = toInt(str);
+        short result = (short) intValue;
+        if (result == intValue) {
+            return result;
         }
-        return null;
+        throw numberFormatExceptionFor(str, "Overflow.");
     }
 
-    public static Byte toByte(BinaryStringData str) {
-        Integer intValue = toInt(str);
-        if (intValue != null) {
-            byte result = intValue.byteValue();
-            if (result == intValue) {
-                return result;
-            }
+    public static byte toByte(BinaryStringData str) throws NumberFormatException {
+        int intValue = toInt(str);
+        byte result = (byte) intValue;
+        if (result == intValue) {
+            return result;
         }
-        return null;
+        throw numberFormatExceptionFor(str, "Overflow.");
     }
 
-    public static Double toDouble(BinaryStringData str) {
-        try {
-            return Double.valueOf(str.toString());
-        } catch (NumberFormatException e) {
-            return null;
+    public static double toDouble(BinaryStringData str) throws NumberFormatException {
+        return Double.parseDouble(str.toString());
+    }
+
+    public static float toFloat(BinaryStringData str) throws NumberFormatException {
+        return Float.parseFloat(str.toString());
+    }
+
+    private static NumberFormatException numberFormatExceptionFor(StringData input, String reason) {
+        return new NumberFormatException("For input string: '" + input + "'. " + reason);
+    }
+
+    public static int toDate(BinaryStringData input) throws DateTimeException {
+        Integer date = DateTimeUtils.dateStringToUnixDate(input.toString());
+        if (date == null) {
+            throw new DateTimeException("For input string: '" + input + "'.");
         }
+
+        return date;
     }
 
-    public static Float toFloat(BinaryStringData str) {
-        try {
-            return Float.valueOf(str.toString());
-        } catch (NumberFormatException e) {
-            return null;
+    public static int toTime(BinaryStringData input) throws DateTimeException {
+        Integer date = DateTimeUtils.timeStringToUnixDate(input.toString());
+        if (date == null) {
+            throw new DateTimeException("For input string: '" + input + "'.");
+        }
+
+        return date;
+    }
+
+    public static TimestampData toTimestamp(BinaryStringData input) throws DateTimeException {
+        TimestampData timestamp = DateTimeUtils.toTimestampData(input.toString());
+        if (timestamp == null) {
+            throw new DateTimeException("For input string: '" + input + "'.");
         }
+
+        return timestamp;
+    }
+
+    public static TimestampData toTimestamp(BinaryStringData input, TimeZone timeZone)
+            throws DateTimeException {
+        Long timestamp = DateTimeUtils.toTimestamp(input.toString(), timeZone);
+        if (timestamp == null) {
+            throw new DateTimeException("For input string: '" + input + "'.");
+        }
+
+        return TimestampData.fromEpochMillis(timestamp);
     }
 
     /**
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
index 4a8741f..bfb979d 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
@@ -60,6 +60,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -420,29 +421,29 @@ public class BinaryStringDataTest {
     @Test
     public void testToNumeric() {
         // Test to integer.
-        assertEquals(Byte.valueOf("123"), toByte(fromString("123")));
-        assertEquals(Byte.valueOf("123"), toByte(fromString("+123")));
-        assertEquals(Byte.valueOf("-123"), toByte(fromString("-123")));
+        assertEquals(Byte.parseByte("123"), toByte(fromString("123")));
+        assertEquals(Byte.parseByte("123"), toByte(fromString("+123")));
+        assertEquals(Byte.parseByte("-123"), toByte(fromString("-123")));
 
-        assertEquals(Short.valueOf("123"), toShort(fromString("123")));
-        assertEquals(Short.valueOf("123"), toShort(fromString("+123")));
-        assertEquals(Short.valueOf("-123"), toShort(fromString("-123")));
+        assertEquals(Short.parseShort("123"), toShort(fromString("123")));
+        assertEquals(Short.parseShort("123"), toShort(fromString("+123")));
+        assertEquals(Short.parseShort("-123"), toShort(fromString("-123")));
 
-        assertEquals(Integer.valueOf("123"), toInt(fromString("123")));
-        assertEquals(Integer.valueOf("123"), toInt(fromString("+123")));
-        assertEquals(Integer.valueOf("-123"), toInt(fromString("-123")));
+        assertEquals(Integer.parseInt("123"), toInt(fromString("123")));
+        assertEquals(Integer.parseInt("123"), toInt(fromString("+123")));
+        assertEquals(Integer.parseInt("-123"), toInt(fromString("-123")));
 
-        assertEquals(Long.valueOf("1234567890"), toLong(fromString("1234567890")));
-        assertEquals(Long.valueOf("+1234567890"), toLong(fromString("+1234567890")));
-        assertEquals(Long.valueOf("-1234567890"), toLong(fromString("-1234567890")));
+        assertEquals(Long.parseLong("1234567890"), toLong(fromString("1234567890")));
+        assertEquals(Long.parseLong("+1234567890"), toLong(fromString("+1234567890")));
+        assertEquals(Long.parseLong("-1234567890"), toLong(fromString("-1234567890")));
 
         // Test decimal string to integer.
-        assertEquals(Integer.valueOf("123"), toInt(fromString("123.456789")));
-        assertEquals(Long.valueOf("123"), toLong(fromString("123.456789")));
+        assertEquals(Integer.parseInt("123"), toInt(fromString("123.456789")));
+        assertEquals(Long.parseLong("123"), toLong(fromString("123.456789")));
 
         // Test negative cases.
-        assertNull(toInt(fromString("1a3.456789")));
-        assertNull(toInt(fromString("123.a56789")));
+        assertThrows(NumberFormatException.class, () -> toInt(fromString("1a3.456789")));
+        assertThrows(NumberFormatException.class, () -> toInt(fromString("123.a56789")));
 
         // Test composite in BinaryRowData.
         BinaryRowData row = new BinaryRowData(20);
@@ -453,10 +454,10 @@ public class BinaryStringDataTest {
         writer.writeString(3, BinaryStringData.fromString("123456789"));
         writer.complete();
 
-        assertEquals(Byte.valueOf("1"), toByte(((BinaryStringData) row.getString(0))));
-        assertEquals(Short.valueOf("123"), toShort(((BinaryStringData) row.getString(1))));
-        assertEquals(Integer.valueOf("12345"), toInt(((BinaryStringData) row.getString(2))));
-        assertEquals(Long.valueOf("123456789"), toLong(((BinaryStringData) row.getString(3))));
+        assertEquals(Byte.parseByte("1"), toByte(((BinaryStringData) row.getString(0))));
+        assertEquals(Short.parseShort("123"), toShort(((BinaryStringData) row.getString(1))));
+        assertEquals(Integer.parseInt("12345"), toInt(((BinaryStringData) row.getString(2))));
+        assertEquals(Long.parseLong("123456789"), toLong(((BinaryStringData) row.getString(3))));
     }
 
     @Test

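The parsing helpers added to BinaryStringDataUtil in the hunks above now throw (NumberFormatException, DateTimeException or TableException) on bad input instead of returning null, which is what lets the new from-string cast rules report parse failures. A minimal, hand-written usage sketch follows (not part of the commit; the class name and sample inputs are illustrative only):

    import org.apache.flink.table.data.binary.BinaryStringData;
    import org.apache.flink.table.data.binary.BinaryStringDataUtil;

    public class BinaryStringParsingSketch {
        public static void main(String[] args) {
            // Valid inputs now parse straight to primitives instead of nullable boxed values.
            int i = BinaryStringDataUtil.toInt(BinaryStringData.fromString("123"));
            boolean b = BinaryStringDataUtil.toBoolean(BinaryStringData.fromString("FalsE"));
            int date = BinaryStringDataUtil.toDate(BinaryStringData.fromString("2021-09-27"));

            // Invalid inputs raise an exception carrying the input and a reason, instead of null.
            try {
                BinaryStringDataUtil.toLong(BinaryStringData.fromString("1a3.456789"));
            } catch (NumberFormatException e) {
                System.out.println(e.getMessage()); // For input string: '1a3.456789'. Invalid character found.
            }
            System.out.println(i + " " + b + " " + date);
        }
    }
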
[flink] 03/03: [FLINK-24781][table-planner] Refactor cast of literals to use CastExecutor

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9f7eef293f723800945a9759c50adbf8786a2bd4
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Tue Nov 16 10:48:08 2021 +0100

    [FLINK-24781][table-planner] Refactor cast of literals to use CastExecutor
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
    
    This closes #17800.
---
 .../CodeGeneratedExpressionCastExecutor.java       |  3 +-
 .../flink/table/planner/codegen/CodeGenUtils.scala | 26 ++++++-
 .../table/planner/codegen/GenerateUtils.scala      | 16 ----
 .../planner/codegen/calls/BuiltInMethods.scala     |  1 -
 .../table/planner/codegen/calls/IfCallGen.scala    |  7 +-
 .../planner/codegen/calls/ScalarOperatorGens.scala | 89 ++++++++++++----------
 .../validation/ScalarOperatorsValidationTest.scala | 12 +--
 7 files changed, 85 insertions(+), 69 deletions(-)
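
The gist of the refactoring, sketched by hand under the APIs shown in this thread (the member name literal$0 is made up, and the real code path goes through CastRuleProvider/CastExecutor rather than calling the parsing utility directly): cast the literal once at planning time, then embed the precomputed value into the generated class as a reusable member, which is what generateCastLiteral() and primitiveLiteralForType() do in the diff below.

    import org.apache.flink.table.data.TimestampData;
    import org.apache.flink.table.data.binary.BinaryStringData;
    import org.apache.flink.table.data.binary.BinaryStringDataUtil;

    public class LiteralCastSketch {
        public static void main(String[] args) {
            // Planning time: execute the cast once on the literal value.
            TimestampData ts = BinaryStringDataUtil.toTimestamp(
                    BinaryStringData.fromString("2021-09-27 12:34:56.123"));

            // Codegen time: emit the precomputed value as a reusable member declaration,
            // roughly what generateCastLiteral() produces via primitiveLiteralForType().
            String declStmt = String.format(
                    "org.apache.flink.table.data.TimestampData literal$0 = "
                            + "org.apache.flink.table.data.TimestampData.fromEpochMillis(%dL, %d);",
                    ts.getMillisecond(), ts.getNanoOfMillisecond());
            System.out.println(declStmt);
        }
    }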

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
index 7c361ac..6e57593 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
@@ -57,7 +57,8 @@ class CodeGeneratedExpressionCastExecutor<IN, OUT> implements CastExecutor<IN, O
                 throw (TableException) e.getCause();
             }
             throw new TableException(
-                    "Cannot execute the compiled expression for an unknown cause", e);
+                    "Cannot execute the compiled expression for an unknown cause. " + e.getCause(),
+                    e);
         }
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 22bb463..b21d097 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -21,7 +21,6 @@ package org.apache.flink.table.planner.codegen
 import java.lang.reflect.Method
 import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Object => JObject, Short => JShort}
 import java.util.concurrent.atomic.AtomicLong
-
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.core.memory.MemorySegment
@@ -33,10 +32,10 @@ import org.apache.flink.table.data.util.DataFormatConverters.IdentityConverter
 import org.apache.flink.table.data.utils.JoinedRowData
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.planner.codegen.GenerateUtils.{generateInputFieldUnboxing, generateNonNullField}
+import org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING
 import org.apache.flink.table.runtime.dataview.StateDataViewStore
 import org.apache.flink.table.runtime.generated.{AggsHandleFunction, HashFunction, NamespaceAggsHandleFunction, TableAggsHandleFunction}
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
-import org.apache.flink.table.runtime.types.PlannerTypeUtils.isInteroperable
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils
 import org.apache.flink.table.runtime.util.{MurmurHashUtil, TimeWindowUtil}
 import org.apache.flink.table.types.DataType
@@ -46,6 +45,7 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldCount, getPrecision, getScale}
 import org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInternalConversionClass
 import org.apache.flink.table.types.utils.DataTypeUtils.isInternal
+import org.apache.flink.table.utils.EncodingUtils
 import org.apache.flink.types.{Row, RowKind}
 
 import scala.annotation.tailrec
@@ -195,6 +195,28 @@ object CodeGenUtils {
     case _ => boxedTypeTermForType(t)
   }
 
+  /**
+   * Converts a value to its stringified representation for inclusion in the generated code.
+   *
+   * This method doesn't support complex types.
+   */
+  def primitiveLiteralForType(value: Any): String = value match {
+    // ordered by type root definition
+    case _: JBoolean => value.toString
+    case _: JByte => s"((byte)$value)"
+    case _: JShort => s"((short)$value)"
+    case _: JInt => value.toString
+    case _: JLong => value.toString + "L"
+    case _: JFloat => value.toString + "f"
+    case _: JDouble => value.toString + "d"
+    case sd: StringData =>
+      qualifyMethod(BINARY_STRING_DATA_FROM_STRING) + "(\"" +
+        EncodingUtils.escapeJava(sd.toString) + "\")"
+    case td: TimestampData =>
+      s"$TIMESTAMP_DATA.fromEpochMillis(${td.getMillisecond}L, ${td.getNanoOfMillisecond})"
+    case _ => throw new IllegalArgumentException("Illegal literal type: " + value.getClass)
+  }
+
   @tailrec
   def boxedTypeTermForType(t: LogicalType): String = t.getTypeRoot match {
     // ordered by type root definition
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
index d113953..cc612ac 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
@@ -142,22 +142,6 @@ object GenerateUtils {
 
 
   /**
-    * Generates a string result call with auxiliary statements and result expression.
-    * This will convert the String result to BinaryStringData.
-    */
-  def generateStringResultCallWithStmtIfArgsNotNull(
-      ctx: CodeGeneratorContext,
-      operands: Seq[GeneratedExpression],
-      returnType: LogicalType)
-      (call: Seq[String] => (String, String)): GeneratedExpression = {
-    generateCallWithStmtIfArgsNotNull(ctx, returnType, operands) {
-      args =>
-        val (stmt, result) = call(args)
-        (stmt, s"$BINARY_STRING.fromString($result)")
-    }
-  }
-
-  /**
     * Generates a call with the nullable args.
     */
   def generateCallIfArgsNullable(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 308826d..824f362 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -29,7 +29,6 @@ import org.apache.flink.table.data.binary.{BinaryStringData, BinaryStringDataUti
 
 import java.lang.reflect.Method
 import java.lang.{Byte => JByte, Integer => JInteger, Long => JLong, Short => JShort}
-import java.time.ZoneId
 import java.util.TimeZone
 
 object BuiltInMethods {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
index af8061c..5fe1dd1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
@@ -19,10 +19,9 @@
 package org.apache.flink.table.planner.codegen.calls
 
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, primitiveDefaultValue, primitiveTypeTermForType}
-import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens.toCastContext
-import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGenUtils, CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens.toCodegenCastContext
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.planner.functions.casting.{CastRuleProvider, ExpressionCodeGeneratorCastRule}
-import org.apache.flink.table.runtime.types.PlannerTypeUtils.isInteroperable
 import org.apache.flink.table.types.logical.LogicalType
 
 /**
@@ -86,7 +85,7 @@ class IfCallGen() extends CallGenerator {
       rule match {
         case codeGeneratorCastRule: ExpressionCodeGeneratorCastRule[_, _] =>
           codeGeneratorCastRule.generateExpression(
-            toCastContext(ctx),
+            toCodegenCastContext(ctx),
             expr.resultTerm,
             expr.resultType,
             targetType
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 8554e4f..dc8bb63 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -20,13 +20,14 @@ package org.apache.flink.table.planner.codegen.calls
 
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.data.binary.BinaryArrayData
-import org.apache.flink.table.planner.functions.casting.{CastRuleProvider, CodeGeneratorCastRule, ExpressionCodeGeneratorCastRule}
+import org.apache.flink.table.planner.functions.casting.{CastRule, CastRuleProvider, CodeGeneratorCastRule, ExpressionCodeGeneratorCastRule}
 import org.apache.flink.table.data.util.MapDataUtil
+import org.apache.flink.table.data.utils.CastExecutor
 import org.apache.flink.table.data.writer.{BinaryArrayWriter, BinaryRowWriter}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{binaryRowFieldSetAccess, binaryRowSetNull, binaryWriterWriteField, binaryWriterWriteNull, _}
 import org.apache.flink.table.planner.codegen.GenerateUtils._
 import org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL, NEVER_NULL, NO_CODE}
-import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGenUtils, CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 import org.apache.flink.table.runtime.functions.SqlFunctionUtils
 import org.apache.flink.table.runtime.types.PlannerTypeUtils
@@ -42,6 +43,7 @@ import org.apache.flink.table.utils.DateTimeUtils
 import org.apache.flink.util.Preconditions.checkArgument
 import org.apache.flink.table.utils.DateTimeUtils.MILLIS_PER_DAY
 
+import java.time.ZoneId
 import java.util.Arrays.asList
 import scala.collection.JavaConversions._
 
@@ -487,7 +489,7 @@ object ScalarOperatorGens {
     // for performance, we cast literal string to literal time.
     else if (isTimePoint(left.resultType) && isCharacterString(right.resultType)) {
       if (right.literal) {
-        generateEquals(ctx, left, generateCastStringLiteralToDateTime(ctx, right, left.resultType))
+        generateEquals(ctx, left, generateCastLiteral(ctx, right, left.resultType))
       } else {
         generateEquals(ctx, left, generateCast(ctx, right, left.resultType))
       }
@@ -496,7 +498,7 @@ object ScalarOperatorGens {
       if (left.literal) {
         generateEquals(
           ctx,
-          generateCastStringLiteralToDateTime(ctx, left, right.resultType),
+          generateCastLiteral(ctx, left, right.resultType),
           right)
       } else {
         generateEquals(ctx, generateCast(ctx, left, right.resultType), right)
@@ -946,7 +948,7 @@ object ScalarOperatorGens {
 
         // Generate the code block
         val castCodeBlock = codeGeneratorCastRule.generateCodeBlock(
-          toCastContext(ctx),
+          toCodegenCastContext(ctx),
           operand.resultTerm,
           operand.nullTerm,
           inputType,
@@ -1942,42 +1944,43 @@ object ScalarOperatorGens {
     }
   }
 
-  private def generateCastStringLiteralToDateTime(
-      ctx: CodeGeneratorContext,
-      stringLiteral: GeneratedExpression,
-      expectType: LogicalType): GeneratedExpression = {
-    checkArgument(stringLiteral.literal)
-    if (java.lang.Boolean.valueOf(stringLiteral.nullTerm)) {
-      return generateNullLiteral(expectType, nullCheck = true)
+  /**
+   * This method supports casting literals to non-composite types (primitives, strings, date/time).
+   * Every cast result is declared as a class member, so that it can be reused.
+   */
+  private def generateCastLiteral(
+     ctx: CodeGeneratorContext,
+     literalExpr: GeneratedExpression,
+     resultType: LogicalType): GeneratedExpression = {
+    checkArgument(literalExpr.literal)
+    if (java.lang.Boolean.valueOf(literalExpr.nullTerm)) {
+      return generateNullLiteral(resultType, nullCheck = true)
     }
 
-    val stringValue = stringLiteral.literalValue.get.toString
-    val literalCode = expectType.getTypeRoot match {
-      case DATE =>
-        DateTimeUtils.dateStringToUnixDate(stringValue) match {
-          case null => throw new ValidationException(s"String '$stringValue' is not a valid date")
-          case v => v
-        }
-      case TIME_WITHOUT_TIME_ZONE =>
-        DateTimeUtils.timeStringToUnixDate(stringValue) match {
-          case null => throw new ValidationException(s"String '$stringValue' is not a valid time")
-          case v => v
-        }
-      case TIMESTAMP_WITHOUT_TIME_ZONE =>
-        DateTimeUtils.toTimestampData(stringValue) match {
-          case null =>
-            throw new ValidationException(s"String '$stringValue' is not a valid timestamp")
-          case v => s"${CodeGenUtils.TIMESTAMP_DATA}.fromEpochMillis(" +
-            s"${v.getMillisecond}L, ${v.getNanoOfMillisecond})"
-        }
-      case _ => throw new UnsupportedOperationException
+    val castExecutor = CastRuleProvider.create(
+      toCastContext(ctx),
+      literalExpr.resultType,
+      resultType
+    ).asInstanceOf[CastExecutor[Any, Any]]
+
+    if (castExecutor == null) {
+      throw new CodeGenException(
+        s"Unsupported casting from ${literalExpr.resultType} to $resultType")
     }
 
-    val typeTerm = primitiveTypeTermForType(expectType)
-    val resultTerm = newName("stringToTime")
-    val stmt = s"$typeTerm $resultTerm = $literalCode;"
-    ctx.addReusableMember(stmt)
-    GeneratedExpression(resultTerm, "false", "", expectType)
+    try {
+      val result = castExecutor.cast(literalExpr.literalValue.get)
+      val resultTerm = newName("stringToTime")
+
+      val declStmt =
+        s"${primitiveTypeTermForType(resultType)} $resultTerm = ${primitiveLiteralForType(result)};"
+
+      ctx.addReusableMember(declStmt)
+      GeneratedExpression(resultTerm, "false", "", resultType, Some(result))
+    } catch {
+      case e: Throwable =>
+        throw new ValidationException("Error when casting literal: " + e.getMessage, e)
+    }
   }
 
   private def generateArrayComparison(
@@ -2169,7 +2172,7 @@ object ScalarOperatorGens {
     rule match {
       case codeGeneratorCastRule: ExpressionCodeGeneratorCastRule[_, _] =>
         operandTerm => codeGeneratorCastRule.generateExpression(
-          toCastContext(ctx),
+          toCodegenCastContext(ctx),
           operandTerm,
           operandType,
           resultType
@@ -2179,7 +2182,7 @@ object ScalarOperatorGens {
     }
   }
 
-  def toCastContext(ctx: CodeGeneratorContext): CodeGeneratorCastRule.Context = {
+  def toCodegenCastContext(ctx: CodeGeneratorContext): CodeGeneratorCastRule.Context = {
     new CodeGeneratorCastRule.Context {
       override def getSessionTimeZoneTerm: String = ctx.addReusableSessionTimeZone()
       override def declareVariable(ty: String, variablePrefix: String): String =
@@ -2193,4 +2196,12 @@ object ScalarOperatorGens {
     }
   }
 
+  def toCastContext(ctx: CodeGeneratorContext): CastRule.Context = {
+    new CastRule.Context {
+      override def getSessionZoneId: ZoneId = ctx.tableConfig.getLocalTimeZone
+
+      override def getClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader
+    }
+  }
+
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
index 4b27008..5fc4b72 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
@@ -88,24 +88,24 @@ class ScalarOperatorsValidationTest extends ScalarOperatorsTestBase {
   @Test
   def testTemporalTypeEqualsInvalidStringLiteral(): Unit = {
     testExpectedSqlException(
-      "f15 = 'invalid'", "is not a valid date",
+      "f15 = 'invalid'", "java.time.DateTimeException",
       classOf[ValidationException])
     testExpectedSqlException(
-      "'invalid' = f15", "is not a valid date",
+      "'invalid' = f15", "java.time.DateTimeException",
       classOf[ValidationException])
 
     testExpectedSqlException(
-      "f21 = 'invalid'", "is not a valid time",
+      "f21 = 'invalid'", "java.time.DateTimeException",
       classOf[ValidationException])
     testExpectedSqlException(
-      "'invalid' = f21", "is not a valid time",
+      "'invalid' = f21", "java.time.DateTimeException",
       classOf[ValidationException])
 
     testExpectedSqlException(
-      "f22 = 'invalid'", "is not a valid timestamp",
+      "f22 = 'invalid'", "java.time.DateTimeException",
       classOf[ValidationException])
     testExpectedSqlException(
-      "'invalid' = f22", "is not a valid timestamp",
+      "'invalid' = f22", "java.time.DateTimeException",
       classOf[ValidationException])
   }
 }

[flink] 01/03: [FLINK-24781][table-planner] Added CastRule#canFail and make sure ScalarOperatorGens wraps the cast invocation in a try-catch

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 92c02fc747f7794f2c20ac161ad5d7b9c0f2c0f8
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Mon Nov 15 13:39:51 2021 +0100

    [FLINK-24781][table-planner] Added CastRule#canFail and make sure ScalarOperatorGens wraps the cast invocation in a try-catch
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
 .../functions/casting/AbstractCastRule.java        |  5 ++
 .../AbstractExpressionCodeGeneratorCastRule.java   |  2 +
 .../table/planner/functions/casting/CastRule.java  |  2 +
 .../CodeGeneratedExpressionCastExecutor.java       |  7 ++-
 .../planner/codegen/calls/ScalarOperatorGens.scala | 60 +++++++++++++++++-----
 5 files changed, 63 insertions(+), 13 deletions(-)
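
A hand-written sketch of the shape that ScalarOperatorGens now emits for rules reporting canFail() (this is not the generated code itself; the parsing call merely stands in for any rule body that may throw): the cast invocation is wrapped in try/catch, and on failure the result falls back to the primitive default with the null flag set.

    import org.apache.flink.table.data.binary.BinaryStringData;
    import org.apache.flink.table.data.binary.BinaryStringDataUtil;

    public class TryCatchCastSketch {
        public static void main(String[] args) {
            BinaryStringData input = BinaryStringData.fromString("not a number");

            // Mirrors the castRuleResult / castRuleResultIsNull locals declared by the codegen.
            int castRuleResult;
            boolean castRuleResultIsNull;
            try {
                castRuleResult = BinaryStringDataUtil.toInt(input);
                castRuleResultIsNull = false;
            } catch (Throwable t) {
                castRuleResult = 0; // primitiveDefaultValue(targetType)
                castRuleResultIsNull = true;
            }
            System.out.println(castRuleResultIsNull ? "NULL" : String.valueOf(castRuleResult));
        }
    }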

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java
index c193139..840c8df 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java
@@ -31,4 +31,9 @@ abstract class AbstractCastRule<IN, OUT> implements CastRule<IN, OUT> {
     public CastRulePredicate getPredicateDefinition() {
         return predicate;
     }
+
+    @Override
+    public boolean canFail() {
+        return false;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
index 0b14ddc..aa0a50b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
@@ -25,7 +25,9 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 
 import java.util.Collections;
 
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.boxedTypeTermForType;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.box;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.cast;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.unbox;
 
 /**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
index e93effb..58217e4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java
@@ -45,6 +45,8 @@ public interface CastRule<IN, OUT> {
     CastExecutor<IN, OUT> create(
             Context context, LogicalType inputLogicalType, LogicalType targetLogicalType);
 
+    boolean canFail();
+
     /** Casting context. */
     interface Context {
         ZoneId getSessionZoneId();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
index c94db8d..f39089a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
@@ -53,7 +53,12 @@ class CodeGeneratedExpressionCastExecutor<IN, OUT> implements CastExecutor<IN, O
             inputArray[0] = value;
             return (OUT) expressionEvaluator.evaluate(inputArray);
         } catch (InvocationTargetException e) {
-            throw new FlinkRuntimeException("Cannot execute the compiled expression", e);
+            if (e.getCause() instanceof TableException) {
+                // Expected exception created by the rule, so no need to wrap it
+                throw (TableException) e.getCause();
+            }
+            throw new TableException(
+                    "Cannot execute the compiled expression for an unknown cause", e);
         }
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 6a7fbb2..0cb0bea 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.data.binary.BinaryArrayData
 import org.apache.flink.table.planner.functions.casting.{CastRuleProvider, CodeGeneratorCastRule, ExpressionCodeGeneratorCastRule}
 import org.apache.flink.table.data.util.MapDataUtil
@@ -953,17 +953,53 @@ object ScalarOperatorGens {
           targetType
         )
 
-        val castCode = s"\n" +
-          s"// --- Cast section generated by ${className(codeGeneratorCastRule.getClass)}\n" +
-          s"${castCodeBlock.getCode}" +
-          s"// --- End cast section\n"
-
-        return GeneratedExpression(
-          castCodeBlock.getReturnTerm,
-          castCodeBlock.getIsNullTerm,
-          operand.code + castCode,
-          targetType
-        )
+        if (codeGeneratorCastRule.canFail) {
+          val resultTerm = ctx.addReusableLocalVariable(
+            primitiveTypeTermForType(targetType),
+            "castRuleResult"
+          )
+          val nullTerm = ctx.addReusableLocalVariable(
+            "boolean",
+            "castRuleResultIsNull"
+          )
+
+          // TODO this code belongs more to TRY_CAST than to CAST.
+          //  See https://issues.apache.org/jira/browse/FLINK-24385 for more details
+          val castCode =
+            s"""
+               | // --- Cast section generated by ${className(codeGeneratorCastRule.getClass)}
+               | try {
+               |   ${castCodeBlock.getCode}
+               |   $resultTerm = ${castCodeBlock.getReturnTerm};
+               |   $nullTerm = ${castCodeBlock.getIsNullTerm};
+               | } catch (${className[Throwable]} e) {
+               |   $resultTerm = ${primitiveDefaultValue(targetType)};
+               |   $nullTerm = true;
+               | }
+               | // --- End cast section
+               """.stripMargin
+
+          return GeneratedExpression(
+            resultTerm,
+            nullTerm,
+            operand.code + "\n" + castCode,
+            targetType
+          )
+        } else {
+          val castCode =
+            s"""
+               | // --- Cast section generated by ${className(codeGeneratorCastRule.getClass)}
+               | ${castCodeBlock.getCode}
+               | // --- End cast section
+               """.stripMargin
+
+          return GeneratedExpression(
+            castCodeBlock.getReturnTerm,
+            castCodeBlock.getIsNullTerm,
+            operand.code + castCode,
+            targetType
+          )
+        }
       case _ =>
     }