You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/11/12 14:43:34 UTC

[flink] branch master updated (f28bb89 -> da24fbf)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f28bb89  [FLINK-24773][kafka] Fail job if unhandled exception occurs during committing
     new 5c914e1  [hotfix][table-planner] Add ExpressionCodeGeneratorCastRule extracting generateExpression from AbstractExpressionCodeGeneratorCastRule
     new b51abc7c [hotfix][table-planner] Fix primitives/boxed primitives behaviour of ExpressionEvaluator
     new 4891ee9  [FLINK-24779][table-planner] Add numeric casting rules
     new da24fbf  [hotfix][table-planner] Move all casting rules to casting package and make them package-private

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../casting/{rules => }/AbstractCastRule.java      |   9 +-
 .../AbstractCharacterFamilyTargetRule.java         |  10 +-
 .../{rules => }/AbstractCodeGeneratorCastRule.java |  10 +-
 .../AbstractExpressionCodeGeneratorCastRule.java   |  43 ++--
 .../AbstractNullAwareCodeGeneratorCastRule.java    |  23 +-
 .../casting/{rules => }/ArrayToArrayCastRule.java  |  19 +-
 .../casting/{rules => }/ArrayToStringCastRule.java |  21 +-
 .../{rules => }/BinaryToStringCastRule.java        |  14 +-
 .../{rules => }/BooleanToStringCastRule.java       |  14 +-
 .../functions/casting/CastRuleProvider.java        |  19 +-
 .../casting/{rules => }/CastRuleUtils.java         |  70 +++++-
 .../CodeGeneratedExpressionCastExecutor.java       |   2 +-
 .../casting/{rules => }/DateToStringCastRule.java  |  12 +-
 ...CastRule.java => DecimalToDecimalCastRule.java} |  33 +--
 ...java => DecimalToNumericPrimitiveCastRule.java} |  44 ++--
 ...e.java => ExpressionCodeGeneratorCastRule.java} |  40 ++-
 .../casting/{rules => }/IdentityCastRule.java      |  39 ++-
 .../{rules => }/IntervalToStringCastRule.java      |  12 +-
 .../casting/{rules => }/MapToStringCastRule.java   |  20 +-
 .../casting/NumericPrimitiveCastRule.java          |  74 ++++++
 .../casting/NumericPrimitiveToDecimalCastRule.java |  71 ++++++
 .../{rules => }/NumericToStringCastRule.java       |  14 +-
 .../casting/{rules => }/RawToStringCastRule.java   |  14 +-
 .../casting/{rules => }/RowToStringCastRule.java   |  20 +-
 .../casting/{rules => }/TimeToStringCastRule.java  |  10 +-
 .../{rules => }/TimestampToStringCastRule.java     |  14 +-
 .../flink/table/planner/codegen/CodeGenUtils.scala |  79 ++----
 .../table/planner/codegen/ExprCodeGenerator.scala  |   4 +-
 .../planner/codegen/calls/BuiltInMethods.scala     |  27 +-
 .../table/planner/codegen/calls/IfCallGen.scala    |  32 ++-
 .../planner/codegen/calls/ScalarOperatorGens.scala | 124 ++++------
 .../planner/functions/casting/CastRulesTest.java   | 272 ++++++++++++++++++++-
 .../apache/flink/table/data/DecimalDataUtils.java  |  24 --
 33 files changed, 794 insertions(+), 439 deletions(-)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/AbstractCastRule.java (76%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/AbstractCharacterFamilyTargetRule.java (84%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/AbstractCodeGeneratorCastRule.java (94%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/AbstractExpressionCodeGeneratorCastRule.java (74%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/AbstractNullAwareCodeGeneratorCastRule.java (81%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/ArrayToArrayCastRule.java (86%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/ArrayToStringCastRule.java (89%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/BinaryToStringCastRule.java (72%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/BooleanToStringCastRule.java (71%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/CastRuleUtils.java (72%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/CodeGeneratedExpressionCastExecutor.java (96%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/DateToStringCastRule.java (76%)
 copy flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules/UpcastToBigIntCastRule.java => DecimalToDecimalCastRule.java} (54%)
 copy flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules/IntervalToStringCastRule.java => DecimalToNumericPrimitiveCastRule.java} (52%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules/UpcastToBigIntCastRule.java => ExpressionCodeGeneratorCastRule.java} (50%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/IdentityCastRule.java (60%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/IntervalToStringCastRule.java (79%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/MapToStringCastRule.java (92%)
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericPrimitiveCastRule.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericPrimitiveToDecimalCastRule.java
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/NumericToStringCastRule.java (70%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/RawToStringCastRule.java (82%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/RowToStringCastRule.java (87%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/TimeToStringCastRule.java (81%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/{rules => }/TimestampToStringCastRule.java (77%)

[flink] 04/04: [hotfix][table-planner] Move all casting rules to casting package and make them package-private

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit da24fbf23c244771084bf0a31cdbb9749d72e80a
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Fri Nov 12 10:38:32 2021 +0100

    [hotfix][table-planner] Move all casting rules to casting package and make them package-private
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
 .../casting/{rules => }/AbstractCastRule.java       |  9 ++-------
 .../AbstractCharacterFamilyTargetRule.java          |  8 ++------
 .../{rules => }/AbstractCodeGeneratorCastRule.java  | 10 ++--------
 .../AbstractExpressionCodeGeneratorCastRule.java    | 14 ++++----------
 .../AbstractNullAwareCodeGeneratorCastRule.java     |  9 ++-------
 .../casting/{rules => }/ArrayToArrayCastRule.java   | 19 ++++++-------------
 .../casting/{rules => }/ArrayToStringCastRule.java  | 21 +++++++--------------
 .../casting/{rules => }/BinaryToStringCastRule.java | 14 +++++---------
 .../{rules => }/BooleanToStringCastRule.java        | 14 +++++---------
 .../planner/functions/casting/CastRuleProvider.java | 17 -----------------
 .../casting/{rules => }/CastRuleUtils.java          |  4 +---
 .../CodeGeneratedExpressionCastExecutor.java        |  2 +-
 .../casting/{rules => }/DateToStringCastRule.java   | 12 ++++--------
 .../{rules => }/DecimalToDecimalCastRule.java       | 12 ++++--------
 .../DecimalToNumericPrimitiveCastRule.java          | 14 +++++---------
 .../casting/{rules => }/IdentityCastRule.java       | 12 +++---------
 .../{rules => }/IntervalToStringCastRule.java       | 12 ++++--------
 .../casting/{rules => }/MapToStringCastRule.java    | 20 +++++++-------------
 .../{rules => }/NumericPrimitiveCastRule.java       | 11 +++--------
 .../NumericPrimitiveToDecimalCastRule.java          | 14 +++++---------
 .../{rules => }/NumericToStringCastRule.java        | 14 +++++---------
 .../casting/{rules => }/RawToStringCastRule.java    | 14 +++++---------
 .../casting/{rules => }/RowToStringCastRule.java    | 20 +++++++-------------
 .../casting/{rules => }/TimeToStringCastRule.java   | 10 +++-------
 .../{rules => }/TimestampToStringCastRule.java      | 14 +++++---------
 .../planner/codegen/calls/ScalarOperatorGens.scala  | 15 ++++-----------
 26 files changed, 101 insertions(+), 234 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java
similarity index 76%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java
index b4eeffc..c193139 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCastRule.java
@@ -16,15 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.planner.functions.casting.CastRule;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+package org.apache.flink.table.planner.functions.casting;
 
 /** Base class for all cast rules. */
-@Internal
-public abstract class AbstractCastRule<IN, OUT> implements CastRule<IN, OUT> {
+abstract class AbstractCastRule<IN, OUT> implements CastRule<IN, OUT> {
 
     private final CastRulePredicate predicate;
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCharacterFamilyTargetRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCharacterFamilyTargetRule.java
similarity index 86%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCharacterFamilyTargetRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCharacterFamilyTargetRule.java
index 0309566..8c6ac85 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCharacterFamilyTargetRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCharacterFamilyTargetRule.java
@@ -16,12 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 
@@ -31,8 +28,7 @@ import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY
  * Base class for cast rules converting to {@link LogicalTypeFamily#CHARACTER_STRING} with code
  * generation.
  */
-@Internal
-public abstract class AbstractCharacterFamilyTargetRule<IN>
+abstract class AbstractCharacterFamilyTargetRule<IN>
         extends AbstractExpressionCodeGeneratorCastRule<IN, StringData> {
 
     protected AbstractCharacterFamilyTargetRule(CastRulePredicate predicate) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
similarity index 94%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCodeGeneratorCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
index eb7dd9d..f9d300d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
@@ -16,17 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.utils.CastExecutor;
 import org.apache.flink.table.planner.codegen.CodeGenUtils;
-import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
-import org.apache.flink.table.planner.functions.casting.CastRule;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.runtime.generated.CompileUtils;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -51,8 +46,7 @@ import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
  * AbstractNullAwareCodeGeneratorCastRule}, which provides nullability checks, or from {@link
  * AbstractExpressionCodeGeneratorCastRule} to generate simple expression casts.
  */
-@Internal
-public abstract class AbstractCodeGeneratorCastRule<IN, OUT> extends AbstractCastRule<IN, OUT>
+abstract class AbstractCodeGeneratorCastRule<IN, OUT> extends AbstractCastRule<IN, OUT>
         implements CodeGeneratorCastRule<IN, OUT> {
 
     protected AbstractCodeGeneratorCastRule(CastRulePredicate predicate) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
similarity index 88%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
index eaf5fc4..0b14ddc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
@@ -16,22 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.utils.CastExecutor;
-import org.apache.flink.table.planner.functions.casting.CastRule;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
-import org.apache.flink.table.planner.functions.casting.ExpressionCodeGeneratorCastRule;
 import org.apache.flink.table.runtime.generated.CompileUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 
 import java.util.Collections;
 
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.box;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.unbox;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.box;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.unbox;
 
 /**
  * Base class for cast rules that supports code generation, requiring only an expression to perform
@@ -40,8 +35,7 @@ import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUti
  *
  * <p>NOTE: the {@code inputTerm} is always either a primitive or a non-null object.
  */
-@Internal
-public abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT>
+abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT>
         extends AbstractNullAwareCodeGeneratorCastRule<IN, OUT>
         implements ExpressionCodeGeneratorCastRule<IN, OUT> {
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractNullAwareCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractNullAwareCodeGeneratorCastRule.java
similarity index 90%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractNullAwareCodeGeneratorCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractNullAwareCodeGeneratorCastRule.java
index 9826286..1d3c36e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractNullAwareCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractNullAwareCodeGeneratorCastRule.java
@@ -16,12 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.isPrimitiveNullable;
@@ -32,8 +28,7 @@ import static org.apache.flink.table.planner.codegen.CodeGenUtils.primitiveTypeT
  * Base class for cast rules supporting code generation. This class inherits from {@link
  * AbstractCodeGeneratorCastRule} and takes care of nullability checks.
  */
-@Internal
-public abstract class AbstractNullAwareCodeGeneratorCastRule<IN, OUT>
+abstract class AbstractNullAwareCodeGeneratorCastRule<IN, OUT>
         extends AbstractCodeGeneratorCastRule<IN, OUT> {
 
     protected AbstractNullAwareCodeGeneratorCastRule(CastRulePredicate predicate) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToArrayCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToArrayCastRule.java
similarity index 86%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToArrayCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToArrayCastRule.java
index 66c14a6..1a9bf7e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToArrayCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToArrayCastRule.java
@@ -16,15 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.GenericArrayData;
-import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -32,16 +27,14 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.newArray;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.newArray;
 
 /** {@link LogicalTypeRoot#ARRAY} to {@link LogicalTypeRoot#ARRAY} cast rule. */
-@Internal
-public class ArrayToArrayCastRule
-        extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, ArrayData> {
+class ArrayToArrayCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, ArrayData> {
 
-    public static final ArrayToArrayCastRule INSTANCE = new ArrayToArrayCastRule();
+    static final ArrayToArrayCastRule INSTANCE = new ArrayToArrayCastRule();
 
     private ArrayToArrayCastRule() {
         super(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToStringCastRule.java
similarity index 89%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToStringCastRule.java
index d655e92..e470739 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToStringCastRule.java
@@ -16,14 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
@@ -33,17 +28,15 @@ import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.NULL_STR_LITERAL;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.NULL_STR_LITERAL;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.strLiteral;
 
 /** {@link LogicalTypeRoot#ARRAY} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
-@Internal
-public class ArrayToStringCastRule
-        extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+class ArrayToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
 
-    public static final ArrayToStringCastRule INSTANCE = new ArrayToStringCastRule();
+    static final ArrayToStringCastRule INSTANCE = new ArrayToStringCastRule();
 
     private ArrayToStringCastRule() {
         super(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/BinaryToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
similarity index 72%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/BinaryToStringCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
index 4ed913c..fd95948 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/BinaryToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
@@ -16,26 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 
 import java.nio.charset.StandardCharsets;
 
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.accessStaticField;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.accessStaticField;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
 
 /**
  * {@link LogicalTypeFamily#BINARY_STRING} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule.
  */
-@Internal
-public class BinaryToStringCastRule extends AbstractCharacterFamilyTargetRule<byte[]> {
+class BinaryToStringCastRule extends AbstractCharacterFamilyTargetRule<byte[]> {
 
-    public static final BinaryToStringCastRule INSTANCE = new BinaryToStringCastRule();
+    static final BinaryToStringCastRule INSTANCE = new BinaryToStringCastRule();
 
     private BinaryToStringCastRule() {
         super(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/BooleanToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BooleanToStringCastRule.java
similarity index 71%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/BooleanToStringCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BooleanToStringCastRule.java
index 30deb1a..ae95571 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/BooleanToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BooleanToStringCastRule.java
@@ -16,23 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.EMPTY_STR_LITERAL;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.stringConcat;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.EMPTY_STR_LITERAL;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.stringConcat;
 
 /** {@link LogicalTypeRoot#BOOLEAN} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
-@Internal
-public class BooleanToStringCastRule extends AbstractCharacterFamilyTargetRule<Object> {
+class BooleanToStringCastRule extends AbstractCharacterFamilyTargetRule<Object> {
 
-    public static final BooleanToStringCastRule INSTANCE = new BooleanToStringCastRule();
+    static final BooleanToStringCastRule INSTANCE = new BooleanToStringCastRule();
 
     private BooleanToStringCastRule() {
         super(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
index a926630..aae34c2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
@@ -20,23 +20,6 @@ package org.apache.flink.table.planner.functions.casting;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.utils.CastExecutor;
-import org.apache.flink.table.planner.functions.casting.rules.ArrayToArrayCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.ArrayToStringCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.BinaryToStringCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.BooleanToStringCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.DateToStringCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.DecimalToDecimalCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.DecimalToNumericPrimitiveCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.IdentityCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.IntervalToStringCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.MapToStringCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.NumericPrimitiveCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.NumericPrimitiveToDecimalCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.NumericToStringCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.RawToStringCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.RowToStringCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.TimeToStringCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.TimestampToStringCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CastRuleUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleUtils.java
similarity index 97%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CastRuleUtils.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleUtils.java
index 437884f..5daa3d9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CastRuleUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleUtils.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
 import org.apache.flink.table.planner.codegen.CodeGenUtils;
-import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
-import org.apache.flink.table.planner.functions.casting.CastRule;
 import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.utils.EncodingUtils;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CodeGeneratedExpressionCastExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
similarity index 96%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CodeGeneratedExpressionCastExecutor.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
index ad394e0..c94db8d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CodeGeneratedExpressionCastExecutor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DateToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/DateToStringCastRule.java
similarity index 76%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DateToStringCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/DateToStringCastRule.java
index 31c07c5..d4ab3b8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DateToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/DateToStringCastRule.java
@@ -16,23 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.UNIX_DATE_TO_STRING;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.staticCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
 
 /** {@link LogicalTypeRoot#DATE} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
-@Internal
-public class DateToStringCastRule extends AbstractCharacterFamilyTargetRule<Long> {
+class DateToStringCastRule extends AbstractCharacterFamilyTargetRule<Long> {
 
-    public static final DateToStringCastRule INSTANCE = new DateToStringCastRule();
+    static final DateToStringCastRule INSTANCE = new DateToStringCastRule();
 
     private DateToStringCastRule() {
         super(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DecimalToDecimalCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/DecimalToDecimalCastRule.java
similarity index 80%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DecimalToDecimalCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/DecimalToDecimalCastRule.java
index 11700fd..23c4c42 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DecimalToDecimalCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/DecimalToDecimalCastRule.java
@@ -16,25 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.planner.codegen.calls.BuiltInMethods;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.staticCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
 
 /** {@link LogicalTypeRoot#DECIMAL} to {@link LogicalTypeRoot#DECIMAL} cast rule. */
-@Internal
-public class DecimalToDecimalCastRule
+class DecimalToDecimalCastRule
         extends AbstractExpressionCodeGeneratorCastRule<DecimalData, DecimalData> {
 
-    public static final DecimalToDecimalCastRule INSTANCE = new DecimalToDecimalCastRule();
+    static final DecimalToDecimalCastRule INSTANCE = new DecimalToDecimalCastRule();
 
     private DecimalToDecimalCastRule() {
         super(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DecimalToNumericPrimitiveCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/DecimalToNumericPrimitiveCastRule.java
similarity index 80%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DecimalToNumericPrimitiveCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/DecimalToNumericPrimitiveCastRule.java
index c9f4927..e1d1344 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DecimalToNumericPrimitiveCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/DecimalToNumericPrimitiveCastRule.java
@@ -16,12 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -30,18 +27,17 @@ import java.lang.reflect.Method;
 
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.DECIMAL_TO_DOUBLE;
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.DECIMAL_TO_INTEGRAL;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.castToPrimitive;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.staticCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.castToPrimitive;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
 
 /**
  * {@link LogicalTypeRoot#DECIMAL} to {@link LogicalTypeFamily#INTEGER_NUMERIC} and {@link
  * LogicalTypeFamily#APPROXIMATE_NUMERIC} cast rule.
  */
-@Internal
-public class DecimalToNumericPrimitiveCastRule
+class DecimalToNumericPrimitiveCastRule
         extends AbstractExpressionCodeGeneratorCastRule<DecimalData, Number> {
 
-    public static final DecimalToNumericPrimitiveCastRule INSTANCE =
+    static final DecimalToNumericPrimitiveCastRule INSTANCE =
             new DecimalToNumericPrimitiveCastRule();
 
     private DecimalToNumericPrimitiveCastRule() {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IdentityCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/IdentityCastRule.java
similarity index 84%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IdentityCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/IdentityCastRule.java
index f769a71..f2eb001 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IdentityCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/IdentityCastRule.java
@@ -16,13 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
-import org.apache.flink.table.planner.functions.casting.ExpressionCodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -32,11 +27,10 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
  * Identity cast rule. For more details on when the rule is applied, check {@link
  * #isIdentityCast(LogicalType, LogicalType)}
  */
-@Internal
-public class IdentityCastRule extends AbstractCodeGeneratorCastRule<Object, Object>
+class IdentityCastRule extends AbstractCodeGeneratorCastRule<Object, Object>
         implements ExpressionCodeGeneratorCastRule<Object, Object> {
 
-    public static final IdentityCastRule INSTANCE = new IdentityCastRule();
+    static final IdentityCastRule INSTANCE = new IdentityCastRule();
 
     private IdentityCastRule() {
         super(CastRulePredicate.builder().predicate(IdentityCastRule::isIdentityCast).build());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IntervalToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/IntervalToStringCastRule.java
similarity index 79%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IntervalToStringCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/IntervalToStringCastRule.java
index 6e20732..8773a93 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IntervalToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/IntervalToStringCastRule.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -29,13 +26,12 @@ import java.lang.reflect.Method;
 
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.INTERVAL_DAY_TIME_TO_STRING;
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.INTERVAL_YEAR_MONTH_TO_STRING;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.staticCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
 
 /** {@link LogicalTypeFamily#INTERVAL} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
-@Internal
-public class IntervalToStringCastRule extends AbstractCharacterFamilyTargetRule<Object> {
+class IntervalToStringCastRule extends AbstractCharacterFamilyTargetRule<Object> {
 
-    public static final IntervalToStringCastRule INSTANCE = new IntervalToStringCastRule();
+    static final IntervalToStringCastRule INSTANCE = new IntervalToStringCastRule();
 
     private IntervalToStringCastRule() {
         super(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/MapToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapToStringCastRule.java
similarity index 92%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/MapToStringCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapToStringCastRule.java
index 7578a40..90925dc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/MapToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapToStringCastRule.java
@@ -16,14 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -33,16 +28,15 @@ import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.NULL_STR_LITERAL;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.NULL_STR_LITERAL;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.strLiteral;
 
 /** {@link LogicalTypeRoot#MAP} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
-@Internal
-public class MapToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+class MapToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
 
-    public static final MapToStringCastRule INSTANCE = new MapToStringCastRule();
+    static final MapToStringCastRule INSTANCE = new MapToStringCastRule();
 
     private MapToStringCastRule() {
         super(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericPrimitiveCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericPrimitiveCastRule.java
similarity index 86%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericPrimitiveCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericPrimitiveCastRule.java
index 6e20861..ed3fb34 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericPrimitiveCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericPrimitiveCastRule.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 
@@ -35,11 +32,9 @@ import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_YEAR
  * Cast rule for {@link LogicalTypeFamily#INTEGER_NUMERIC} and {@link
  * LogicalTypeFamily#APPROXIMATE_NUMERIC} and {@link LogicalTypeFamily#INTERVAL} conversions.
  */
-@Internal
-public class NumericPrimitiveCastRule
-        extends AbstractExpressionCodeGeneratorCastRule<Number, Number> {
+class NumericPrimitiveCastRule extends AbstractExpressionCodeGeneratorCastRule<Number, Number> {
 
-    public static final NumericPrimitiveCastRule INSTANCE = new NumericPrimitiveCastRule();
+    static final NumericPrimitiveCastRule INSTANCE = new NumericPrimitiveCastRule();
 
     private NumericPrimitiveCastRule() {
         super(CastRulePredicate.builder().predicate(NumericPrimitiveCastRule::matches).build());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericPrimitiveToDecimalCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericPrimitiveToDecimalCastRule.java
similarity index 82%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericPrimitiveToDecimalCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericPrimitiveToDecimalCastRule.java
index df62905..17ff3ee 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericPrimitiveToDecimalCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericPrimitiveToDecimalCastRule.java
@@ -16,12 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
@@ -29,18 +26,17 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.DOUBLE_TO_DECIMAL;
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.INTEGRAL_TO_DECIMAL;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.cast;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.staticCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.cast;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
 
 /**
  * {@link LogicalTypeFamily#INTEGER_NUMERIC} and {@link LogicalTypeFamily#APPROXIMATE_NUMERIC} to
  * {@link LogicalTypeRoot#DECIMAL} cast rule.
  */
-@Internal
-public class NumericPrimitiveToDecimalCastRule
+class NumericPrimitiveToDecimalCastRule
         extends AbstractExpressionCodeGeneratorCastRule<Number, DecimalData> {
 
-    public static final NumericPrimitiveToDecimalCastRule INSTANCE =
+    static final NumericPrimitiveToDecimalCastRule INSTANCE =
             new NumericPrimitiveToDecimalCastRule();
 
     private NumericPrimitiveToDecimalCastRule() {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericToStringCastRule.java
similarity index 70%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericToStringCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericToStringCastRule.java
index 3220e0c..d83741e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericToStringCastRule.java
@@ -16,22 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.EMPTY_STR_LITERAL;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.stringConcat;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.EMPTY_STR_LITERAL;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.stringConcat;
 
 /** {@link LogicalTypeFamily#NUMERIC} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
-@Internal
-public class NumericToStringCastRule extends AbstractCharacterFamilyTargetRule<Object> {
+class NumericToStringCastRule extends AbstractCharacterFamilyTargetRule<Object> {
 
-    public static final NumericToStringCastRule INSTANCE = new NumericToStringCastRule();
+    static final NumericToStringCastRule INSTANCE = new NumericToStringCastRule();
 
     private NumericToStringCastRule() {
         super(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RawToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RawToStringCastRule.java
similarity index 82%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RawToStringCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RawToStringCastRule.java
index 51fdde3..3301f5f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RawToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RawToStringCastRule.java
@@ -16,26 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import static org.apache.flink.table.codesplit.CodeSplitUtil.newName;
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
 
 /** {@link LogicalTypeRoot#RAW} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
-@Internal
-public class RawToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<Object, String> {
+class RawToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<Object, String> {
 
-    public static final RawToStringCastRule INSTANCE = new RawToStringCastRule();
+    static final RawToStringCastRule INSTANCE = new RawToStringCastRule();
 
-    protected RawToStringCastRule() {
+    private RawToStringCastRule() {
         super(
                 CastRulePredicate.builder()
                         .input(LogicalTypeRoot.RAW)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
similarity index 87%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
index 9e3f5b0..1264e9d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
@@ -16,14 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -36,16 +31,15 @@ import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.NULL_STR_LITERAL;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.NULL_STR_LITERAL;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.strLiteral;
 
 /** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
-@Internal
-public class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
 
-    public static final RowToStringCastRule INSTANCE = new RowToStringCastRule();
+    static final RowToStringCastRule INSTANCE = new RowToStringCastRule();
 
     private RowToStringCastRule() {
         super(CastRulePredicate.builder().predicate(RowToStringCastRule::matches).build());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/TimeToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimeToStringCastRule.java
similarity index 81%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/TimeToStringCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimeToStringCastRule.java
index 81008de..cc07a79 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/TimeToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimeToStringCastRule.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -32,10 +29,9 @@ import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.UNIX_T
  * {@link LogicalTypeRoot#TIME_WITHOUT_TIME_ZONE} to {@link LogicalTypeFamily#CHARACTER_STRING} cast
  * rule.
  */
-@Internal
-public class TimeToStringCastRule extends AbstractCharacterFamilyTargetRule<Long> {
+class TimeToStringCastRule extends AbstractCharacterFamilyTargetRule<Long> {
 
-    public static final TimeToStringCastRule INSTANCE = new TimeToStringCastRule();
+    static final TimeToStringCastRule INSTANCE = new TimeToStringCastRule();
 
     private TimeToStringCastRule() {
         super(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/TimestampToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimestampToStringCastRule.java
similarity index 77%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/TimestampToStringCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimestampToStringCastRule.java
index d2aef08..965b378 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/TimestampToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimestampToStringCastRule.java
@@ -16,12 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.functions.casting.rules;
+package org.apache.flink.table.planner.functions.casting;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
-import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -30,14 +27,13 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.calcite.avatica.util.DateTimeUtils;
 
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.TIMESTAMP_TO_STRING_TIME_ZONE;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.accessStaticField;
-import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.staticCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.accessStaticField;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
 
 /** {@link LogicalTypeFamily#TIMESTAMP} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
-@Internal
-public class TimestampToStringCastRule extends AbstractCharacterFamilyTargetRule<TimestampData> {
+class TimestampToStringCastRule extends AbstractCharacterFamilyTargetRule<TimestampData> {
 
-    public static final TimestampToStringCastRule INSTANCE = new TimestampToStringCastRule();
+    static final TimestampToStringCastRule INSTANCE = new TimestampToStringCastRule();
 
     private TimestampToStringCastRule() {
         super(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 3021776..6a7fbb2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -18,25 +18,20 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
-import org.apache.flink.table.api.{DataTypes, ValidationException}
-import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.data.binary.BinaryArrayData
-import org.apache.flink.table.data.conversion.DataStructureConverters
-import org.apache.flink.table.planner.functions.casting.{CastRule, CastRuleProvider, CodeGeneratorCastRule, ExpressionCodeGeneratorCastRule}
-import org.apache.flink.table.data.util.{DataFormatConverters, MapDataUtil}
+import org.apache.flink.table.planner.functions.casting.{CastRuleProvider, CodeGeneratorCastRule, ExpressionCodeGeneratorCastRule}
+import org.apache.flink.table.data.util.MapDataUtil
 import org.apache.flink.table.data.writer.{BinaryArrayWriter, BinaryRowWriter}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{binaryRowFieldSetAccess, binaryRowSetNull, binaryWriterWriteField, binaryWriterWriteNull, _}
 import org.apache.flink.table.planner.codegen.GenerateUtils._
 import org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL, NEVER_NULL, NO_CODE}
-import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGenUtils, CodeGeneratorContext, GenerateUtils, GeneratedExpression}
-import org.apache.flink.table.planner.functions.casting.rules.{AbstractExpressionCodeGeneratorCastRule, IdentityCastRule}
+import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGenUtils, CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 import org.apache.flink.table.runtime.functions.SqlFunctionUtils
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
 import org.apache.flink.table.runtime.types.PlannerTypeUtils
 import org.apache.flink.table.runtime.types.PlannerTypeUtils.{isInteroperable, isPrimitive}
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils._
-import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.LogicalTypeFamily.DATETIME
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical._
@@ -47,8 +42,6 @@ import org.apache.flink.table.utils.DateTimeUtils
 import org.apache.flink.util.Preconditions.checkArgument
 import org.apache.flink.table.utils.DateTimeUtils.MILLIS_PER_DAY
 
-import java.lang.{StringBuilder => JStringBuilder}
-import java.nio.charset.StandardCharsets
 import java.util.Arrays.asList
 import scala.collection.JavaConversions._
 

[flink] 02/04: [hotfix][table-planner] Fix primitives/boxed primitives behaviour of ExpressionEvaluator

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b51abc7c8c230507749b237a52fb9ba3eddf9e9c
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Mon Nov 8 17:34:55 2021 +0100

    [hotfix][table-planner] Fix primitives/boxed primitives behaviour of ExpressionEvaluator
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
 .../AbstractExpressionCodeGeneratorCastRule.java   | 23 ++++++++++++----
 .../flink/table/planner/codegen/CodeGenUtils.scala | 32 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java
index f3ed23f..aba38f0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java
@@ -30,10 +30,15 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 
 import java.util.Collections;
 
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.box;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.unbox;
+
 /**
  * Base class for cast rules that supports code generation, requiring only an expression to perform
  * the cast. If the casting logic requires to generate several statements, look at {@link
  * AbstractNullAwareCodeGeneratorCastRule}.
+ *
+ * <p>NOTE: the {@code inputTerm} is always either a primitive or a non-null object.
  */
 @Internal
 public abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT>
@@ -63,11 +68,19 @@ public abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT>
         final String inputArgumentName = "inputValue";
 
         final String expression =
-                generateExpression(
-                        createCodeGeneratorCastRuleContext(context),
-                        inputArgumentName,
-                        inputLogicalType,
-                        targetLogicalType);
+                // We need to wrap the expression in a null check
+                CastRuleUtils.ternaryOperator(
+                        inputArgumentName + " == null",
+                        "null",
+                        // Values are always boxed when passed to ExpressionEvaluator and no auto
+                        // boxing/unboxing is provided, so we need to take care of it manually
+                        box(
+                                generateExpression(
+                                        createCodeGeneratorCastRuleContext(context),
+                                        unbox(inputArgumentName, inputLogicalType),
+                                        inputLogicalType,
+                                        targetLogicalType),
+                                targetLogicalType));
 
         return new CodeGeneratedExpressionCastExecutor<>(
                 CompileUtils.compileExpression(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 748c583..f63f3aa 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -195,6 +195,38 @@ object CodeGenUtils {
     case _ => boxedTypeTermForType(t)
   }
 
+  /**
+   * Execute primitive unboxing.
+   */
+  def unbox(term: String, ty: LogicalType): String = ty.getTypeRoot match {
+    // ordered by type root definition
+    case BOOLEAN => s"$term.booleanValue()"
+    case TINYINT => s"$term.byteValue()"
+    case SMALLINT => s"$term.shortValue()"
+    case INTEGER | DATE | TIME_WITHOUT_TIME_ZONE | INTERVAL_YEAR_MONTH => s"$term.intValue()"
+    case BIGINT | INTERVAL_DAY_TIME => s"$term.longValue()"
+    case FLOAT => s"$term.floatValue()"
+    case DOUBLE => s"$term.doubleValue()"
+    case DISTINCT_TYPE => unbox(term, ty.asInstanceOf[DistinctType].getSourceType)
+    case _ => term
+  }
+
+  /**
+   * Execute primitive boxing.
+   */
+  def box(term: String, ty: LogicalType): String = ty.getTypeRoot match {
+    // ordered by type root definition
+    case BOOLEAN => s"Boolean.valueOf($term)"
+    case TINYINT => s"Byte.valueOf($term)"
+    case SMALLINT => s"Short.valueOf($term)"
+    case INTEGER | DATE | TIME_WITHOUT_TIME_ZONE | INTERVAL_YEAR_MONTH => s"Integer.valueOf($term)"
+    case BIGINT | INTERVAL_DAY_TIME => s"Long.valueOf($term)"
+    case FLOAT => s"Float.valueOf($term)"
+    case DOUBLE => s"Double.valueOf($term)"
+    case DISTINCT_TYPE => box(term, ty.asInstanceOf[DistinctType].getSourceType)
+    case _ => term
+  }
+
   @tailrec
   def boxedTypeTermForType(t: LogicalType): String = t.getTypeRoot match {
     // ordered by type root definition

[flink] 01/04: [hotfix][table-planner] Add ExpressionCodeGeneratorCastRule extracting generateExpression from AbstractExpressionCodeGeneratorCastRule

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c914e15e2016be12ce3ea2b2127a2607df24a75
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Mon Nov 8 17:31:51 2021 +0100

    [hotfix][table-planner] Add ExpressionCodeGeneratorCastRule extracting generateExpression from AbstractExpressionCodeGeneratorCastRule
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
 .../casting/ExpressionCodeGeneratorCastRule.java   | 44 ++++++++++++++++++++++
 .../rules/AbstractCharacterFamilyTargetRule.java   |  2 +-
 .../AbstractExpressionCodeGeneratorCastRule.java   | 12 ++----
 .../functions/casting/rules/IdentityCastRule.java  | 13 ++++++-
 4 files changed, 61 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ExpressionCodeGeneratorCastRule.java
new file mode 100644
index 0000000..a41db96
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ExpressionCodeGeneratorCastRule.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/**
+ * Cast rule that is able to generate a single expression containing all the casting logic.
+ *
+ * @param <IN> Input internal type
+ * @param <OUT> Output internal type
+ */
+@Internal
+public interface ExpressionCodeGeneratorCastRule<IN, OUT> extends CodeGeneratorCastRule<IN, OUT> {
+
+    /**
+     * Generate a Java expression performing the casting. This expression can be wrapped in another
+     * expression, or assigned to a variable or returned from a function.
+     *
+     * <p>NOTE: the {@code inputTerm} is always either a primitive or a non-null object.
+     */
+    String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType);
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCharacterFamilyTargetRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCharacterFamilyTargetRule.java
index 25cbdf1..0309566 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCharacterFamilyTargetRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCharacterFamilyTargetRule.java
@@ -46,7 +46,7 @@ public abstract class AbstractCharacterFamilyTargetRule<IN>
             LogicalType targetLogicalType);
 
     @Override
-    String generateExpression(
+    public String generateExpression(
             CodeGeneratorCastRule.Context context,
             String inputTerm,
             LogicalType inputLogicalType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java
index a4027c5..f3ed23f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.data.utils.CastExecutor;
 import org.apache.flink.table.planner.functions.casting.CastRule;
 import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
 import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.planner.functions.casting.ExpressionCodeGeneratorCastRule;
 import org.apache.flink.table.runtime.generated.CompileUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
@@ -36,18 +37,13 @@ import java.util.Collections;
  */
 @Internal
 public abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT>
-        extends AbstractNullAwareCodeGeneratorCastRule<IN, OUT> {
+        extends AbstractNullAwareCodeGeneratorCastRule<IN, OUT>
+        implements ExpressionCodeGeneratorCastRule<IN, OUT> {
 
     protected AbstractExpressionCodeGeneratorCastRule(CastRulePredicate predicate) {
         super(predicate);
     }
 
-    abstract String generateExpression(
-            CodeGeneratorCastRule.Context context,
-            String inputTerm,
-            LogicalType inputLogicalType,
-            LogicalType targetLogicalType);
-
     @Override
     protected String generateCodeBlockInternal(
             CodeGeneratorCastRule.Context context,
@@ -58,7 +54,7 @@ public abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT>
         return returnVariable
                 + " = "
                 + generateExpression(context, inputTerm, inputLogicalType, targetLogicalType)
-                + ";";
+                + ";\n";
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IdentityCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IdentityCastRule.java
index e96c4d4..184520f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IdentityCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IdentityCastRule.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
 import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
 import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.planner.functions.casting.ExpressionCodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
@@ -31,7 +32,8 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
  * #isIdentityCast(LogicalType, LogicalType)}
  */
 @Internal
-public class IdentityCastRule extends AbstractCodeGeneratorCastRule<Object, Object> {
+public class IdentityCastRule extends AbstractCodeGeneratorCastRule<Object, Object>
+        implements ExpressionCodeGeneratorCastRule<Object, Object> {
 
     public static final IdentityCastRule INSTANCE = new IdentityCastRule();
 
@@ -51,6 +53,15 @@ public class IdentityCastRule extends AbstractCodeGeneratorCastRule<Object, Obje
     }
 
     @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return inputTerm;
+    }
+
+    @Override
     public CastCodeBlock generateCodeBlock(
             CodeGeneratorCastRule.Context context,
             String inputTerm,

[flink] 03/04: [FLINK-24779][table-planner] Add numeric casting rules

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4891ee93bc0f0e3b1f8bf6190963ea5931b0dc2f
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Mon Nov 8 17:37:18 2021 +0100

    [FLINK-24779][table-planner] Add numeric casting rules
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
    
    This closes #17738.
---
 .../functions/casting/CastRuleProvider.java        |  10 +-
 .../casting/ExpressionCodeGeneratorCastRule.java   |   3 +-
 .../AbstractExpressionCodeGeneratorCastRule.java   |   4 +-
 .../AbstractNullAwareCodeGeneratorCastRule.java    |  14 +-
 .../functions/casting/rules/CastRuleUtils.java     |  66 +++++
 ...CastRule.java => DecimalToDecimalCastRule.java} |  27 +-
 ...java => DecimalToNumericPrimitiveCastRule.java} |  35 ++-
 .../functions/casting/rules/IdentityCastRule.java  |  18 ++
 .../casting/rules/NumericPrimitiveCastRule.java    |  79 ++++++
 .../rules/NumericPrimitiveToDecimalCastRule.java   |  75 ++++++
 .../flink/table/planner/codegen/CodeGenUtils.scala | 111 ++-------
 .../table/planner/codegen/ExprCodeGenerator.scala  |   4 +-
 .../planner/codegen/calls/BuiltInMethods.scala     |  27 +-
 .../table/planner/codegen/calls/IfCallGen.scala    |  32 ++-
 .../planner/codegen/calls/ScalarOperatorGens.scala | 113 +++------
 .../planner/functions/casting/CastRulesTest.java   | 272 ++++++++++++++++++++-
 .../apache/flink/table/data/DecimalDataUtils.java  |  24 --
 17 files changed, 683 insertions(+), 231 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
index 2684e79..a926630 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
@@ -25,15 +25,18 @@ import org.apache.flink.table.planner.functions.casting.rules.ArrayToStringCastR
 import org.apache.flink.table.planner.functions.casting.rules.BinaryToStringCastRule;
 import org.apache.flink.table.planner.functions.casting.rules.BooleanToStringCastRule;
 import org.apache.flink.table.planner.functions.casting.rules.DateToStringCastRule;
+import org.apache.flink.table.planner.functions.casting.rules.DecimalToDecimalCastRule;
+import org.apache.flink.table.planner.functions.casting.rules.DecimalToNumericPrimitiveCastRule;
 import org.apache.flink.table.planner.functions.casting.rules.IdentityCastRule;
 import org.apache.flink.table.planner.functions.casting.rules.IntervalToStringCastRule;
 import org.apache.flink.table.planner.functions.casting.rules.MapToStringCastRule;
+import org.apache.flink.table.planner.functions.casting.rules.NumericPrimitiveCastRule;
+import org.apache.flink.table.planner.functions.casting.rules.NumericPrimitiveToDecimalCastRule;
 import org.apache.flink.table.planner.functions.casting.rules.NumericToStringCastRule;
 import org.apache.flink.table.planner.functions.casting.rules.RawToStringCastRule;
 import org.apache.flink.table.planner.functions.casting.rules.RowToStringCastRule;
 import org.apache.flink.table.planner.functions.casting.rules.TimeToStringCastRule;
 import org.apache.flink.table.planner.functions.casting.rules.TimestampToStringCastRule;
-import org.apache.flink.table.planner.functions.casting.rules.UpcastToBigIntCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -56,7 +59,10 @@ public class CastRuleProvider {
     static {
         INSTANCE
                 // Numeric rules
-                .addRule(UpcastToBigIntCastRule.INSTANCE)
+                .addRule(DecimalToDecimalCastRule.INSTANCE)
+                .addRule(NumericPrimitiveToDecimalCastRule.INSTANCE)
+                .addRule(DecimalToNumericPrimitiveCastRule.INSTANCE)
+                .addRule(NumericPrimitiveCastRule.INSTANCE)
                 // To string rules
                 .addRule(NumericToStringCastRule.INSTANCE)
                 .addRule(BooleanToStringCastRule.INSTANCE)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ExpressionCodeGeneratorCastRule.java
index a41db96..68b2aba 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ExpressionCodeGeneratorCastRule.java
@@ -34,7 +34,8 @@ public interface ExpressionCodeGeneratorCastRule<IN, OUT> extends CodeGeneratorC
      * Generate a Java expression performing the casting. This expression can be wrapped in another
      * expression, or assigned to a variable or returned from a function.
      *
-     * <p>NOTE: the {@code inputTerm} is always either a primitive or a non-null object.
+     * <p>NOTE: the {@code inputTerm} is always either a primitive or a non-null object, while the
+     * expression result is either a primitive or a nullable object.
      */
     String generateExpression(
             CodeGeneratorCastRule.Context context,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java
index aba38f0..eaf5fc4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractExpressionCodeGeneratorCastRule.java
@@ -30,8 +30,8 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 
 import java.util.Collections;
 
-import static org.apache.flink.table.planner.codegen.CodeGenUtils.box;
-import static org.apache.flink.table.planner.codegen.CodeGenUtils.unbox;
+import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.box;
+import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.unbox;
 
 /**
  * Base class for cast rules that supports code generation, requiring only an expression to perform
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractNullAwareCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractNullAwareCodeGeneratorCastRule.java
index c58d1cd..9826286 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractNullAwareCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractNullAwareCodeGeneratorCastRule.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
 import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.isPrimitiveNullable;
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.primitiveDefaultValue;
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.primitiveTypeTermForType;
 
@@ -59,8 +60,7 @@ public abstract class AbstractNullAwareCodeGeneratorCastRule<IN, OUT>
             LogicalType targetType) {
         final CastRuleUtils.CodeWriter writer = new CastRuleUtils.CodeWriter();
 
-        // Result of a casting can be null only and only if the input is null
-        final boolean isResultNullable = inputType.isNullable();
+        final boolean isResultNullable = inputType.isNullable() || isPrimitiveNullable(targetType);
         String nullTerm;
         if (isResultNullable) {
             nullTerm = context.declareVariable("boolean", "isNull");
@@ -81,7 +81,15 @@ public abstract class AbstractNullAwareCodeGeneratorCastRule<IN, OUT>
         if (isResultNullable) {
             writer.ifStmt(
                     "!" + nullTerm,
-                    thenWriter -> thenWriter.appendBlock(castCodeBlock),
+                    thenWriter -> {
+                        thenWriter.appendBlock(castCodeBlock);
+
+                        // If the result type is not primitive,
+                        // then perform another null check
+                        if (isPrimitiveNullable(targetType)) {
+                            thenWriter.assignStmt(nullTerm, returnTerm + " == null");
+                        }
+                    },
                     elseWriter ->
                             elseWriter.assignStmt(returnTerm, primitiveDefaultValue(targetType)));
         } else {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CastRuleUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CastRuleUtils.java
index c32d447..437884f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CastRuleUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/CastRuleUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.functions.casting.rules;
 import org.apache.flink.table.planner.codegen.CodeGenUtils;
 import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
 import org.apache.flink.table.planner.functions.casting.CastRule;
+import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.utils.EncodingUtils;
 
@@ -41,6 +42,10 @@ final class CastRuleUtils {
     static final String NULL_STR_LITERAL = strLiteral("null");
     static final String EMPTY_STR_LITERAL = "\"\"";
 
+    static String staticCall(Class<?> clazz, String methodName, Object... args) {
+        return methodCall(className(clazz), methodName, args);
+    }
+
     static String staticCall(Method staticMethod, Object... args) {
         return functionCall(CodeGenUtils.qualifyMethod(staticMethod), args);
     }
@@ -80,6 +85,67 @@ final class CastRuleUtils {
         return "\"" + EncodingUtils.escapeJava(str) + "\"";
     }
 
+    static String cast(String target, String expression) {
+        return "((" + target + ")(" + expression + "))";
+    }
+
+    static String castToPrimitive(LogicalType target, String expression) {
+        return cast(primitiveTypeTermForType(target), expression);
+    }
+
+    static String unbox(String term, LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case BOOLEAN:
+                return methodCall(term, "booleanValue");
+            case TINYINT:
+                return methodCall(term, "byteValue");
+            case SMALLINT:
+                return methodCall(term, "shortValue");
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case INTERVAL_YEAR_MONTH:
+                return methodCall(term, "intValue");
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return methodCall(term, "longValue");
+            case FLOAT:
+                return methodCall(term, "floatValue");
+            case DOUBLE:
+                return methodCall(term, "doubleValue");
+            case DISTINCT_TYPE:
+                return unbox(term, ((DistinctType) type).getSourceType());
+        }
+        return term;
+    }
+
+    static String box(String term, LogicalType type) {
+        switch (type.getTypeRoot()) {
+                // ordered by type root definition; a distinct type boxes like its source type
+            case BOOLEAN:
+                return staticCall(Boolean.class, "valueOf", term);
+            case TINYINT:
+                return staticCall(Byte.class, "valueOf", term);
+            case SMALLINT:
+                return staticCall(Short.class, "valueOf", term);
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case INTERVAL_YEAR_MONTH:
+                return staticCall(Integer.class, "valueOf", term);
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return staticCall(Long.class, "valueOf", term);
+            case FLOAT:
+                return staticCall(Float.class, "valueOf", term);
+            case DOUBLE:
+                return staticCall(Double.class, "valueOf", term);
+            case DISTINCT_TYPE:
+                return box(term, ((DistinctType) type).getSourceType());
+        }
+        return term;
+    }
+
     static final class CodeWriter {
         StringBuilder builder = new StringBuilder();
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/UpcastToBigIntCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DecimalToDecimalCastRule.java
similarity index 60%
copy from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/UpcastToBigIntCastRule.java
copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DecimalToDecimalCastRule.java
index 4d002f3..11700fd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/UpcastToBigIntCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DecimalToDecimalCastRule.java
@@ -19,24 +19,28 @@
 package org.apache.flink.table.planner.functions.casting.rules;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.planner.codegen.calls.BuiltInMethods;
 import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
 import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
-/** Upcasting to long for smaller types. */
+import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeRoot#DECIMAL} to {@link LogicalTypeRoot#DECIMAL} cast rule. */
 @Internal
-public class UpcastToBigIntCastRule extends AbstractExpressionCodeGeneratorCastRule<Object, Long> {
+public class DecimalToDecimalCastRule
+        extends AbstractExpressionCodeGeneratorCastRule<DecimalData, DecimalData> {
 
-    public static final UpcastToBigIntCastRule INSTANCE = new UpcastToBigIntCastRule();
+    public static final DecimalToDecimalCastRule INSTANCE = new DecimalToDecimalCastRule();
 
-    private UpcastToBigIntCastRule() {
+    private DecimalToDecimalCastRule() {
         super(
                 CastRulePredicate.builder()
-                        .input(LogicalTypeRoot.TINYINT)
-                        .input(LogicalTypeRoot.SMALLINT)
-                        .input(LogicalTypeRoot.INTEGER)
-                        .target(LogicalTypeRoot.BIGINT)
+                        .input(LogicalTypeRoot.DECIMAL)
+                        .target(LogicalTypeRoot.DECIMAL)
                         .build());
     }
 
@@ -46,6 +50,11 @@ public class UpcastToBigIntCastRule extends AbstractExpressionCodeGeneratorCastR
             String inputTerm,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        return "((long)(" + inputTerm + "))";
+        final DecimalType targetDecimalType = (DecimalType) targetLogicalType;
+        return staticCall(
+                BuiltInMethods.DECIMAL_TO_DECIMAL(),
+                inputTerm,
+                targetDecimalType.getPrecision(),
+                targetDecimalType.getScale());
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/UpcastToBigIntCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DecimalToNumericPrimitiveCastRule.java
similarity index 51%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/UpcastToBigIntCastRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DecimalToNumericPrimitiveCastRule.java
index 4d002f3..c9f4927 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/UpcastToBigIntCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/DecimalToNumericPrimitiveCastRule.java
@@ -19,24 +19,37 @@
 package org.apache.flink.table.planner.functions.casting.rules;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
 import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
-/** Upcasting to long for smaller types. */
+import java.lang.reflect.Method;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.DECIMAL_TO_DOUBLE;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.DECIMAL_TO_INTEGRAL;
+import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.castToPrimitive;
+import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.staticCall;
+
+/**
+ * {@link LogicalTypeRoot#DECIMAL} to {@link LogicalTypeFamily#INTEGER_NUMERIC} and {@link
+ * LogicalTypeFamily#APPROXIMATE_NUMERIC} cast rule.
+ */
 @Internal
-public class UpcastToBigIntCastRule extends AbstractExpressionCodeGeneratorCastRule<Object, Long> {
+public class DecimalToNumericPrimitiveCastRule
+        extends AbstractExpressionCodeGeneratorCastRule<DecimalData, Number> {
 
-    public static final UpcastToBigIntCastRule INSTANCE = new UpcastToBigIntCastRule();
+    public static final DecimalToNumericPrimitiveCastRule INSTANCE =
+            new DecimalToNumericPrimitiveCastRule();
 
-    private UpcastToBigIntCastRule() {
+    private DecimalToNumericPrimitiveCastRule() {
         super(
                 CastRulePredicate.builder()
-                        .input(LogicalTypeRoot.TINYINT)
-                        .input(LogicalTypeRoot.SMALLINT)
-                        .input(LogicalTypeRoot.INTEGER)
-                        .target(LogicalTypeRoot.BIGINT)
+                        .input(LogicalTypeRoot.DECIMAL)
+                        .target(LogicalTypeFamily.INTEGER_NUMERIC)
+                        .target(LogicalTypeFamily.APPROXIMATE_NUMERIC)
                         .build());
     }
 
@@ -46,6 +59,10 @@ public class UpcastToBigIntCastRule extends AbstractExpressionCodeGeneratorCastR
             String inputTerm,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
-        return "((long)(" + inputTerm + "))";
+        Method method =
+                targetLogicalType.is(LogicalTypeFamily.INTEGER_NUMERIC)
+                        ? DECIMAL_TO_INTEGRAL()
+                        : DECIMAL_TO_DOUBLE();
+        return castToPrimitive(targetLogicalType, staticCall(method, inputTerm));
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IdentityCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IdentityCastRule.java
index 184520f..f769a71 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IdentityCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IdentityCastRule.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
 import org.apache.flink.table.planner.functions.casting.ExpressionCodeGeneratorCastRule;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
 
 /**
@@ -49,6 +50,23 @@ public class IdentityCastRule extends AbstractCodeGeneratorCastRule<Object, Obje
                 && targetLogicalType.is(LogicalTypeFamily.CHARACTER_STRING)) {
             return true;
         }
+
+        // INTERVAL_YEAR_MONTH and INTEGER use the same primitive int type
+        if ((inputLogicalType.is(LogicalTypeRoot.INTERVAL_YEAR_MONTH)
+                        && targetLogicalType.is(LogicalTypeRoot.INTEGER))
+                || (inputLogicalType.is(LogicalTypeRoot.INTEGER)
+                        && targetLogicalType.is(LogicalTypeRoot.INTERVAL_YEAR_MONTH))) {
+            return true;
+        }
+
+        // INTERVAL_DAY_TIME and BIGINT use the same primitive long type
+        if ((inputLogicalType.is(LogicalTypeRoot.INTERVAL_DAY_TIME)
+                        && targetLogicalType.is(LogicalTypeRoot.BIGINT))
+                || (inputLogicalType.is(LogicalTypeRoot.BIGINT)
+                        && targetLogicalType.is(LogicalTypeRoot.INTERVAL_DAY_TIME))) {
+            return true;
+        }
+
         return LogicalTypeCasts.supportsAvoidingCast(inputLogicalType, targetLogicalType);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericPrimitiveCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericPrimitiveCastRule.java
new file mode 100644
index 0000000..6e20861
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericPrimitiveCastRule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.APPROXIMATE_NUMERIC;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.INTEGER_NUMERIC;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_YEAR_MONTH;
+
+/**
+ * Cast rule for {@link LogicalTypeFamily#INTEGER_NUMERIC} and {@link
+ * LogicalTypeFamily#APPROXIMATE_NUMERIC} and {@link LogicalTypeFamily#INTERVAL} conversions.
+ */
+@Internal
+public class NumericPrimitiveCastRule
+        extends AbstractExpressionCodeGeneratorCastRule<Number, Number> {
+
+    public static final NumericPrimitiveCastRule INSTANCE = new NumericPrimitiveCastRule();
+
+    private NumericPrimitiveCastRule() {
+        super(CastRulePredicate.builder().predicate(NumericPrimitiveCastRule::matches).build());
+    }
+
+    private static boolean matches(LogicalType input, LogicalType target) {
+        // Exclude identity casting
+        if (input.is(target.getTypeRoot())) {
+            return false;
+        }
+
+        // Conversions between primitive numerics
+        if ((input.is(INTEGER_NUMERIC) || input.is(APPROXIMATE_NUMERIC))
+                && (target.is(INTEGER_NUMERIC) || target.is(APPROXIMATE_NUMERIC))) {
+            return true;
+        }
+
+        // Conversions between Interval year month (int) and bigint (long)
+        if ((input.is(INTERVAL_YEAR_MONTH) && target.is(BIGINT))
+                || (input.is(BIGINT) && target.is(INTERVAL_YEAR_MONTH))) {
+            return true;
+        }
+
+        // Conversions between Interval day time (long) and integer (int)
+        return (input.is(INTERVAL_DAY_TIME) && target.is(INTEGER))
+                || (input.is(INTEGER) && target.is(INTERVAL_DAY_TIME));
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return CastRuleUtils.castToPrimitive(targetLogicalType, inputTerm);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericPrimitiveToDecimalCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericPrimitiveToDecimalCastRule.java
new file mode 100644
index 0000000..df62905
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/NumericPrimitiveToDecimalCastRule.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.DOUBLE_TO_DECIMAL;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.INTEGRAL_TO_DECIMAL;
+import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.cast;
+import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.staticCall;
+
+/**
+ * {@link LogicalTypeFamily#INTEGER_NUMERIC} and {@link LogicalTypeFamily#APPROXIMATE_NUMERIC} to
+ * {@link LogicalTypeRoot#DECIMAL} cast rule.
+ */
+@Internal
+public class NumericPrimitiveToDecimalCastRule
+        extends AbstractExpressionCodeGeneratorCastRule<Number, DecimalData> {
+
+    public static final NumericPrimitiveToDecimalCastRule INSTANCE =
+            new NumericPrimitiveToDecimalCastRule();
+
+    private NumericPrimitiveToDecimalCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeFamily.INTEGER_NUMERIC)
+                        .input(LogicalTypeFamily.APPROXIMATE_NUMERIC)
+                        .target(LogicalTypeRoot.DECIMAL)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final DecimalType targetDecimalType = (DecimalType) targetLogicalType;
+        if (inputLogicalType.is(LogicalTypeFamily.INTEGER_NUMERIC)) {
+            return staticCall(
+                    INTEGRAL_TO_DECIMAL(),
+                    cast("long", inputTerm),
+                    targetDecimalType.getPrecision(),
+                    targetDecimalType.getScale());
+        }
+        return staticCall(
+                DOUBLE_TO_DECIMAL(),
+                cast("double", inputTerm),
+                targetDecimalType.getPrecision(),
+                targetDecimalType.getScale());
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index f63f3aa..22bb463 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -195,38 +195,6 @@ object CodeGenUtils {
     case _ => boxedTypeTermForType(t)
   }
 
-  /**
-   * Execute primitive unboxing.
-   */
-  def unbox(term: String, ty: LogicalType): String = ty.getTypeRoot match {
-    // ordered by type root definition
-    case BOOLEAN => s"$term.booleanValue()"
-    case TINYINT => s"$term.byteValue()"
-    case SMALLINT => s"$term.shortValue()"
-    case INTEGER | DATE | TIME_WITHOUT_TIME_ZONE | INTERVAL_YEAR_MONTH => s"$term.intValue()"
-    case BIGINT | INTERVAL_DAY_TIME => s"$term.longValue()"
-    case FLOAT => s"$term.floatValue()"
-    case DOUBLE => s"$term.doubleValue()"
-    case DISTINCT_TYPE => unbox(term, ty.asInstanceOf[DistinctType].getSourceType)
-    case _ => term
-  }
-
-  /**
-   * Execute primitive unboxing.
-   */
-  def box(term: String, ty: LogicalType): String = ty.getTypeRoot match {
-    // ordered by type root definition
-    case BOOLEAN => s"Boolean.valueOf($term)"
-    case TINYINT => s"Byte.valueOf($term)"
-    case SMALLINT => s"Short.valueOf($term)"
-    case INTEGER | DATE | TIME_WITHOUT_TIME_ZONE | INTERVAL_YEAR_MONTH => s"Integer.valueOf($term)"
-    case BIGINT | INTERVAL_DAY_TIME => s"Long.valueOf($term)"
-    case FLOAT => s"Float.valueOf($term)"
-    case DOUBLE => s"Double.valueOf($term)"
-    case DISTINCT_TYPE => unbox(term, ty.asInstanceOf[DistinctType].getSourceType)
-    case _ => term
-  }
-
   @tailrec
   def boxedTypeTermForType(t: LogicalType): String = t.getTypeRoot match {
     // ordered by type root definition
@@ -254,6 +222,22 @@ object CodeGenUtils {
   }
 
   /**
+   * Returns true if [[primitiveDefaultValue()]] returns a nullable Java type, that is,
+   * a non primitive type.
+   */
+  @tailrec
+  def isPrimitiveNullable(t: LogicalType): Boolean = t.getTypeRoot match {
+    // ordered by type root definition
+    case BOOLEAN | TINYINT | SMALLINT | INTEGER |
+         DATE | TIME_WITHOUT_TIME_ZONE | INTERVAL_YEAR_MONTH |
+         BIGINT | INTERVAL_DAY_TIME | FLOAT | DOUBLE => false
+
+    case DISTINCT_TYPE => isPrimitiveNullable(t.asInstanceOf[DistinctType].getSourceType)
+
+    case _ => true
+  }
+
+  /**
     * Gets the default value for a primitive type, and null for generic types
     */
   @tailrec
@@ -328,69 +312,6 @@ object CodeGenUtils {
       throw new IllegalArgumentException("Illegal type: " + t)
   }
 
-  // ----------------------------------------------------------------------------------------------
-
-  // Cast numeric type to another numeric type with larger range.
-  // This function must be in sync with [[NumericOrDefaultReturnTypeInference]].
-  def getNumericCastedResultTerm(expr: GeneratedExpression, targetType: LogicalType): String = {
-    (expr.resultType.getTypeRoot, targetType.getTypeRoot) match {
-      case _ if isInteroperable(expr.resultType, targetType) => expr.resultTerm
-
-      // byte -> other numeric types
-      case (TINYINT, SMALLINT) => s"(short) ${expr.resultTerm}"
-      case (TINYINT, INTEGER) => s"(int) ${expr.resultTerm}"
-      case (TINYINT, BIGINT) => s"(long) ${expr.resultTerm}"
-      case (TINYINT, DECIMAL) =>
-        val dt = targetType.asInstanceOf[DecimalType]
-        s"$DECIMAL_UTIL.castFrom(" +
-          s"${expr.resultTerm}, ${dt.getPrecision}, ${dt.getScale})"
-      case (TINYINT, FLOAT) => s"(float) ${expr.resultTerm}"
-      case (TINYINT, DOUBLE) => s"(double) ${expr.resultTerm}"
-
-      // short -> other numeric types
-      case (SMALLINT, INTEGER) => s"(int) ${expr.resultTerm}"
-      case (SMALLINT, BIGINT) => s"(long) ${expr.resultTerm}"
-      case (SMALLINT, DECIMAL) =>
-        val dt = targetType.asInstanceOf[DecimalType]
-        s"$DECIMAL_UTIL.castFrom(" +
-          s"${expr.resultTerm}, ${dt.getPrecision}, ${dt.getScale})"
-      case (SMALLINT, FLOAT) => s"(float) ${expr.resultTerm}"
-      case (SMALLINT, DOUBLE) => s"(double) ${expr.resultTerm}"
-
-      // int -> other numeric types
-      case (INTEGER, BIGINT) => s"(long) ${expr.resultTerm}"
-      case (INTEGER, DECIMAL) =>
-        val dt = targetType.asInstanceOf[DecimalType]
-        s"$DECIMAL_UTIL.castFrom(" +
-          s"${expr.resultTerm}, ${dt.getPrecision}, ${dt.getScale})"
-      case (INTEGER, FLOAT) => s"(float) ${expr.resultTerm}"
-      case (INTEGER, DOUBLE) => s"(double) ${expr.resultTerm}"
-
-      // long -> other numeric types
-      case (BIGINT, DECIMAL) =>
-        val dt = targetType.asInstanceOf[DecimalType]
-        s"$DECIMAL_UTIL.castFrom(" +
-          s"${expr.resultTerm}, ${dt.getPrecision}, ${dt.getScale})"
-      case (BIGINT, FLOAT) => s"(float) ${expr.resultTerm}"
-      case (BIGINT, DOUBLE) => s"(double) ${expr.resultTerm}"
-
-      // decimal -> other numeric types
-      case (DECIMAL, DECIMAL) =>
-        val dt = targetType.asInstanceOf[DecimalType]
-        s"$DECIMAL_UTIL.castToDecimal(" +
-          s"${expr.resultTerm}, ${dt.getPrecision}, ${dt.getScale})"
-      case (DECIMAL, FLOAT) =>
-        s"$DECIMAL_UTIL.castToFloat(${expr.resultTerm})"
-      case (DECIMAL, DOUBLE) =>
-        s"$DECIMAL_UTIL.castToDouble(${expr.resultTerm})"
-
-      // float -> other numeric types
-      case (FLOAT, DOUBLE) => s"(double) ${expr.resultTerm}"
-
-      case _ => null
-    }
-  }
-
   // -------------------------- Method & Enum ---------------------------------------
 
   def qualifyMethod(method: Method): String =
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 6895214..6009cc1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -823,13 +823,13 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
             operands.foreach { operand =>
               requireComparable(operand)
             }
-            generateGreatestLeast(resultType, operands)
+            generateGreatestLeast(ctx, resultType, operands)
 
           case BuiltInFunctionDefinitions.LEAST =>
             operands.foreach { operand =>
               requireComparable(operand)
             }
-            generateGreatestLeast(resultType, operands, greatest = false)
+            generateGreatestLeast(ctx, resultType, operands, greatest = false)
 
           case BuiltInFunctionDefinitions.JSON_STRING =>
             new JsonStringCallGen(call).generate(ctx, operands, resultType)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 55f820e..15132b0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -18,15 +18,14 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
-import org.apache.flink.table.data.{DecimalData, TimestampData}
+import org.apache.flink.table.data.{DecimalData, DecimalDataUtils, TimestampData}
 import org.apache.flink.table.runtime.functions._
 import org.apache.flink.table.utils.DateTimeUtils
 import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange
 
 import org.apache.calcite.linq4j.tree.Types
 import org.apache.calcite.runtime.{JsonFunctions, SqlFunctions}
-import org.apache.calcite.sql.{SqlJsonConstructorNullClause, SqlJsonExistsErrorBehavior, SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior, SqlJsonValueEmptyOrErrorBehavior}
-import org.apache.calcite.sql.{SqlJsonConstructorNullClause, SqlJsonExistsErrorBehavior, SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior, SqlJsonValueEmptyOrErrorBehavior}
+import org.apache.calcite.sql.{SqlJsonExistsErrorBehavior, SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior, SqlJsonValueEmptyOrErrorBehavior}
 import org.apache.flink.table.data.binary.BinaryStringData
 
 import java.lang.reflect.Method
@@ -548,4 +547,26 @@ object BuiltInMethods {
   val BINARY_STRING_DATA_FROM_STRING = Types.lookupMethod(classOf[BinaryStringData], "fromString",
     classOf[String])
 
+  // DecimalData functions
+
+  val DECIMAL_TO_DECIMAL = Types.lookupMethod(
+    classOf[DecimalDataUtils],
+    "castToDecimal", classOf[DecimalData], classOf[Int], classOf[Int])
+
+  val DECIMAL_TO_INTEGRAL = Types.lookupMethod(
+    classOf[DecimalDataUtils],
+    "castToIntegral", classOf[DecimalData])
+
+  val DECIMAL_TO_DOUBLE = Types.lookupMethod(
+    classOf[DecimalDataUtils],
+    "doubleValue", classOf[DecimalData])
+
+  val INTEGRAL_TO_DECIMAL = Types.lookupMethod(
+    classOf[DecimalDataUtils],
+    "castFrom", classOf[Long], classOf[Int], classOf[Int])
+
+  // Resolves the double overload of castFrom; the long overload is INTEGRAL_TO_DECIMAL.
+  val DOUBLE_TO_DECIMAL = Types.lookupMethod(
+    classOf[DecimalDataUtils], "castFrom", classOf[Double], classOf[Int], classOf[Int])
+
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
index fcc62c3..af8061c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
@@ -18,8 +18,11 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
-import org.apache.flink.table.planner.codegen.CodeGenUtils.{primitiveDefaultValue, primitiveTypeTermForType}
-import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, primitiveDefaultValue, primitiveTypeTermForType}
+import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens.toCastContext
+import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGenUtils, CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.planner.functions.casting.{CastRuleProvider, ExpressionCodeGeneratorCastRule}
+import org.apache.flink.table.runtime.types.PlannerTypeUtils.isInteroperable
 import org.apache.flink.table.types.logical.LogicalType
 
 /**
@@ -36,8 +39,8 @@ class IfCallGen() extends CallGenerator {
     // Inferred return type is ARG1. Must be the same as ARG2.
     // This is a temporary solution which introduce type cast in codegen.
     // Not elegant, but can allow IF function to handle different numeric type arguments.
-    val castedResultTerm1 = CodeGenUtils.getNumericCastedResultTerm(operands(1), returnType)
-    val castedResultTerm2 = CodeGenUtils.getNumericCastedResultTerm(operands(2), returnType)
+    val castedResultTerm1 = normalizeArgument(ctx, operands(1), returnType)
+    val castedResultTerm2 = normalizeArgument(ctx, operands(2), returnType)
     if (castedResultTerm1 == null || castedResultTerm2 == null) {
       throw new Exception(String.format("Unsupported operand types: IF(boolean, %s, %s)",
         operands(1).resultType, operands(2).resultType))
@@ -51,6 +54,7 @@ class IfCallGen() extends CallGenerator {
 
     val resultCode =
       s"""
+         |// --- Start code generated by ${className[IfCallGen]}
          |${operands.head.code}
          |$resultTerm = $resultDefault;
          |if (${operands.head.resultTerm}) {
@@ -66,8 +70,28 @@ class IfCallGen() extends CallGenerator {
          |  }
          |  $nullTerm = ${operands(2).nullTerm};
          |}
+         |// --- End code generated by ${className[IfCallGen]}
        """.stripMargin
 
     GeneratedExpression(resultTerm, nullTerm, resultCode, returnType)
   }
+
+  /**
+   * Generates the expression that casts the result term of `expr` to `targetType`, using the
+   * rule resolved for the two types. Returns null when that rule cannot generate an expression.
+   */
+  private def normalizeArgument(
+    ctx: CodeGeneratorContext, expr: GeneratedExpression, targetType: LogicalType): String = {
+      val sourceType = expr.resultType
+      CastRuleProvider.resolve(sourceType, targetType) match {
+        case expressionRule: ExpressionCodeGeneratorCastRule[_, _] =>
+          expressionRule.generateExpression(
+            toCastContext(ctx),
+            expr.resultTerm,
+            sourceType,
+            targetType
+          )
+        case _ => null
+      }
+  }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 6532bd7..3021776 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -22,13 +22,14 @@ import org.apache.flink.table.api.{DataTypes, ValidationException}
 import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter
 import org.apache.flink.table.data.binary.BinaryArrayData
 import org.apache.flink.table.data.conversion.DataStructureConverters
-import org.apache.flink.table.planner.functions.casting.{CastRule, CastRuleProvider, CodeGeneratorCastRule}
+import org.apache.flink.table.planner.functions.casting.{CastRule, CastRuleProvider, CodeGeneratorCastRule, ExpressionCodeGeneratorCastRule}
 import org.apache.flink.table.data.util.{DataFormatConverters, MapDataUtil}
 import org.apache.flink.table.data.writer.{BinaryArrayWriter, BinaryRowWriter}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{binaryRowFieldSetAccess, binaryRowSetNull, binaryWriterWriteField, binaryWriterWriteNull, _}
 import org.apache.flink.table.planner.codegen.GenerateUtils._
 import org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL, NEVER_NULL, NO_CODE}
 import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGenUtils, CodeGeneratorContext, GenerateUtils, GeneratedExpression}
+import org.apache.flink.table.planner.functions.casting.rules.{AbstractExpressionCodeGeneratorCastRule, IdentityCastRule}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 import org.apache.flink.table.runtime.functions.SqlFunctionUtils
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
@@ -81,19 +82,19 @@ object ScalarOperatorGens {
     val leftCasting = operator match {
       case "%" =>
         if (isInteroperable(left.resultType, right.resultType)) {
-          numericCasting(left.resultType, resultType)
+          numericCasting(ctx, left.resultType, resultType)
         } else {
           val castedType = if (isDecimal(left.resultType)) {
             new BigIntType()
           } else {
             left.resultType
           }
-          numericCasting(left.resultType, castedType)
+          numericCasting(ctx, left.resultType, castedType)
         }
-      case _ => numericCasting(left.resultType, resultType)
+      case _ => numericCasting(ctx, left.resultType, resultType)
     }
 
-    val rightCasting = numericCasting(right.resultType, resultType)
+    val rightCasting = numericCasting(ctx, right.resultType, resultType)
     val resultTypeTerm = primitiveTypeTermForType(resultType)
 
     generateOperatorIfNotNull(ctx, resultType, left, right) {
@@ -117,7 +118,7 @@ object ScalarOperatorGens {
     // use it as is during calculation.
     def castToDec(t: LogicalType): String => String = t match {
       case _: DecimalType => (operandTerm: String) => s"$operandTerm"
-      case _ => numericCasting(t, resultType)
+      case _ => numericCasting(ctx, t, resultType)
     }
     val methods = Map(
       "+" -> "add",
@@ -358,7 +359,7 @@ object ScalarOperatorGens {
       // we need to normalize the values for the hash set
       val castNumeric = widerType match {
         case Some(t) => (value: GeneratedExpression) =>
-          numericCasting(value.resultType, t)(value.resultTerm)
+          numericCasting(ctx, value.resultType, t)(value.resultTerm)
         case None => (value: GeneratedExpression) => value.resultTerm
       }
 
@@ -951,19 +952,8 @@ object ScalarOperatorGens {
         }
 
         // Generate the code block
-        val castContext = new CodeGeneratorCastRule.Context {
-          override def getSessionTimeZoneTerm: String = ctx.addReusableSessionTimeZone()
-          override def declareVariable(ty: String, variablePrefix: String): String =
-            ctx.addReusableLocalVariable(ty, variablePrefix)
-          override def declareTypeSerializer(ty: LogicalType): String =
-            ctx.addReusableTypeSerializer(ty)
-          override def declareClassField(ty: String, field: String, init: String): String = {
-              ctx.addReusableMember(s"private $ty $field = $init;")
-              field
-            }
-        }
         val castCodeBlock = codeGeneratorCastRule.generateCodeBlock(
-          castContext,
+          toCastContext(ctx),
           operand.resultTerm,
           operand.nullTerm,
           inputType,
@@ -1203,13 +1193,6 @@ object ScalarOperatorGens {
         operandTerm => s"$operandTerm != 0"
       }
 
-      // between NUMERIC TYPE | Decimal
-      case (_, _) if isNumeric(operand.resultType) && isNumeric(targetType) =>
-        val operandCasting = numericCasting(operand.resultType, targetType)
-        generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-          operandTerm => s"${operandCasting(operandTerm)}"
-        }
-
       // Date -> Timestamp
       case (DATE, TIMESTAMP_WITHOUT_TIME_ZONE) =>
         generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
@@ -1795,6 +1778,7 @@ object ScalarOperatorGens {
    * returns NULL if any argument is NULL.
    */
   def generateGreatestLeast(
+      ctx: CodeGeneratorContext,
       resultType: LogicalType,
       elements: Seq[GeneratedExpression],
       greatest: Boolean = true)
@@ -1806,7 +1790,7 @@ object ScalarOperatorGens {
 
     def castIfNumeric(t: GeneratedExpression): String = {
       if (isNumeric(widerType.get)) {
-         s"${numericCasting(t.resultType, widerType.get).apply(t.resultTerm)}"
+         s"${numericCasting(ctx, t.resultType, widerType.get).apply(t.resultTerm)}"
       } else {
          s"${t.resultTerm}"
       }
@@ -2247,58 +2231,41 @@ object ScalarOperatorGens {
     expr.copy(resultType = targetType)
   }
 
+  /**
+   * @deprecated You should use [[generateCast()]]
+   */
+  @deprecated
   private def numericCasting(
+      ctx: CodeGeneratorContext,
       operandType: LogicalType,
       resultType: LogicalType): String => String = {
 
-    val resultTypeTerm = primitiveTypeTermForType(resultType)
-
-    def decToPrimMethod(targetType: LogicalType): String = targetType.getTypeRoot match {
-      case TINYINT => "castToByte"
-      case SMALLINT => "castToShort"
-      case INTEGER => "castToInt"
-      case BIGINT => "castToLong"
-      case FLOAT => "castToFloat"
-      case DOUBLE => "castToDouble"
-      case BOOLEAN => "castToBoolean"
-      case _ => throw new CodeGenException(s"Unsupported decimal casting type: '$targetType'")
+    // All numeric rules are assumed to implement ExpressionCodeGeneratorCastRule
+    val rule = CastRuleProvider.resolve(operandType, resultType)
+    rule match {
+      case codeGeneratorCastRule: ExpressionCodeGeneratorCastRule[_, _] =>
+        operandTerm => codeGeneratorCastRule.generateExpression(
+          toCastContext(ctx),
+          operandTerm,
+          operandType,
+          resultType
+        )
+      case _ =>
+        throw new CodeGenException(s"Unsupported casting from $operandType to $resultType.")
     }
+  }
 
-    // no casting necessary
-    if (isInteroperable(operandType, resultType)) {
-      operandTerm => s"$operandTerm"
-    }
-    // decimal to decimal, may have different precision/scale
-    else if (isDecimal(resultType) && isDecimal(operandType)) {
-      val dt = resultType.asInstanceOf[DecimalType]
-      operandTerm =>
-        s"$DECIMAL_UTIL.castToDecimal($operandTerm, ${dt.getPrecision}, ${dt.getScale})"
-    }
-    // non_decimal_numeric to decimal
-    else if (isDecimal(resultType) && isNumeric(operandType)) {
-      val dt = resultType.asInstanceOf[DecimalType]
-      operandTerm =>
-        s"$DECIMAL_UTIL.castFrom($operandTerm, ${dt.getPrecision}, ${dt.getScale})"
-    }
-    // decimal to non_decimal_numeric
-    else if (isNumeric(resultType) && isDecimal(operandType) ) {
-      operandTerm =>
-        s"$DECIMAL_UTIL.${decToPrimMethod(resultType)}($operandTerm)"
-    }
-    // numeric to numeric
-    // TODO: Create a wrapper layer that handles type conversion between numeric.
-    else if (isNumeric(operandType) && isNumeric(resultType)) {
-      val resultTypeValue = resultTypeTerm + "Value()"
-      val boxedTypeTerm = boxedTypeTermForType(operandType)
-      operandTerm =>
-        s"(new $boxedTypeTerm($operandTerm)).$resultTypeValue"
-    }
-    // result type is time interval and operand type is integer
-    else if (isTimeInterval(resultType) && isInteger(operandType)){
-      operandTerm => s"(($resultTypeTerm) $operandTerm)"
-    }
-    else {
-      throw new CodeGenException(s"Unsupported casting from $operandType to $resultType.")
+  def toCastContext(ctx: CodeGeneratorContext): CodeGeneratorCastRule.Context = {
+    new CodeGeneratorCastRule.Context {
+      override def getSessionTimeZoneTerm: String = ctx.addReusableSessionTimeZone()
+      override def declareVariable(ty: String, variablePrefix: String): String =
+        ctx.addReusableLocalVariable(ty, variablePrefix)
+      override def declareTypeSerializer(ty: LogicalType): String =
+        ctx.addReusableTypeSerializer(ty)
+      override def declareClassField(ty: String, field: String, init: String): String = {
+        ctx.addReusableMember(s"private $ty $field = $init;")
+        field
+      }
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index 63dbbe9..587fc5a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -69,6 +69,7 @@ import static org.apache.flink.table.api.DataTypes.MAP;
 import static org.apache.flink.table.api.DataTypes.MONTH;
 import static org.apache.flink.table.api.DataTypes.RAW;
 import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SECOND;
 import static org.apache.flink.table.api.DataTypes.SMALLINT;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.api.DataTypes.TIME;
@@ -92,6 +93,19 @@ class CastRulesTest {
             CastRule.Context.create(
                     ZoneId.of("CET"), Thread.currentThread().getContextClassLoader());
 
+    private static final byte DEFAULT_POSITIVE_TINY_INT = (byte) 5;
+    private static final byte DEFAULT_NEGATIVE_TINY_INT = (byte) -5;
+    private static final short DEFAULT_POSITIVE_SMALL_INT = (short) 12345;
+    private static final short DEFAULT_NEGATIVE_SMALL_INT = (short) -12345;
+    private static final int DEFAULT_POSITIVE_INT = 1234567;
+    private static final int DEFAULT_NEGATIVE_INT = -1234567;
+    private static final long DEFAULT_POSITIVE_BIGINT = 12345678901L;
+    private static final long DEFAULT_NEGATIVE_BIGINT = -12345678901L;
+    private static final float DEFAULT_POSITIVE_FLOAT = 123.456f;
+    private static final float DEFAULT_NEGATIVE_FLOAT = -123.456f;
+    private static final double DEFAULT_POSITIVE_DOUBLE = 123.456789d;
+    private static final double DEFAULT_NEGATIVE_DOUBLE = -123.456789d;
+
     private static final int DATE =
             DateTimeUtils.localDateToUnixDate(LocalDate.parse("2021-09-24"));
     private static final int TIME =
@@ -108,11 +122,232 @@ class CastRulesTest {
 
     Stream<CastTestSpecBuilder> testCases() {
         return Stream.of(
+                CastTestSpecBuilder.testCastTo(TINYINT())
+                        .fromCase(TINYINT(), null, null)
+                        .fromCase(
+                                DECIMAL(4, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
+                                (byte) 9)
+                        // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+                        // instead of overflow
+                        .fromCase(
+                                DECIMAL(10, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9123.87"), 10, 3),
+                                (byte) -93)
+                        .fromCase(TINYINT(), DEFAULT_POSITIVE_TINY_INT, DEFAULT_POSITIVE_TINY_INT)
+                        .fromCase(TINYINT(), DEFAULT_NEGATIVE_TINY_INT, DEFAULT_NEGATIVE_TINY_INT)
+                        .fromCase(SMALLINT(), (short) 32, (byte) 32)
+                        .fromCase(SMALLINT(), DEFAULT_POSITIVE_SMALL_INT, (byte) 57)
+                        .fromCase(SMALLINT(), DEFAULT_NEGATIVE_SMALL_INT, (byte) -57)
+                        .fromCase(INT(), -12, (byte) -12)
+                        .fromCase(INT(), DEFAULT_POSITIVE_INT, (byte) -121)
+                        .fromCase(INT(), DEFAULT_NEGATIVE_INT, (byte) 121)
+                        .fromCase(BIGINT(), DEFAULT_POSITIVE_BIGINT, (byte) 53)
+                        .fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT, (byte) -53)
+                        .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, (byte) 123)
+                        .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, (byte) -123)
+                        .fromCase(DOUBLE(), DEFAULT_POSITIVE_DOUBLE, (byte) 123)
+                        .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, (byte) -123),
+                CastTestSpecBuilder.testCastTo(SMALLINT())
+                        .fromCase(SMALLINT(), null, null)
+                        .fromCase(
+                                DECIMAL(4, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
+                                (short) 9)
+                        // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+                        // instead of overflow
+                        .fromCase(
+                                DECIMAL(10, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("91235.87"), 10, 3),
+                                (short) 25699)
+                        .fromCase(
+                                TINYINT(),
+                                DEFAULT_POSITIVE_TINY_INT,
+                                (short) DEFAULT_POSITIVE_TINY_INT)
+                        .fromCase(
+                                TINYINT(),
+                                DEFAULT_NEGATIVE_TINY_INT,
+                                (short) DEFAULT_NEGATIVE_TINY_INT)
+                        .fromCase(
+                                SMALLINT(), DEFAULT_POSITIVE_SMALL_INT, DEFAULT_POSITIVE_SMALL_INT)
+                        .fromCase(
+                                SMALLINT(), DEFAULT_NEGATIVE_SMALL_INT, DEFAULT_NEGATIVE_SMALL_INT)
+                        .fromCase(SMALLINT(), (short) 32780, (short) -32756)
+                        .fromCase(INT(), DEFAULT_POSITIVE_INT, (short) -10617)
+                        .fromCase(INT(), DEFAULT_NEGATIVE_INT, (short) 10617)
+                        .fromCase(INT(), -12, (short) -12)
+                        .fromCase(BIGINT(), 123L, (short) 123)
+                        .fromCase(BIGINT(), DEFAULT_POSITIVE_BIGINT, (short) 7221)
+                        .fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT, (short) -7221)
+                        .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, (short) 123)
+                        .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, (short) -123)
+                        .fromCase(FLOAT(), 123456.78f, (short) -7616)
+                        .fromCase(DOUBLE(), DEFAULT_POSITIVE_DOUBLE, (short) 123)
+                        .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, (short) -123)
+                        .fromCase(DOUBLE(), 123456.7890d, (short) -7616),
+                CastTestSpecBuilder.testCastTo(INT())
+                        .fromCase(
+                                DECIMAL(4, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
+                                9)
+                        // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+                        // instead of overflow
+                        .fromCase(
+                                DECIMAL(20, 3),
+                                DecimalData.fromBigDecimal(
+                                        new BigDecimal("3276913443134.87"), 20, 3),
+                                -146603714)
+                        .fromCase(
+                                TINYINT(),
+                                DEFAULT_POSITIVE_TINY_INT,
+                                (int) DEFAULT_POSITIVE_TINY_INT)
+                        .fromCase(
+                                TINYINT(),
+                                DEFAULT_NEGATIVE_TINY_INT,
+                                (int) DEFAULT_NEGATIVE_TINY_INT)
+                        .fromCase(
+                                SMALLINT(),
+                                DEFAULT_POSITIVE_SMALL_INT,
+                                (int) DEFAULT_POSITIVE_SMALL_INT)
+                        .fromCase(
+                                SMALLINT(),
+                                DEFAULT_NEGATIVE_SMALL_INT,
+                                (int) DEFAULT_NEGATIVE_SMALL_INT)
+                        .fromCase(INT(), DEFAULT_POSITIVE_INT, DEFAULT_POSITIVE_INT)
+                        .fromCase(INT(), DEFAULT_NEGATIVE_INT, DEFAULT_NEGATIVE_INT)
+                        .fromCase(BIGINT(), 123L, 123)
+                        .fromCase(BIGINT(), DEFAULT_POSITIVE_BIGINT, -539222987)
+                        .fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT, 539222987)
+                        .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123)
+                        .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123)
+                        .fromCase(FLOAT(), 9234567891.12f, 2147483647)
+                        .fromCase(DOUBLE(), DEFAULT_POSITIVE_DOUBLE, 123)
+                        .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, -123)
+                        .fromCase(DOUBLE(), 9234567891.12345d, 2147483647)
+                        .fromCase(INTERVAL(YEAR(), MONTH()), 123, 123)
+                        .fromCase(INTERVAL(DAY(), SECOND()), 123L, 123),
                 CastTestSpecBuilder.testCastTo(BIGINT())
-                        .fromCase(BIGINT(), 10L, 10L)
-                        .fromCase(INT(), 10, 10L)
-                        .fromCase(SMALLINT(), (short) 10, 10L)
-                        .fromCase(TINYINT(), (byte) 10, 10L),
+                        .fromCase(BIGINT(), null, null)
+                        .fromCase(
+                                DECIMAL(4, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
+                                9L)
+                        .fromCase(
+                                DECIMAL(20, 3),
+                                DecimalData.fromBigDecimal(
+                                        new BigDecimal("3276913443134.87"), 20, 3),
+                                3276913443134L)
+                        .fromCase(
+                                TINYINT(),
+                                DEFAULT_POSITIVE_TINY_INT,
+                                (long) DEFAULT_POSITIVE_TINY_INT)
+                        .fromCase(
+                                TINYINT(),
+                                DEFAULT_NEGATIVE_TINY_INT,
+                                (long) DEFAULT_NEGATIVE_TINY_INT)
+                        .fromCase(
+                                SMALLINT(),
+                                DEFAULT_POSITIVE_SMALL_INT,
+                                (long) DEFAULT_POSITIVE_SMALL_INT)
+                        .fromCase(
+                                SMALLINT(),
+                                DEFAULT_NEGATIVE_SMALL_INT,
+                                (long) DEFAULT_NEGATIVE_SMALL_INT)
+                        .fromCase(INT(), DEFAULT_POSITIVE_INT, (long) DEFAULT_POSITIVE_INT)
+                        .fromCase(INT(), DEFAULT_NEGATIVE_INT, (long) DEFAULT_NEGATIVE_INT)
+                        .fromCase(BIGINT(), DEFAULT_POSITIVE_BIGINT, DEFAULT_POSITIVE_BIGINT)
+                        .fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT, DEFAULT_NEGATIVE_BIGINT)
+                        .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123L)
+                        .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123L)
+                        .fromCase(FLOAT(), 9234567891.12f, 9234568192L)
+                        .fromCase(DOUBLE(), DEFAULT_POSITIVE_DOUBLE, 123L)
+                        .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, -123L)
+                        .fromCase(DOUBLE(), 9234567891.12345d, 9234567891L),
+                CastTestSpecBuilder.testCastTo(FLOAT())
+                        .fromCase(FLOAT(), null, null)
+                        .fromCase(
+                                DECIMAL(4, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
+                                9.87f)
+                        // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+                        // instead of overflow
+                        .fromCase(
+                                DECIMAL(20, 3),
+                                DecimalData.fromBigDecimal(
+                                        new BigDecimal("3276913443134.87"), 20, 3),
+                                3.27691351E12f)
+                        .fromCase(
+                                TINYINT(),
+                                DEFAULT_POSITIVE_TINY_INT,
+                                (float) DEFAULT_POSITIVE_TINY_INT)
+                        .fromCase(
+                                TINYINT(),
+                                DEFAULT_NEGATIVE_TINY_INT,
+                                (float) DEFAULT_NEGATIVE_TINY_INT)
+                        .fromCase(
+                                SMALLINT(),
+                                DEFAULT_POSITIVE_SMALL_INT,
+                                (float) DEFAULT_POSITIVE_SMALL_INT)
+                        .fromCase(
+                                SMALLINT(),
+                                DEFAULT_NEGATIVE_SMALL_INT,
+                                (float) DEFAULT_NEGATIVE_SMALL_INT)
+                        .fromCase(INT(), DEFAULT_POSITIVE_INT, (float) DEFAULT_POSITIVE_INT)
+                        .fromCase(INT(), DEFAULT_NEGATIVE_INT, (float) DEFAULT_NEGATIVE_INT)
+                        .fromCase(
+                                BIGINT(), DEFAULT_POSITIVE_BIGINT, (float) DEFAULT_POSITIVE_BIGINT)
+                        .fromCase(
+                                BIGINT(), DEFAULT_NEGATIVE_BIGINT, (float) DEFAULT_NEGATIVE_BIGINT)
+                        .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, DEFAULT_POSITIVE_FLOAT)
+                        .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, DEFAULT_NEGATIVE_FLOAT)
+                        .fromCase(FLOAT(), 9234567891.12f, 9234567891.12f)
+                        .fromCase(DOUBLE(), DEFAULT_POSITIVE_DOUBLE, 123.456789f)
+                        .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, -123.456789f)
+                        .fromCase(DOUBLE(), 1239234567891.1234567891234d, 1.23923451E12f),
+                CastTestSpecBuilder.testCastTo(DOUBLE())
+                        .fromCase(DOUBLE(), null, null)
+                        .fromCase(
+                                DECIMAL(4, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
+                                9.87d)
+                        .fromCase(
+                                DECIMAL(20, 3),
+                                DecimalData.fromBigDecimal(
+                                        new BigDecimal("3276913443134.87"), 20, 3),
+                                3.27691344313487E12d)
+                        .fromCase(
+                                DECIMAL(30, 20),
+                                DecimalData.fromBigDecimal(
+                                        new BigDecimal("123456789.123456789123456789"), 30, 20),
+                                1.2345678912345679E8d)
+                        .fromCase(
+                                TINYINT(),
+                                DEFAULT_POSITIVE_TINY_INT,
+                                (double) DEFAULT_POSITIVE_TINY_INT)
+                        .fromCase(
+                                TINYINT(),
+                                DEFAULT_NEGATIVE_TINY_INT,
+                                (double) DEFAULT_NEGATIVE_TINY_INT)
+                        .fromCase(
+                                SMALLINT(),
+                                DEFAULT_POSITIVE_SMALL_INT,
+                                (double) DEFAULT_POSITIVE_SMALL_INT)
+                        .fromCase(
+                                SMALLINT(),
+                                DEFAULT_NEGATIVE_SMALL_INT,
+                                (double) DEFAULT_NEGATIVE_SMALL_INT)
+                        .fromCase(INT(), DEFAULT_POSITIVE_INT, (double) DEFAULT_POSITIVE_INT)
+                        .fromCase(INT(), DEFAULT_NEGATIVE_INT, (double) DEFAULT_NEGATIVE_INT)
+                        .fromCase(
+                                BIGINT(), DEFAULT_POSITIVE_BIGINT, (double) DEFAULT_POSITIVE_BIGINT)
+                        .fromCase(
+                                BIGINT(), DEFAULT_NEGATIVE_BIGINT, (double) DEFAULT_NEGATIVE_BIGINT)
+                        .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123.45600128173828d)
+                        .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123.45600128173828)
+                        .fromCase(FLOAT(), 9234567891.12f, 9.234568192E9)
+                        .fromCase(DOUBLE(), DEFAULT_POSITIVE_DOUBLE, DEFAULT_POSITIVE_DOUBLE)
+                        .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, DEFAULT_NEGATIVE_DOUBLE)
+                        .fromCase(DOUBLE(), 1239234567891.1234567891234d, 1.2392345678911235E12d),
                 CastTestSpecBuilder.testCastTo(STRING())
                         .fromCase(STRING(), null, null)
                         .fromCase(
@@ -236,6 +471,35 @@ class CastRulesTest {
                                 RawValueData.fromObject(
                                         LocalDateTime.parse("2020-11-11T18:08:01.123")),
                                 StringData.fromString("2020-11-11T18:08:01.123")),
+                CastTestSpecBuilder.testCastTo(DECIMAL(5, 3))
+                        .fromCase(
+                                DECIMAL(4, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.870"), 5, 3))
+                        .fromCase(
+                                TINYINT(),
+                                (byte) -1,
+                                DecimalData.fromBigDecimal(new BigDecimal("-1.000"), 5, 3))
+                        .fromCase(
+                                SMALLINT(),
+                                (short) 3,
+                                DecimalData.fromBigDecimal(new BigDecimal("3.000"), 5, 3))
+                        .fromCase(
+                                INT(),
+                                42,
+                                DecimalData.fromBigDecimal(new BigDecimal("42.000"), 5, 3))
+                        .fromCase(
+                                BIGINT(),
+                                8L,
+                                DecimalData.fromBigDecimal(new BigDecimal("8.000"), 5, 3))
+                        .fromCase(
+                                FLOAT(),
+                                -12.345f,
+                                DecimalData.fromBigDecimal(new BigDecimal("-12.345"), 5, 3))
+                        .fromCase(
+                                DOUBLE(),
+                                12.678d,
+                                DecimalData.fromBigDecimal(new BigDecimal("12.678"), 5, 3)),
                 CastTestSpecBuilder.testCastTo(ARRAY(STRING().nullable()))
                         .fromCase(
                                 ARRAY(TIMESTAMP().nullable()),
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/DecimalDataUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/DecimalDataUtils.java
index 9ea7055..bb9ba00 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/DecimalDataUtils.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/DecimalDataUtils.java
@@ -173,30 +173,6 @@ public final class DecimalDataUtils {
         return bd.longValue();
     }
 
-    public static long castToLong(DecimalData dec) {
-        return castToIntegral(dec);
-    }
-
-    public static int castToInt(DecimalData dec) {
-        return (int) castToIntegral(dec);
-    }
-
-    public static short castToShort(DecimalData dec) {
-        return (short) castToIntegral(dec);
-    }
-
-    public static byte castToByte(DecimalData dec) {
-        return (byte) castToIntegral(dec);
-    }
-
-    public static float castToFloat(DecimalData dec) {
-        return (float) doubleValue(dec);
-    }
-
-    public static double castToDouble(DecimalData dec) {
-        return doubleValue(dec);
-    }
-
     public static DecimalData castToDecimal(DecimalData dec, int precision, int scale) {
         return fromBigDecimal(dec.toBigDecimal(), precision, scale);
     }