Posted to commits@flink.apache.org by tw...@apache.org on 2021/11/22 07:05:53 UTC

[flink] 02/03: [FLINK-24781][table-planner] Add string parsing methods to BinaryStringDataUtil and add from string cast rules

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0426d8c7af0191f75e6aaa4696b3358de059dc67
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Mon Nov 15 13:40:34 2021 +0100

    [FLINK-24781][table-planner] Add string parsing methods to BinaryStringDataUtil and add from string cast rules
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
 .../apache/flink/table/utils/DateTimeUtils.java    |   5 +-
 .../AbstractExpressionCodeGeneratorCastRule.java   |   9 +-
 .../functions/casting/CastRuleProvider.java        |   8 +
 .../CodeGeneratedExpressionCastExecutor.java       |   1 -
 .../functions/casting/StringToBinaryCastRule.java  |  50 +++++
 .../functions/casting/StringToBooleanCastRule.java |  55 ++++++
 .../functions/casting/StringToDateCastRule.java    |  55 ++++++
 .../functions/casting/StringToDecimalCastRule.java |  63 ++++++
 .../casting/StringToNumericPrimitiveCastRule.java  |  78 ++++++++
 .../functions/casting/StringToTimeCastRule.java    |  58 ++++++
 .../casting/StringToTimestampCastRule.java         |  63 ++++++
 .../planner/codegen/calls/BuiltInMethods.scala     |  68 +++++--
 .../planner/codegen/calls/ScalarOperatorGens.scala | 105 ----------
 .../planner/functions/casting/CastRulesTest.java   | 211 ++++++++++++++++++++-
 .../table/data/binary/BinaryStringDataUtil.java    | 169 ++++++++++-------
 .../flink/table/data/BinaryStringDataTest.java     |  41 ++--
 16 files changed, 826 insertions(+), 213 deletions(-)

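In short: the parsing helpers added to BinaryStringDataUtil throw on unparsable input instead of returning null, and the new from-string cast rules delegate to them. A minimal usage sketch, assuming the method signatures introduced in this commit (illustrative only, not part of the diff):

    import org.apache.flink.table.data.binary.BinaryStringData;
    import org.apache.flink.table.data.binary.BinaryStringDataUtil;

    public class FromStringParsingSketch {
        public static void main(String[] args) {
            // Recognised boolean spellings parse; anything else throws a TableException
            boolean b = BinaryStringDataUtil.toBoolean(BinaryStringData.fromString("TRUE")); // true
            // Integral parsing truncates the fractional part, as exercised in CastRulesTest
            int i = BinaryStringDataUtil.toInt(BinaryStringData.fromString("1.234"));        // 1
            System.out.println(b + " / " + i);
        }
    }
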
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
index fdd715b..06fc883 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
@@ -317,6 +317,7 @@ public class DateTimeUtils {
     // --------------------------------------------------------------------------------------------
     // String --> Timestamp conversion
     // --------------------------------------------------------------------------------------------
+
     public static TimestampData toTimestampData(String dateStr) {
         int length = dateStr.length();
         String format;
@@ -411,7 +412,7 @@ public class DateTimeUtils {
      * @param format date time string format
      * @param tz the time zone
      */
-    public static Long toTimestamp(String dateStr, String format, TimeZone tz) {
+    private static Long toTimestamp(String dateStr, String format, TimeZone tz) {
         SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
         formatter.setTimeZone(tz);
         try {
@@ -1717,7 +1718,7 @@ public class DateTimeUtils {
         return timeStringToUnixDate(v, 0);
     }
 
-    public static Integer timeStringToUnixDate(String v, int start) {
+    private static Integer timeStringToUnixDate(String v, int start) {
         final int colon1 = v.indexOf(':', start);
         // timezone hh:mm:ss[.ssssss][[+|-]hh:mm:ss]
         // refer https://www.w3.org/TR/NOTE-datetime
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
index aa0a50b..6700a7e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
@@ -73,7 +73,14 @@ abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT>
                         box(
                                 generateExpression(
                                         createCodeGeneratorCastRuleContext(context),
-                                        unbox(inputArgumentName, inputLogicalType),
+                                        unbox(
+                                                // We need the casting because the rules use the
+                                                // concrete classes (e.g. StringData and
+                                                // BinaryStringData)
+                                                cast(
+                                                        boxedTypeTermForType(inputLogicalType),
+                                                        inputArgumentName),
+                                                inputLogicalType),
                                         inputLogicalType,
                                         targetLogicalType),
                                 targetLogicalType));
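Concretely, the extra cast makes the generated expression operate on the concrete term type rather than the interface. For a string input the emitted code ends up shaped roughly like the following (an illustrative reconstruction with a hypothetical argument name arg0, not literal planner output):

    // arg0 stands for the boxed input argument of the generated method
    byte[] bytes = ((org.apache.flink.table.data.binary.BinaryStringData) arg0).toBytes();
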
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
index b098191..cb5dfa2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
@@ -58,6 +58,14 @@ public class CastRuleProvider {
                 .addRule(MapAndMultisetToStringCastRule.INSTANCE)
                 .addRule(RowToStringCastRule.INSTANCE)
                 .addRule(RawToStringCastRule.INSTANCE)
+                // From string rules
+                .addRule(StringToBooleanCastRule.INSTANCE)
+                .addRule(StringToDecimalCastRule.INSTANCE)
+                .addRule(StringToNumericPrimitiveCastRule.INSTANCE)
+                .addRule(StringToDateCastRule.INSTANCE)
+                .addRule(StringToTimeCastRule.INSTANCE)
+                .addRule(StringToTimestampCastRule.INSTANCE)
+                .addRule(StringToBinaryCastRule.INSTANCE)
                 // Collection rules
                 .addRule(ArrayToArrayCastRule.INSTANCE)
                 // Special rules
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
index f39089a..7c361ac 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.planner.functions.casting;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.utils.CastExecutor;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import org.codehaus.janino.ExpressionEvaluator;
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java
new file mode 100644
index 0000000..2a70de4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+
+/**
+ * {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeFamily#BINARY_STRING} cast rule.
+ */
+class StringToBinaryCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, byte[]> {
+
+    static final StringToBinaryCastRule INSTANCE = new StringToBinaryCastRule();
+
+    private StringToBinaryCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeFamily.BINARY_STRING)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return methodCall(inputTerm, "toBytes");
+    }
+}
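The rule above reduces CAST(<string> AS BINARY/VARBINARY/BYTES) to a plain toBytes() call; a sketch mirroring the expectations added to CastRulesTest below:

    // 'Apache' -> its UTF-8 bytes {65, 112, 97, 99, 104, 101}
    byte[] bytes =
            org.apache.flink.table.data.binary.BinaryStringData.fromString("Apache").toBytes();
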
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBooleanCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBooleanCastRule.java
new file mode 100644
index 0000000..b8730eb
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBooleanCastRule.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_BOOLEAN;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#BOOLEAN} cast rule. */
+class StringToBooleanCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, Boolean> {
+
+    static final StringToBooleanCastRule INSTANCE = new StringToBooleanCastRule();
+
+    private StringToBooleanCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeRoot.BOOLEAN)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return staticCall(STRING_DATA_TO_BOOLEAN(), inputTerm);
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDateCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDateCastRule.java
new file mode 100644
index 0000000..2a31f22
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDateCastRule.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_DATE;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#DATE} cast rule. */
+class StringToDateCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, Integer> {
+
+    static final StringToDateCastRule INSTANCE = new StringToDateCastRule();
+
+    private StringToDateCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeRoot.DATE)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return staticCall(STRING_DATA_TO_DATE(), inputTerm);
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
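A sketch of the underlying call, assuming the BinaryStringDataUtil.toDate signature registered in BuiltInMethods below: it returns the epoch-day int that DATE uses internally, and date-time strings are accepted with the time part ignored (see the new CastRulesTest cases).

    // '2021-09-27' -> same value as DateTimeUtils.localDateToUnixDate(LocalDate.of(2021, 9, 27))
    int epochDay =
            org.apache.flink.table.data.binary.BinaryStringDataUtil.toDate(
                    org.apache.flink.table.data.binary.BinaryStringData.fromString("2021-09-27"));
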
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDecimalCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDecimalCastRule.java
new file mode 100644
index 0000000..77108d8
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDecimalCastRule.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_DECIMAL;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#DECIMAL} cast rule. */
+class StringToDecimalCastRule
+        extends AbstractExpressionCodeGeneratorCastRule<StringData, DecimalData> {
+
+    static final StringToDecimalCastRule INSTANCE = new StringToDecimalCastRule();
+
+    private StringToDecimalCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeRoot.DECIMAL)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final DecimalType targetDecimalType = (DecimalType) targetLogicalType;
+        return staticCall(
+                STRING_DATA_TO_DECIMAL(),
+                inputTerm,
+                targetDecimalType.getPrecision(),
+                targetDecimalType.getScale());
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
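The target's precision and scale are forwarded to the parser, so the resulting DecimalData is already scaled; a sketch mirroring the DECIMAL(5, 3) cases added to CastRulesTest:

    // CAST('1.2' AS DECIMAL(5, 3)) -> 1.200; unparsable input raises a NumberFormatException
    org.apache.flink.table.data.DecimalData d =
            org.apache.flink.table.data.binary.BinaryStringDataUtil.toDecimal(
                    org.apache.flink.table.data.binary.BinaryStringData.fromString("1.2"), 5, 3);
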
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToNumericPrimitiveCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToNumericPrimitiveCastRule.java
new file mode 100644
index 0000000..cec9ca8
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToNumericPrimitiveCastRule.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_BYTE;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_DOUBLE;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_FLOAT;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_INT;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_LONG;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_SHORT;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/**
+ * {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeFamily#INTEGER_NUMERIC} and
+ * {@link LogicalTypeFamily#APPROXIMATE_NUMERIC} cast rule.
+ */
+class StringToNumericPrimitiveCastRule
+        extends AbstractExpressionCodeGeneratorCastRule<StringData, Number> {
+
+    static final StringToNumericPrimitiveCastRule INSTANCE = new StringToNumericPrimitiveCastRule();
+
+    private StringToNumericPrimitiveCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeFamily.INTEGER_NUMERIC)
+                        .target(LogicalTypeFamily.APPROXIMATE_NUMERIC)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        switch (targetLogicalType.getTypeRoot()) {
+            case TINYINT:
+                return staticCall(STRING_DATA_TO_BYTE(), inputTerm);
+            case SMALLINT:
+                return staticCall(STRING_DATA_TO_SHORT(), inputTerm);
+            case INTEGER:
+                return staticCall(STRING_DATA_TO_INT(), inputTerm);
+            case BIGINT:
+                return staticCall(STRING_DATA_TO_LONG(), inputTerm);
+            case FLOAT:
+                return staticCall(STRING_DATA_TO_FLOAT(), inputTerm);
+            case DOUBLE:
+                return staticCall(STRING_DATA_TO_DOUBLE(), inputTerm);
+        }
+        throw new IllegalArgumentException("This is a bug. Please file an issue.");
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
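Each numeric root dispatches to its dedicated BinaryStringDataUtil parser, and out-of-range values fail at parse time rather than wrapping around, as the new TINYINT/SMALLINT test cases show. A hedged sketch:

    // '123' fits a TINYINT; '-130' does not and throws (surfaced as a TableException in the tests)
    byte ok =
            org.apache.flink.table.data.binary.BinaryStringDataUtil.toByte(
                    org.apache.flink.table.data.binary.BinaryStringData.fromString("123"));
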
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimeCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimeCastRule.java
new file mode 100644
index 0000000..4c14b78
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimeCastRule.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_TIME;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/**
+ * {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#TIME_WITHOUT_TIME_ZONE} cast
+ * rule.
+ */
+class StringToTimeCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, Integer> {
+
+    static final StringToTimeCastRule INSTANCE = new StringToTimeCastRule();
+
+    private StringToTimeCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return staticCall(STRING_DATA_TO_TIME(), inputTerm);
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimestampCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimestampCastRule.java
new file mode 100644
index 0000000..96e40b3
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimestampCastRule.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_TIMESTAMP;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_TIMESTAMP_WITH_ZONE;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeFamily#TIMESTAMP} cast rule. */
+class StringToTimestampCastRule
+        extends AbstractExpressionCodeGeneratorCastRule<StringData, TimestampData> {
+
+    static final StringToTimestampCastRule INSTANCE = new StringToTimestampCastRule();
+
+    private StringToTimestampCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(LogicalTypeFamily.TIMESTAMP)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        if (targetLogicalType.is(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
+            return staticCall(STRING_DATA_TO_TIMESTAMP(), inputTerm);
+        }
+
+        return staticCall(
+                STRING_DATA_TO_TIMESTAMP_WITH_ZONE(), inputTerm, context.getSessionTimeZoneTerm());
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
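For TIMESTAMP the string is parsed as a local date-time, while TIMESTAMP_LTZ additionally receives the session time zone term, which is why the new TIMESTAMP_LTZ tests run under CET_CONTEXT. A rough equivalent of the two branches, assuming the toTimestamp overloads registered in BuiltInMethods:

    // TIMESTAMP(p): no time zone involved
    org.apache.flink.table.data.TimestampData ts =
            org.apache.flink.table.data.binary.BinaryStringDataUtil.toTimestamp(
                    org.apache.flink.table.data.binary.BinaryStringData.fromString(
                            "2021-09-27 12:34:56.123"));
    // TIMESTAMP_LTZ(p): the same parse, interpreted in the session time zone
    org.apache.flink.table.data.TimestampData tsLtz =
            org.apache.flink.table.data.binary.BinaryStringDataUtil.toTimestamp(
                    org.apache.flink.table.data.binary.BinaryStringData.fromString(
                            "2021-09-27 12:34:56.123"),
                    java.util.TimeZone.getTimeZone("CET"));
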
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 15132b0..308826d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -22,14 +22,14 @@ import org.apache.flink.table.data.{DecimalData, DecimalDataUtils, TimestampData
 import org.apache.flink.table.runtime.functions._
 import org.apache.flink.table.utils.DateTimeUtils
 import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange
-
 import org.apache.calcite.linq4j.tree.Types
 import org.apache.calcite.runtime.{JsonFunctions, SqlFunctions}
 import org.apache.calcite.sql.{SqlJsonExistsErrorBehavior, SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior, SqlJsonValueEmptyOrErrorBehavior}
-import org.apache.flink.table.data.binary.BinaryStringData
+import org.apache.flink.table.data.binary.{BinaryStringData, BinaryStringDataUtil}
 
 import java.lang.reflect.Method
 import java.lang.{Byte => JByte, Integer => JInteger, Long => JLong, Short => JShort}
+import java.time.ZoneId
 import java.util.TimeZone
 
 object BuiltInMethods {
@@ -402,16 +402,6 @@ object BuiltInMethods {
     "toTimestampData",
     classOf[String], classOf[String])
 
-  val STRING_TO_TIMESTAMP_TIME_ZONE = Types.lookupMethod(
-    classOf[DateTimeUtils],
-    "toTimestamp",
-    classOf[String], classOf[TimeZone])
-
-  val STRING_TO_TIMESTAMP_WITH_FORMAT_TIME_ZONE = Types.lookupMethod(
-    classOf[DateTimeUtils],
-    "toTimestamp",
-    classOf[String], classOf[String], classOf[TimeZone])
-
   val TIMESTAMP_WITH_LOCAL_TIME_ZONE_TO_DATE = Types.lookupMethod(
     classOf[DateTimeUtils],
     "timestampWithLocalZoneToDate",
@@ -547,6 +537,60 @@ object BuiltInMethods {
   val BINARY_STRING_DATA_FROM_STRING = Types.lookupMethod(classOf[BinaryStringData], "fromString",
     classOf[String])
 
+  val STRING_DATA_TO_BOOLEAN = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toBoolean",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_DECIMAL = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toDecimal",
+    classOf[BinaryStringData],
+    classOf[Int],
+    classOf[Int])
+
+  val STRING_DATA_TO_LONG = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toLong",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_INT = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toInt",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_SHORT = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toShort",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_BYTE = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toByte",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_FLOAT = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toFloat",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_DOUBLE = Types.lookupMethod(
+    classOf[BinaryStringDataUtil],
+    "toDouble",
+    classOf[BinaryStringData])
+
+  val STRING_DATA_TO_DATE = Types.lookupMethod(
+    classOf[BinaryStringDataUtil], "toDate", classOf[BinaryStringData])
+
+  val STRING_DATA_TO_TIME = Types.lookupMethod(
+    classOf[BinaryStringDataUtil], "toTime", classOf[BinaryStringData])
+
+  val STRING_DATA_TO_TIMESTAMP = Types.lookupMethod(
+    classOf[BinaryStringDataUtil], "toTimestamp", classOf[BinaryStringData])
+
+  val STRING_DATA_TO_TIMESTAMP_WITH_ZONE = Types.lookupMethod(
+    classOf[BinaryStringDataUtil], "toTimestamp", classOf[BinaryStringData], classOf[TimeZone])
+
   // DecimalData functions
 
   val DECIMAL_TO_DECIMAL = Types.lookupMethod(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 0cb0bea..8554e4f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -1094,111 +1094,6 @@ object ScalarOperatorGens {
           operandTerm => s"$operandTerm.toBytes($serTerm)"
         }
 
-      // String -> Boolean
-      case (VARCHAR | CHAR, BOOLEAN) =>
-        val castedExpression = generateUnaryOperatorIfNotNull(
-          ctx,
-          targetType,
-          operand,
-          resultNullable = true) {
-          operandTerm => s"$BINARY_STRING_UTIL.toBooleanSQL($operandTerm)"
-        }
-        val resultTerm = newName("primitiveCastResult")
-        castedExpression.copy(
-          resultTerm = resultTerm,
-          code =
-            s"""
-               |${castedExpression.code}
-               |boolean $resultTerm = Boolean.TRUE.equals(${castedExpression.resultTerm});
-               |""".stripMargin
-        )
-
-      // String -> NUMERIC TYPE (not Character)
-      case (VARCHAR | CHAR, _)
-        if isNumeric(targetType) =>
-        targetType match {
-          case dt: DecimalType =>
-            generateUnaryOperatorIfNotNull(ctx, targetType, operand) { operandTerm =>
-              s"$BINARY_STRING_UTIL.toDecimal($operandTerm, ${dt.getPrecision}, ${dt.getScale})"
-            }
-          case _ =>
-            val methodName = targetType.getTypeRoot match {
-              case TINYINT => "toByte"
-              case SMALLINT => "toShort"
-              case INTEGER => "toInt"
-              case BIGINT => "toLong"
-              case DOUBLE => "toDouble"
-              case FLOAT => "toFloat"
-              case _ => null
-            }
-            assert(methodName != null, "Unexpected data type.")
-            generateUnaryOperatorIfNotNull(
-              ctx,
-              targetType,
-              operand,
-              resultNullable = true) {
-              operandTerm => s"($BINARY_STRING_UTIL.$methodName($operandTerm.trim()))"
-            }
-        }
-
-      // String -> Date
-      case (VARCHAR | CHAR, DATE) =>
-        generateUnaryOperatorIfNotNull(
-          ctx,
-          targetType,
-          operand,
-          resultNullable = true) {
-          operandTerm =>
-            s"${qualifyMethod(BuiltInMethods.STRING_TO_DATE)}($operandTerm.toString())"
-        }
-
-      // String -> Time
-      case (VARCHAR | CHAR, TIME_WITHOUT_TIME_ZONE) =>
-        generateUnaryOperatorIfNotNull(
-          ctx,
-          targetType,
-          operand,
-          resultNullable = true) {
-          operandTerm =>
-            s"${qualifyMethod(BuiltInMethods.STRING_TO_TIME)}($operandTerm.toString())"
-        }
-
-      // String -> Timestamp
-      case (VARCHAR | CHAR, TIMESTAMP_WITHOUT_TIME_ZONE) =>
-        generateUnaryOperatorIfNotNull(
-          ctx,
-          targetType,
-          operand,
-          resultNullable = true) {
-          operandTerm =>
-            s"""
-               |${qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP)}($operandTerm.toString())
-           """.stripMargin
-        }
-
-      case (VARCHAR | CHAR, TIMESTAMP_WITH_LOCAL_TIME_ZONE) =>
-        generateCallWithStmtIfArgsNotNull(
-          ctx, targetType, Seq(operand), resultNullable = true) { operands =>
-          val zone = ctx.addReusableSessionTimeZone()
-          val method = qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP_TIME_ZONE)
-          val toTimestampResultName = newName("toTimestampResult")
-          // this method call might return null
-          val stmt = s"Long $toTimestampResultName = $method(${operands.head}.toString(), $zone);"
-          val result =
-            s"""
-               |($toTimestampResultName == null ?
-               |  null :
-               |  $TIMESTAMP_DATA.fromEpochMillis($toTimestampResultName))
-               |""".stripMargin
-          (stmt, result)
-        }
-
-      // String -> binary
-      case (VARCHAR | CHAR, VARBINARY | BINARY) =>
-        generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-          operandTerm => s"$operandTerm.toBytes()"
-        }
-
       // Note: SQL2003 $6.12 - casting is not allowed between boolean and numeric types.
       //       Calcite does not allow it either.
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index 3652787..127ece8 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.functions.casting;
 
 import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
@@ -80,6 +81,7 @@ import static org.apache.flink.table.api.DataTypes.TINYINT;
 import static org.apache.flink.table.api.DataTypes.VARBINARY;
 import static org.apache.flink.table.api.DataTypes.VARCHAR;
 import static org.apache.flink.table.api.DataTypes.YEAR;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -90,9 +92,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
  */
 class CastRulesTest {
 
+    private static final ZoneId CET = ZoneId.of("CET");
+
     private static final CastRule.Context CET_CONTEXT =
-            CastRule.Context.create(
-                    ZoneId.of("CET"), Thread.currentThread().getContextClassLoader());
+            CastRule.Context.create(CET, Thread.currentThread().getContextClassLoader());
 
     private static final byte DEFAULT_POSITIVE_TINY_INT = (byte) 5;
     private static final byte DEFAULT_NEGATIVE_TINY_INT = (byte) -5;
@@ -125,6 +128,12 @@ class CastRulesTest {
         return Stream.of(
                 CastTestSpecBuilder.testCastTo(TINYINT())
                         .fromCase(TINYINT(), null, null)
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("1.234"), (byte) 1)
+                        .fromCase(STRING(), StringData.fromString("123"), (byte) 123)
+                        .fail(STRING(), StringData.fromString("-130"), TableException.class)
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -151,6 +160,12 @@ class CastRulesTest {
                         .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, (byte) -123),
                 CastTestSpecBuilder.testCastTo(SMALLINT())
                         .fromCase(SMALLINT(), null, null)
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("1.234"), (short) 1)
+                        .fromCase(STRING(), StringData.fromString("123"), (short) 123)
+                        .fail(STRING(), StringData.fromString("-32769"), TableException.class)
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -187,6 +202,15 @@ class CastRulesTest {
                         .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, (short) -123)
                         .fromCase(DOUBLE(), 123456.7890d, (short) -7616),
                 CastTestSpecBuilder.testCastTo(INT())
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("1.234"), 1)
+                        .fromCase(STRING(), StringData.fromString("123"), 123)
+                        .fail(
+                                STRING(),
+                                StringData.fromString("-3276913443134"),
+                                TableException.class)
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -229,6 +253,13 @@ class CastRulesTest {
                         .fromCase(INTERVAL(DAY(), SECOND()), 123L, 123),
                 CastTestSpecBuilder.testCastTo(BIGINT())
                         .fromCase(BIGINT(), null, null)
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("1.234"), 1L)
+                        .fromCase(STRING(), StringData.fromString("123"), 123L)
+                        .fromCase(
+                                STRING(), StringData.fromString("-3276913443134"), -3276913443134L)
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -266,6 +297,13 @@ class CastRulesTest {
                         .fromCase(DOUBLE(), 9234567891.12345d, 9234567891L),
                 CastTestSpecBuilder.testCastTo(FLOAT())
                         .fromCase(FLOAT(), null, null)
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("1.234"), 1.234f)
+                        .fromCase(STRING(), StringData.fromString("123"), 123.0f)
+                        .fromCase(
+                                STRING(), StringData.fromString("-3276913443134"), -3.27691351E12f)
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -307,6 +345,15 @@ class CastRulesTest {
                         .fromCase(DOUBLE(), 1239234567891.1234567891234d, 1.23923451E12f),
                 CastTestSpecBuilder.testCastTo(DOUBLE())
                         .fromCase(DOUBLE(), null, null)
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("1.234"), 1.234d)
+                        .fromCase(STRING(), StringData.fromString("123"), 123.0d)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("-3276913443134"),
+                                -3.276913443134E12d)
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -349,6 +396,92 @@ class CastRulesTest {
                         .fromCase(DOUBLE(), DEFAULT_POSITIVE_DOUBLE, DEFAULT_POSITIVE_DOUBLE)
                         .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, DEFAULT_NEGATIVE_DOUBLE)
                         .fromCase(DOUBLE(), 1239234567891.1234567891234d, 1.2392345678911235E12d),
+                CastTestSpecBuilder.testCastTo(DATE())
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("123"),
+                                DateTimeUtils.localDateToUnixDate(LocalDate.of(123, 1, 1)))
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("2021-09-27"),
+                                DateTimeUtils.localDateToUnixDate(LocalDate.of(2021, 9, 27)))
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("2021-09-27 12:34:56.123456789"),
+                                DateTimeUtils.localDateToUnixDate(LocalDate.of(2021, 9, 27)))
+                        .fail(STRING(), StringData.fromString("2021/09/27"), TableException.class),
+                CastTestSpecBuilder.testCastTo(TIME())
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("23"),
+                                DateTimeUtils.localTimeToUnixDate(LocalTime.of(23, 0, 0)))
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("23:45"),
+                                DateTimeUtils.localTimeToUnixDate(LocalTime.of(23, 45, 0)))
+                        .fail(STRING(), StringData.fromString("2021-09-27"), TableException.class)
+                        .fail(
+                                STRING(),
+                                StringData.fromString("2021-09-27 12:34:56"),
+                                TableException.class)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("12:34:56.123456789"),
+                                DateTimeUtils.localTimeToUnixDate(
+                                        LocalTime.of(12, 34, 56, 123_000_000)))
+                        .fail(
+                                STRING(),
+                                StringData.fromString("2021-09-27 12:34:56.123456789"),
+                                TableException.class),
+                CastTestSpecBuilder.testCastTo(TIMESTAMP(9))
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("123"), TableException.class)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("2021-09-27"),
+                                TimestampData.fromLocalDateTime(
+                                        LocalDateTime.of(2021, 9, 27, 0, 0, 0, 0)))
+                        .fail(STRING(), StringData.fromString("2021/09/27"), TableException.class)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("2021-09-27 12:34:56.123456789"),
+                                TimestampData.fromLocalDateTime(
+                                        LocalDateTime.of(2021, 9, 27, 12, 34, 56, 123456789))),
+                CastTestSpecBuilder.testCastTo(TIMESTAMP_LTZ(9))
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("123"), TableException.class)
+                        .fromCase(
+                                STRING(),
+                                CET_CONTEXT,
+                                StringData.fromString("2021-09-27"),
+                                TimestampData.fromInstant(
+                                        LocalDateTime.of(2021, 9, 27, 0, 0, 0, 0)
+                                                .atZone(CET)
+                                                .toInstant()))
+                        .fromCase(
+                                STRING(),
+                                CET_CONTEXT,
+                                StringData.fromString("2021-09-27 12:34:56.123"),
+                                TimestampData.fromInstant(
+                                        LocalDateTime.of(2021, 9, 27, 12, 34, 56, 123000000)
+                                                .atZone(CET)
+                                                .toInstant()))
+                        // https://issues.apache.org/jira/browse/FLINK-24446 Fractional seconds are
+                        // lost
+                        .fromCase(
+                                STRING(),
+                                CET_CONTEXT,
+                                StringData.fromString("2021-09-27 12:34:56.123456789"),
+                                TimestampData.fromInstant(
+                                        LocalDateTime.of(2021, 9, 27, 12, 34, 56, 0)
+                                                .atZone(CET)
+                                                .toInstant())),
                 CastTestSpecBuilder.testCastTo(STRING())
                         .fromCase(STRING(), null, null)
                         .fromCase(
@@ -478,7 +611,60 @@ class CastRulesTest {
                                 RawValueData.fromObject(
                                         LocalDateTime.parse("2020-11-11T18:08:01.123")),
                                 StringData.fromString("2020-11-11T18:08:01.123")),
+                CastTestSpecBuilder.testCastTo(BOOLEAN())
+                        .fromCase(BOOLEAN(), null, null)
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fromCase(CHAR(4), StringData.fromString("true"), true)
+                        .fromCase(VARCHAR(5), StringData.fromString("FalsE"), false)
+                        .fail(STRING(), StringData.fromString("Apache Flink"), TableException.class)
+                        .fromCase(STRING(), StringData.fromString("TRUE"), true)
+                        .fail(STRING(), StringData.fromString(""), TableException.class),
+                CastTestSpecBuilder.testCastTo(BINARY(2))
+                        .fromCase(CHAR(3), StringData.fromString("foo"), new byte[] {102, 111, 111})
+                        .fromCase(
+                                VARCHAR(5),
+                                StringData.fromString("Flink"),
+                                new byte[] {70, 108, 105, 110, 107})
+                        // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+                        // bytes
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("Apache"),
+                                new byte[] {65, 112, 97, 99, 104, 101}),
+                CastTestSpecBuilder.testCastTo(VARBINARY(4))
+                        .fromCase(CHAR(3), StringData.fromString("foo"), new byte[] {102, 111, 111})
+                        .fromCase(
+                                VARCHAR(5),
+                                StringData.fromString("Flink"),
+                                new byte[] {70, 108, 105, 110, 107})
+                        // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+                        // bytes
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("Apache"),
+                                new byte[] {65, 112, 97, 99, 104, 101}),
+                CastTestSpecBuilder.testCastTo(BYTES())
+                        .fromCase(CHAR(3), StringData.fromString("foo"), new byte[] {102, 111, 111})
+                        .fromCase(
+                                VARCHAR(5),
+                                StringData.fromString("Flink"),
+                                new byte[] {70, 108, 105, 110, 107})
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("Apache"),
+                                new byte[] {65, 112, 97, 99, 104, 101}),
                 CastTestSpecBuilder.testCastTo(DECIMAL(5, 3))
+                        .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+                        .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+                        .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("1.234"),
+                                DecimalData.fromBigDecimal(new BigDecimal("1.234"), 5, 3))
+                        .fromCase(
+                                STRING(),
+                                StringData.fromString("1.2"),
+                                DecimalData.fromBigDecimal(new BigDecimal("1.200"), 5, 3))
                         .fromCase(
                                 DECIMAL(4, 3),
                                 DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -641,12 +827,21 @@ class CastRulesTest {
             this.inputTypes.add(dataType);
             this.assertionExecutors.add(
                     executor -> {
-                        assertEquals(target, executor.cast(src));
-                        // Run twice to make sure rules are reusable without causing issues
-                        assertEquals(
-                                target,
-                                executor.cast(src),
-                                "Error when reusing the rule. Perhaps there is some state that needs to be reset");
+                        if (target instanceof byte[]) {
+                            assertArrayEquals((byte[]) target, (byte[]) executor.cast(src));
+                            // Run twice to make sure rules are reusable without causing issues
+                            assertArrayEquals(
+                                    (byte[]) target,
+                                    (byte[]) executor.cast(src),
+                                    "Error when reusing the rule. Perhaps there is some state that needs to be reset");
+                        } else {
+                            assertEquals(target, executor.cast(src));
+                            // Run twice to make sure rules are reusable without causing issues
+                            assertEquals(
+                                    target,
+                                    executor.cast(src),
+                                    "Error when reusing the rule. Perhaps there is some state that needs to be reset");
+                        }
                     });
             this.descriptions.add("{" + src + " => " + target + "}");
             this.castContexts.add(castContext);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java
index 2aebca5..e534337 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java
@@ -18,18 +18,24 @@
 package org.apache.flink.table.data.binary;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.runtime.util.SegmentsUtil;
 import org.apache.flink.table.runtime.util.StringUtf8Utils;
+import org.apache.flink.table.utils.DateTimeUtils;
 import org.apache.flink.table.utils.EncodingUtils;
 
 import java.math.BigDecimal;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.time.DateTimeException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.TimeZone;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -144,12 +150,16 @@ public class BinaryStringDataUtil {
         return substrings.toArray(new BinaryStringData[0]);
     }
 
-    /** Decide boolean representation of a string. */
-    public static Boolean toBooleanSQL(BinaryStringData str) {
+    /** Parses a {@link BinaryStringData} to a boolean. */
+    public static boolean toBoolean(BinaryStringData str) throws TableException {
         BinaryStringData lowerCase = str.toLowerCase();
-        return TRUE_STRINGS.contains(lowerCase)
-                ? Boolean.TRUE
-                : (FALSE_STRINGS.contains(lowerCase) ? Boolean.FALSE : null);
+        if (TRUE_STRINGS.contains(lowerCase)) {
+            return true;
+        }
+        if (FALSE_STRINGS.contains(lowerCase)) {
+            return false;
+        }
+        throw new TableException("Cannot parse '" + str + "' as BOOLEAN.");
     }
 
     /** Calculate the hash value of the given bytes use {@link MessageDigest}. */
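
A minimal usage sketch of the new throwing toBoolean contract (illustration only, not part of the patch; class name hypothetical):

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.binary.BinaryStringDataUtil;

public class ToBooleanSketch {
    public static void main(String[] args) {
        // Recognized literals parse case-insensitively.
        System.out.println(BinaryStringDataUtil.toBoolean(BinaryStringData.fromString("TRUE")));  // true
        System.out.println(BinaryStringDataUtil.toBoolean(BinaryStringData.fromString("false"))); // false
        try {
            // Unrecognized input no longer yields null; it throws.
            BinaryStringDataUtil.toBoolean(BinaryStringData.fromString("Flink"));
        } catch (TableException e) {
            System.out.println(e.getMessage()); // Cannot parse 'Flink' as BOOLEAN.
        }
    }
}
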
@@ -168,21 +178,30 @@ public class BinaryStringDataUtil {
     }
 
     /**
-     * Parses this BinaryStringData to DecimalData.
+     * Parses a {@link BinaryStringData} to {@link DecimalData}.
      *
-     * @return DecimalData value if the parsing was successful, or null if overflow
-     * @throws NumberFormatException if the parsing failed.
+     * @return the parsed {@link DecimalData} value; parsing failures and overflow raise {@link NumberFormatException}
      */
-    public static DecimalData toDecimal(BinaryStringData str, int precision, int scale) {
+    public static DecimalData toDecimal(BinaryStringData str, int precision, int scale)
+            throws NumberFormatException {
         str.ensureMaterialized();
 
+        DecimalData data;
+
         if (DecimalDataUtils.isByteArrayDecimal(precision)
                 || DecimalDataUtils.isByteArrayDecimal(str.getSizeInBytes())) {
-            return toBigPrecisionDecimal(str, precision, scale);
+            data = toBigPrecisionDecimal(str, precision, scale);
+        } else {
+            int sizeInBytes = str.getSizeInBytes();
+            data =
+                    toDecimalFromBytes(
+                            precision, scale, getTmpBytes(str, sizeInBytes), 0, sizeInBytes);
         }
 
-        int sizeInBytes = str.getSizeInBytes();
-        return toDecimalFromBytes(precision, scale, getTmpBytes(str, sizeInBytes), 0, sizeInBytes);
+        if (data == null) {
+            throw numberFormatExceptionFor(str, "Overflow.");
+        }
+        return data;
     }
 
     private static DecimalData toDecimalFromBytes(
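
A minimal usage sketch of the new toDecimal overflow behaviour, mirroring the DECIMAL(5, 3) target used in CastRulesTest (illustration only, not part of the patch; class name hypothetical):

import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.binary.BinaryStringDataUtil;

public class ToDecimalSketch {
    public static void main(String[] args) {
        // Fits DECIMAL(5, 3): two integer digits, three fractional digits.
        DecimalData ok = BinaryStringDataUtil.toDecimal(BinaryStringData.fromString("1.234"), 5, 3);
        System.out.println(ok);

        try {
            // Too many integer digits for precision 5 / scale 3: previously null, now an exception.
            BinaryStringDataUtil.toDecimal(BinaryStringData.fromString("123456.789"), 5, 3);
        } catch (NumberFormatException e) {
            System.out.println(e.getMessage());
        }
    }
}
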
@@ -353,12 +372,9 @@ public class BinaryStringDataUtil {
                     break;
                 }
             }
-            try {
-                BigDecimal bd = new BigDecimal(chars, start, end - start);
-                return DecimalData.fromBigDecimal(bd, precision, scale);
-            } catch (NumberFormatException nfe) {
-                return null;
-            }
+
+            BigDecimal bd = new BigDecimal(chars, start, end - start);
+            return DecimalData.fromBigDecimal(bd, precision, scale);
         }
     }
 
@@ -371,14 +387,12 @@ public class BinaryStringDataUtil {
      * Long.MIN_VALUE is '-9223372036854775808'.
      *
      * <p>This code is mostly copied from LazyLong.parseLong in Hive.
-     *
-     * @return Long value if the parsing was successful else null.
      */
-    public static Long toLong(BinaryStringData str) {
+    public static long toLong(BinaryStringData str) throws NumberFormatException {
         int sizeInBytes = str.getSizeInBytes();
         byte[] tmpBytes = getTmpBytes(str, sizeInBytes);
         if (sizeInBytes == 0) {
-            return null;
+            throw numberFormatExceptionFor(str, "Input is empty.");
         }
         int i = 0;
 
@@ -387,7 +401,7 @@ public class BinaryStringDataUtil {
         if (negative || b == '+') {
             i++;
             if (sizeInBytes == 1) {
-                return null;
+                throw numberFormatExceptionFor(str, "Input has only positive or negative symbol.");
             }
         }
 
@@ -409,7 +423,7 @@ public class BinaryStringDataUtil {
             if (b >= '0' && b <= '9') {
                 digit = b - '0';
             } else {
-                return null;
+                throw numberFormatExceptionFor(str, "Invalid character found.");
             }
 
             // We are going to process the new digit and accumulate the result. However, before
@@ -417,7 +431,7 @@ public class BinaryStringDataUtil {
             // stopValue(Long.MIN_VALUE / radix), then result * 10 will definitely be smaller
             // than minValue, and we can stop.
             if (result < stopValue) {
-                return null;
+                throw numberFormatExceptionFor(str, "Overflow.");
             }
 
             result = result * radix - digit;
@@ -425,7 +439,7 @@ public class BinaryStringDataUtil {
             // stopValue(Long.MIN_VALUE / radix), we can just use `result > 0` to check overflow.
             // If result overflows, we should stop.
             if (result > 0) {
-                return null;
+                throw numberFormatExceptionFor(str, "Overflow.");
             }
         }
 
@@ -435,7 +449,7 @@ public class BinaryStringDataUtil {
         while (i < sizeInBytes) {
             byte currentByte = tmpBytes[i];
             if (currentByte < '0' || currentByte > '9') {
-                return null;
+                throw numberFormatExceptionFor(str, "Invalid character found.");
             }
             i++;
         }
@@ -443,7 +457,7 @@ public class BinaryStringDataUtil {
         if (!negative) {
             result = -result;
             if (result < 0) {
-                return null;
+                throw numberFormatExceptionFor(str, "Overflow.");
             }
         }
         return result;
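
The overflow checks above work by accumulating the result as a negative number, since |Long.MIN_VALUE| is larger than Long.MAX_VALUE, so the negative range can hold every magnitude that needs to be parsed. A standalone sketch of that technique (illustration only, not part of the patch; class and method names hypothetical; empty and sign-only inputs as well as the trailing-decimal handling of the real code are omitted):

public class NegativeAccumulationSketch {

    static long parseLong(String s) {
        final long radix = 10;
        final long stopValue = Long.MIN_VALUE / radix;
        boolean negative = s.charAt(0) == '-';
        int i = (negative || s.charAt(0) == '+') ? 1 : 0;
        long result = 0;
        for (; i < s.length(); i++) {
            int digit = s.charAt(i) - '0';
            if (digit < 0 || digit > 9) {
                throw new NumberFormatException("Invalid character found in: " + s);
            }
            // If result is already below stopValue, result * radix would underflow.
            if (result < stopValue) {
                throw new NumberFormatException("Overflow: " + s);
            }
            result = result * radix - digit;
            // Since we accumulate negatively, a positive result means we wrapped around.
            if (result > 0) {
                throw new NumberFormatException("Overflow: " + s);
            }
        }
        if (!negative) {
            result = -result;
            // Only a magnitude of -Long.MIN_VALUE stays negative after negation.
            if (result < 0) {
                throw new NumberFormatException("Overflow: " + s);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseLong("-9223372036854775808")); // Long.MIN_VALUE parses fine
        System.out.println(parseLong("9223372036854775807"));  // Long.MAX_VALUE parses fine
        try {
            parseLong("9223372036854775808"); // one past MAX_VALUE
        } catch (NumberFormatException e) {
            System.out.println(e.getMessage());
        }
    }
}
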
@@ -461,14 +475,12 @@ public class BinaryStringDataUtil {
      *
      * <p>Note that this method is almost the same as `toLong`, but we leave it duplicated for
      * performance reasons, like Hive does.
-     *
-     * @return Integer value if the parsing was successful else null.
      */
-    public static Integer toInt(BinaryStringData str) {
+    public static int toInt(BinaryStringData str) throws NumberFormatException {
         int sizeInBytes = str.getSizeInBytes();
         byte[] tmpBytes = getTmpBytes(str, sizeInBytes);
         if (sizeInBytes == 0) {
-            return null;
+            throw numberFormatExceptionFor(str, "Input is empty.");
         }
         int i = 0;
 
@@ -477,7 +489,7 @@ public class BinaryStringDataUtil {
         if (negative || b == '+') {
             i++;
             if (sizeInBytes == 1) {
-                return null;
+                throw numberFormatExceptionFor(str, "Input has only positive or negative symbol.");
             }
         }
 
@@ -499,7 +511,7 @@ public class BinaryStringDataUtil {
             if (b >= '0' && b <= '9') {
                 digit = b - '0';
             } else {
-                return null;
+                throw numberFormatExceptionFor(str, "Invalid character found.");
             }
 
             // We are going to process the new digit and accumulate the result. However, before
@@ -507,7 +519,7 @@ public class BinaryStringDataUtil {
             // stopValue(Long.MIN_VALUE / radix), then result * 10 will definitely be smaller
             // than minValue, and we can stop.
             if (result < stopValue) {
-                return null;
+                throw numberFormatExceptionFor(str, "Overflow.");
             }
 
             result = result * radix - digit;
@@ -515,7 +527,7 @@ public class BinaryStringDataUtil {
             // stopValue(Long.MIN_VALUE / radix), we can just use `result > 0` to check overflow.
             // If result overflows, we should stop.
             if (result > 0) {
-                return null;
+                throw numberFormatExceptionFor(str, "Overflow.");
             }
         }
 
@@ -525,7 +537,7 @@ public class BinaryStringDataUtil {
         while (i < sizeInBytes) {
             byte currentByte = tmpBytes[i];
             if (currentByte < '0' || currentByte > '9') {
-                return null;
+                throw numberFormatExceptionFor(str, "Invalid character found.");
             }
             i++;
         }
@@ -533,48 +545,77 @@ public class BinaryStringDataUtil {
         if (!negative) {
             result = -result;
             if (result < 0) {
-                return null;
+                throw numberFormatExceptionFor(str, "Overflow.");
             }
         }
         return result;
     }
 
-    public static Short toShort(BinaryStringData str) {
-        Integer intValue = toInt(str);
-        if (intValue != null) {
-            short result = intValue.shortValue();
-            if (result == intValue) {
-                return result;
-            }
+    public static short toShort(BinaryStringData str) throws NumberFormatException {
+        int intValue = toInt(str);
+        short result = (short) intValue;
+        if (result == intValue) {
+            return result;
         }
-        return null;
+        throw numberFormatExceptionFor(str, "Overflow.");
     }
 
-    public static Byte toByte(BinaryStringData str) {
-        Integer intValue = toInt(str);
-        if (intValue != null) {
-            byte result = intValue.byteValue();
-            if (result == intValue) {
-                return result;
-            }
+    public static byte toByte(BinaryStringData str) throws NumberFormatException {
+        int intValue = toInt(str);
+        byte result = (byte) intValue;
+        if (result == intValue) {
+            return result;
         }
-        return null;
+        throw numberFormatExceptionFor(str, "Overflow.");
     }
 
-    public static Double toDouble(BinaryStringData str) {
-        try {
-            return Double.valueOf(str.toString());
-        } catch (NumberFormatException e) {
-            return null;
+    public static double toDouble(BinaryStringData str) throws NumberFormatException {
+        return Double.parseDouble(str.toString());
+    }
+
+    public static float toFloat(BinaryStringData str) throws NumberFormatException {
+        return Float.parseFloat(str.toString());
+    }
+
+    private static NumberFormatException numberFormatExceptionFor(StringData input, String reason) {
+        return new NumberFormatException("For input string: '" + input + "'. " + reason);
+    }
+
+    public static int toDate(BinaryStringData input) throws DateTimeException {
+        Integer date = DateTimeUtils.dateStringToUnixDate(input.toString());
+        if (date == null) {
+            throw new DateTimeException("For input string: '" + input + "'.");
         }
+
+        return date;
     }
 
-    public static Float toFloat(BinaryStringData str) {
-        try {
-            return Float.valueOf(str.toString());
-        } catch (NumberFormatException e) {
-            return null;
+    public static int toTime(BinaryStringData input) throws DateTimeException {
+        Integer time = DateTimeUtils.timeStringToUnixDate(input.toString());
+        if (time == null) {
+            throw new DateTimeException("For input string: '" + input + "'.");
+        }
+
+        return time;
+    }
+
+    public static TimestampData toTimestamp(BinaryStringData input) throws DateTimeException {
+        TimestampData timestamp = DateTimeUtils.toTimestampData(input.toString());
+        if (timestamp == null) {
+            throw new DateTimeException("For input string: '" + input + "'.");
         }
+
+        return timestamp;
+    }
+
+    public static TimestampData toTimestamp(BinaryStringData input, TimeZone timeZone)
+            throws DateTimeException {
+        Long timestamp = DateTimeUtils.toTimestamp(input.toString(), timeZone);
+        if (timestamp == null) {
+            throw new DateTimeException("For input string: '" + input + "'.");
+        }
+
+        return TimestampData.fromEpochMillis(timestamp);
     }
 
     /**
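
A minimal usage sketch of the new temporal parsing methods, assuming Flink's internal representations of DATE as days since epoch and TIME as milliseconds of the day; the literals are illustrative only (not part of the patch; class name hypothetical):

import java.time.DateTimeException;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.binary.BinaryStringDataUtil;

public class StringToTemporalSketch {
    public static void main(String[] args) {
        // DATE: number of days since 1970-01-01
        int days = BinaryStringDataUtil.toDate(BinaryStringData.fromString("2021-09-27"));
        // TIME: milliseconds of the day
        int millis = BinaryStringDataUtil.toTime(BinaryStringData.fromString("12:34:56"));
        // TIMESTAMP without time zone
        TimestampData ts =
                BinaryStringDataUtil.toTimestamp(BinaryStringData.fromString("2021-09-27 12:34:56.123"));
        System.out.println(days + " / " + millis + " / " + ts);

        try {
            // Unparsable input now throws instead of returning null.
            BinaryStringDataUtil.toDate(BinaryStringData.fromString("not-a-date"));
        } catch (DateTimeException e) {
            System.out.println(e.getMessage()); // For input string: 'not-a-date'.
        }
    }
}
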
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
index 4a8741f..bfb979d 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
@@ -60,6 +60,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -420,29 +421,29 @@ public class BinaryStringDataTest {
     @Test
     public void testToNumeric() {
         // Test to integer.
-        assertEquals(Byte.valueOf("123"), toByte(fromString("123")));
-        assertEquals(Byte.valueOf("123"), toByte(fromString("+123")));
-        assertEquals(Byte.valueOf("-123"), toByte(fromString("-123")));
+        assertEquals(Byte.parseByte("123"), toByte(fromString("123")));
+        assertEquals(Byte.parseByte("123"), toByte(fromString("+123")));
+        assertEquals(Byte.parseByte("-123"), toByte(fromString("-123")));
 
-        assertEquals(Short.valueOf("123"), toShort(fromString("123")));
-        assertEquals(Short.valueOf("123"), toShort(fromString("+123")));
-        assertEquals(Short.valueOf("-123"), toShort(fromString("-123")));
+        assertEquals(Short.parseShort("123"), toShort(fromString("123")));
+        assertEquals(Short.parseShort("123"), toShort(fromString("+123")));
+        assertEquals(Short.parseShort("-123"), toShort(fromString("-123")));
 
-        assertEquals(Integer.valueOf("123"), toInt(fromString("123")));
-        assertEquals(Integer.valueOf("123"), toInt(fromString("+123")));
-        assertEquals(Integer.valueOf("-123"), toInt(fromString("-123")));
+        assertEquals(Integer.parseInt("123"), toInt(fromString("123")));
+        assertEquals(Integer.parseInt("123"), toInt(fromString("+123")));
+        assertEquals(Integer.parseInt("-123"), toInt(fromString("-123")));
 
-        assertEquals(Long.valueOf("1234567890"), toLong(fromString("1234567890")));
-        assertEquals(Long.valueOf("+1234567890"), toLong(fromString("+1234567890")));
-        assertEquals(Long.valueOf("-1234567890"), toLong(fromString("-1234567890")));
+        assertEquals(Long.parseLong("1234567890"), toLong(fromString("1234567890")));
+        assertEquals(Long.parseLong("+1234567890"), toLong(fromString("+1234567890")));
+        assertEquals(Long.parseLong("-1234567890"), toLong(fromString("-1234567890")));
 
         // Test decimal string to integer.
-        assertEquals(Integer.valueOf("123"), toInt(fromString("123.456789")));
-        assertEquals(Long.valueOf("123"), toLong(fromString("123.456789")));
+        assertEquals(Integer.parseInt("123"), toInt(fromString("123.456789")));
+        assertEquals(Long.parseLong("123"), toLong(fromString("123.456789")));
 
         // Test negative cases.
-        assertNull(toInt(fromString("1a3.456789")));
-        assertNull(toInt(fromString("123.a56789")));
+        assertThrows(NumberFormatException.class, () -> toInt(fromString("1a3.456789")));
+        assertThrows(NumberFormatException.class, () -> toInt(fromString("123.a56789")));
 
         // Test composite in BinaryRowData.
         BinaryRowData row = new BinaryRowData(20);
@@ -453,10 +454,10 @@ public class BinaryStringDataTest {
         writer.writeString(3, BinaryStringData.fromString("123456789"));
         writer.complete();
 
-        assertEquals(Byte.valueOf("1"), toByte(((BinaryStringData) row.getString(0))));
-        assertEquals(Short.valueOf("123"), toShort(((BinaryStringData) row.getString(1))));
-        assertEquals(Integer.valueOf("12345"), toInt(((BinaryStringData) row.getString(2))));
-        assertEquals(Long.valueOf("123456789"), toLong(((BinaryStringData) row.getString(3))));
+        assertEquals(Byte.parseByte("1"), toByte(((BinaryStringData) row.getString(0))));
+        assertEquals(Short.parseShort("123"), toShort(((BinaryStringData) row.getString(1))));
+        assertEquals(Integer.parseInt("12345"), toInt(((BinaryStringData) row.getString(2))));
+        assertEquals(Long.parseLong("123456789"), toLong(((BinaryStringData) row.getString(3))));
     }
 
     @Test