You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/14 13:11:01 UTC

[flink] branch master updated (efa3362 -> 4b1df49)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from efa3362  [FLINK-24232][coordination] Skip history server archiving for suspended jobs
     new 09aad58  [FLINK-24413][table] Apply trimming & padding when CASTing to CHAR/VARCHAR
     new b6ca017  [hotfix][table] Make use of VarCharType.STRING_TYPE
     new b0b68f1  [hotfix][table-planner][tests] Minor fixes to remove IDE warnings.
     new 4b1df49  [hotfix][table] Rename precision to length for CHAR/VARCHAR sink enforcer

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../generated/execution_config_configuration.html  |   4 +-
 .../table/api/config/ExecutionConfigOptions.java   |  31 +-
 .../flink/table/types/logical/VarCharType.java     |   2 +
 .../types/logical/utils/LogicalTypeParser.java     |   2 +-
 .../apache/flink/table/types/DataTypesTest.java    |   2 +-
 .../flink/table/types/LogicalCommonTypeTest.java   |   4 +-
 .../flink/table/types/LogicalTypeParserTest.java   |   2 +-
 .../types/extraction/DataTypeExtractorTest.java    |   9 +-
 .../functions/casting/ArrayToStringCastRule.java   | 187 +++++++-----
 .../functions/casting/BinaryToStringCastRule.java  |   3 +-
 .../functions/casting/BooleanToStringCastRule.java |   3 +-
 .../functions/casting/CastRulePredicate.java       |  52 ++--
 .../functions/casting/CastRuleProvider.java        |  23 +-
 .../casting/CharVarCharTrimPadCastRule.java        | 252 ++++++++++++++++
 .../functions/casting/DateToStringCastRule.java    |   7 +-
 .../casting/IntervalToStringCastRule.java          |   3 +-
 .../casting/MapAndMultisetToStringCastRule.java    | 300 +++++++++++--------
 .../functions/casting/NumericToStringCastRule.java |   3 +-
 .../functions/casting/RawToStringCastRule.java     |  54 +++-
 .../functions/casting/RowToStringCastRule.java     |  78 +++--
 .../functions/casting/TimeToStringCastRule.java    |   3 +-
 .../casting/TimestampToStringCastRule.java         |   3 +-
 .../plan/nodes/exec/common/CommonExecSink.java     |  10 +-
 .../table/planner/plan/type/FlinkReturnTypes.java  |   4 +-
 .../table/planner/codegen/calls/IfCallGen.scala    |  23 +-
 .../planner/codegen/calls/StringCallGen.scala      |   2 +-
 .../planner/codegen/SortCodeGeneratorTest.java     |   2 +-
 .../planner/functions/CastFunctionITCase.java      |  90 +++---
 .../functions/casting/CastRuleProviderTest.java    |  19 ++
 .../planner/functions/casting/CastRulesTest.java   | 332 +++++++++++++++++++++
 .../nodes/exec/common/CommonExecSinkITCase.java    |  18 +-
 .../apache/flink/table/api/batch/ExplainTest.scala |   2 +-
 .../flink/table/api/stream/ExplainTest.scala       |   2 +-
 .../planner/calcite/FlinkTypeFactoryTest.scala     |   6 +-
 .../table/planner/codegen/agg/AggTestBase.scala    |   4 +-
 .../codegen/agg/batch/BatchAggTestBase.scala       |   2 +-
 .../agg/batch/HashAggCodeGeneratorTest.scala       |   2 +-
 .../agg/batch/SortAggCodeGeneratorTest.scala       |   4 +-
 .../planner/expressions/ScalarFunctionsTest.scala  |  16 +-
 .../expressions/utils/ExpressionTestBase.scala     |   2 +-
 .../plan/batch/sql/DagOptimizationTest.scala       |   2 +-
 .../planner/plan/metadata/MetadataTestUtil.scala   |   6 +-
 .../plan/stream/sql/DagOptimizationTest.scala      |   2 +-
 .../planner/plan/stream/sql/LegacySinkTest.scala   |   2 +-
 .../stream/sql/MiniBatchIntervalInferTest.scala    |   2 +-
 .../batch/sql/PartitionableSinkITCase.scala        |   2 +-
 .../planner/runtime/batch/sql/UnionITCase.scala    |   2 +-
 .../planner/runtime/stream/sql/CalcITCase.scala    |   4 +-
 .../runtime/operators/sink/ConstraintEnforcer.java |  60 ++--
 .../flink/table/data/BinaryArrayDataTest.java      |   3 +-
 .../apache/flink/table/data/BinaryRowDataTest.java |   3 +-
 .../flink/table/data/DataFormatConvertersTest.java |   4 +-
 .../window/SlicingWindowAggOperatorTest.java       |   3 +-
 .../ProcTimeDeduplicateFunctionTestBase.java       |   3 +-
 .../RowTimeDeduplicateFunctionTestBase.java        |   3 +-
 .../RowTimeWindowDeduplicateOperatorTest.java      |   3 +-
 .../join/RandomSortMergeInnerJoinTest.java         |   6 +-
 .../join/String2HashJoinOperatorTest.java          |  14 +-
 .../join/String2SortMergeJoinOperatorTest.java     |  12 +-
 .../interval/TimeIntervalStreamJoinTestBase.java   |   6 +-
 .../TemporalProcessTimeJoinOperatorTest.java       |   6 +-
 .../temporal/TemporalTimeJoinOperatorTestBase.java |  12 +-
 .../join/window/WindowJoinOperatorTest.java        |   6 +-
 .../ProcTimeRangeBoundedPrecedingFunctionTest.java |   2 +-
 .../operators/over/RowTimeOverWindowTestBase.java  |   4 +-
 .../operators/rank/TopNFunctionTestBase.java       |   8 +-
 .../rank/window/WindowRankOperatorTest.java        |   5 +-
 .../operators/sort/ProcTimeSortOperatorTest.java   |   5 +-
 .../operators/sort/RowTimeSortOperatorTest.java    |  10 +-
 .../operators/sort/StreamSortOperatorTest.java     |   2 +-
 .../window/WindowOperatorContractTest.java         |   3 +-
 .../operators/window/WindowOperatorTest.java       |  11 +-
 .../runtime/types/DataTypePrecisionFixerTest.java  |   2 +-
 .../runtime/typeutils/RowDataSerializerTest.java   |   6 +-
 .../collections/binary/BytesHashMapTestBase.java   |   2 +-
 .../collections/binary/BytesMultiMapTestBase.java  |   4 +-
 76 files changed, 1298 insertions(+), 499 deletions(-)
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CharVarCharTrimPadCastRule.java

[flink] 01/04: [FLINK-24413][table] Apply trimming & padding when CASTing to CHAR/VARCHAR

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 09aad58943a1597466a78ebb8d543e6baa3f5092
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Thu Dec 9 13:52:38 2021 +0100

    [FLINK-24413][table] Apply trimming & padding when CASTing to CHAR/VARCHAR
    
    Apply trimming when CASTing to `CHAR(<length>)` or `VARCHAR(<length>)`
    and the length of the result string exceeds the length specified.
    Apply padding to the right with spaces when CASTing to `CHAR(<length>)`
    and the result string's length is less than the specified length, so
    that the length of the result string exactly matches the specified length.
    
    This closes #18063.
---
 .../flink/table/types/logical/VarCharType.java     |   2 +
 .../functions/casting/ArrayToStringCastRule.java   | 187 +++++++-----
 .../functions/casting/BinaryToStringCastRule.java  |   3 +-
 .../functions/casting/BooleanToStringCastRule.java |   3 +-
 .../functions/casting/CastRulePredicate.java       |  52 ++--
 .../functions/casting/CastRuleProvider.java        |  23 +-
 .../casting/CharVarCharTrimPadCastRule.java        | 252 ++++++++++++++++
 .../functions/casting/DateToStringCastRule.java    |   7 +-
 .../casting/IntervalToStringCastRule.java          |   3 +-
 .../casting/MapAndMultisetToStringCastRule.java    | 300 +++++++++++--------
 .../functions/casting/NumericToStringCastRule.java |   3 +-
 .../functions/casting/RawToStringCastRule.java     |  54 +++-
 .../functions/casting/RowToStringCastRule.java     |  78 +++--
 .../functions/casting/TimeToStringCastRule.java    |   3 +-
 .../casting/TimestampToStringCastRule.java         |   3 +-
 .../table/planner/codegen/calls/IfCallGen.scala    |  23 +-
 .../planner/functions/CastFunctionITCase.java      |  29 +-
 .../functions/casting/CastRuleProviderTest.java    |  19 ++
 .../planner/functions/casting/CastRulesTest.java   | 332 +++++++++++++++++++++
 .../planner/expressions/ScalarFunctionsTest.scala  |  16 +-
 20 files changed, 1117 insertions(+), 275 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarCharType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarCharType.java
index 5a71b21..7c73b6c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarCharType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarCharType.java
@@ -54,6 +54,8 @@ public final class VarCharType extends LogicalType {
 
     public static final int DEFAULT_LENGTH = 1;
 
+    public static final VarCharType STRING_TYPE = new VarCharType(MAX_LENGTH);
+
     private static final String FORMAT = "VARCHAR(%d)";
 
     private static final String MAX_FORMAT = "STRING";
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToStringCastRule.java
index e470739..57f9e48 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/ArrayToStringCastRule.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
@@ -32,6 +33,9 @@ import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.NUL
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.strLiteral;
+import static org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.couldTrim;
+import static org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.stringExceedsLength;
+import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
 
 /** {@link LogicalTypeRoot#ARRAY} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
 class ArrayToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
@@ -51,28 +55,54 @@ class ArrayToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<Array
                         .build());
     }
 
-    /* Example generated code for ARRAY<INT>:
+    /* Example generated code for ARRAY<INT> -> CHAR(10)
 
     isNull$0 = _myInputIsNull;
     if (!isNull$0) {
         builder$1.setLength(0);
         builder$1.append("[");
-        for (int i$2 = 0; i$2 < _myInput.size(); i$2++) {
-            if (i$2 != 0) {
+        for (int i$3 = 0; i$3 < _myInput.size(); i$3++) {
+            if (builder$1.length() > 10) {
+                break;
+            }
+            if (i$3 != 0) {
                 builder$1.append(", ");
             }
-            int element$3 = -1;
-            boolean elementIsNull$4 = _myInput.isNullAt(i$2);
-            if (!elementIsNull$4) {
-                element$3 = _myInput.getInt(i$2);
-                result$2 = org.apache.flink.table.data.binary.BinaryStringData.fromString("" + element$3);
-                builder$1.append(result$2);
+            int element$4 = -1;
+            boolean elementIsNull$5 = _myInput.isNullAt(i$3);
+            if (!elementIsNull$5) {
+                element$4 = _myInput.getInt(i$3);
+                isNull$2 = false;
+                if (!isNull$2) {
+                    result$3 = org.apache.flink.table.data.binary.BinaryStringData.fromString("" + element$4);
+                    isNull$2 = result$3 == null;
+                } else {
+                    result$3 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+                }
+                builder$1.append(result$3);
             } else {
                 builder$1.append("null");
             }
         }
         builder$1.append("]");
-        result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+        java.lang.String resultString$2;
+        resultString$2 = builder$1.toString();
+        if (builder$1.length() > 10) {
+            resultString$2 = builder$1.substring(0, java.lang.Math.min(builder$1.length(), 10));
+        } else {
+            if (resultString$2.length() < 10) {
+                int padLength$6;
+                padLength$6 = 10 - resultString$2.length();
+                java.lang.StringBuilder sbPadding$7;
+                sbPadding$7 = new java.lang.StringBuilder();
+                for (int i$8 = 0; i$8 < padLength$6; i$8++) {
+                    sbPadding$7.append(" ");
+                }
+                resultString$2 = resultString$2 + sbPadding$7.toString();
+            }
+        }
+        result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$2);
+        isNull$0 = result$1 == null;
     } else {
         result$1 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
     }
@@ -91,74 +121,93 @@ class ArrayToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<Array
         context.declareClassField(
                 className(StringBuilder.class), builderTerm, constructorCall(StringBuilder.class));
 
-        return new CastRuleUtils.CodeWriter()
-                .stmt(methodCall(builderTerm, "setLength", 0))
-                .stmt(methodCall(builderTerm, "append", strLiteral("[")))
-                .forStmt(
-                        methodCall(inputTerm, "size"),
-                        (indexTerm, loopBodyWriter) -> {
-                            String elementTerm = newName("element");
-                            String elementIsNullTerm = newName("elementIsNull");
+        final String resultStringTerm = newName("resultString");
+        final int length = LogicalTypeChecks.getLength(targetLogicalType);
+
+        CastRuleUtils.CodeWriter writer =
+                new CastRuleUtils.CodeWriter()
+                        .stmt(methodCall(builderTerm, "setLength", 0))
+                        .stmt(methodCall(builderTerm, "append", strLiteral("[")))
+                        .forStmt(
+                                methodCall(inputTerm, "size"),
+                                (indexTerm, loopBodyWriter) -> {
+                                    String elementTerm = newName("element");
+                                    String elementIsNullTerm = newName("elementIsNull");
 
-                            CastCodeBlock codeBlock =
-                                    CastRuleProvider.generateCodeBlock(
-                                            context,
-                                            elementTerm,
-                                            "false",
-                                            // Null check is done at the array access level
-                                            innerInputType.copy(false),
-                                            targetLogicalType);
+                                    CastCodeBlock codeBlock =
+                                            CastRuleProvider.generateCodeBlock(
+                                                    context,
+                                                    elementTerm,
+                                                    "false",
+                                                    // Null check is done at the array
+                                                    // access level
+                                                    innerInputType.copy(false),
+                                                    STRING_TYPE);
 
-                            loopBodyWriter
-                                    // Write the comma
-                                    .ifStmt(
-                                            indexTerm + " != 0",
-                                            thenBodyWriter ->
-                                                    thenBodyWriter.stmt(
-                                                            methodCall(
-                                                                    builderTerm,
-                                                                    "append",
-                                                                    strLiteral(", "))))
-                                    // Extract element from array
-                                    .declPrimitiveStmt(innerInputType, elementTerm)
-                                    .declStmt(
-                                            boolean.class,
-                                            elementIsNullTerm,
-                                            methodCall(inputTerm, "isNullAt", indexTerm))
-                                    .ifStmt(
-                                            "!" + elementIsNullTerm,
-                                            thenBodyWriter ->
-                                                    thenBodyWriter
-                                                            // If element not null, extract it and
-                                                            // execute the cast
-                                                            .assignStmt(
-                                                                    elementTerm,
-                                                                    rowFieldReadAccess(
-                                                                            indexTerm,
-                                                                            inputTerm,
-                                                                            innerInputType))
-                                                            .append(codeBlock)
-                                                            .stmt(
+                                    if (!context.legacyBehaviour() && couldTrim(length)) {
+                                        // Break if the target length is already exceeded
+                                        loopBodyWriter.ifStmt(
+                                                stringExceedsLength(builderTerm, length),
+                                                thenBodyWriter -> thenBodyWriter.stmt("break"));
+                                    }
+                                    loopBodyWriter
+                                            // Write the comma
+                                            .ifStmt(
+                                                    indexTerm + " != 0",
+                                                    thenBodyWriter ->
+                                                            thenBodyWriter.stmt(
+                                                                    methodCall(
+                                                                            builderTerm,
+                                                                            "append",
+                                                                            strLiteral(", "))))
+                                            // Extract element from array
+                                            .declPrimitiveStmt(innerInputType, elementTerm)
+                                            .declStmt(
+                                                    boolean.class,
+                                                    elementIsNullTerm,
+                                                    methodCall(inputTerm, "isNullAt", indexTerm))
+                                            .ifStmt(
+                                                    "!" + elementIsNullTerm,
+                                                    thenBodyWriter ->
+                                                            thenBodyWriter
+                                                                    // If element not null,
+                                                                    // extract it and
+                                                                    // execute the cast
+                                                                    .assignStmt(
+                                                                            elementTerm,
+                                                                            rowFieldReadAccess(
+                                                                                    indexTerm,
+                                                                                    inputTerm,
+                                                                                    innerInputType))
+                                                                    .append(codeBlock)
+                                                                    .stmt(
+                                                                            methodCall(
+                                                                                    builderTerm,
+                                                                                    "append",
+                                                                                    codeBlock
+                                                                                            .getReturnTerm())),
+                                                    elseBodyWriter ->
+                                                            // If element is null, just
+                                                            // write NULL
+                                                            elseBodyWriter.stmt(
                                                                     methodCall(
                                                                             builderTerm,
                                                                             "append",
-                                                                            codeBlock
-                                                                                    .getReturnTerm())),
-                                            elseBodyWriter ->
-                                                    // If element is null, just write NULL
-                                                    elseBodyWriter.stmt(
-                                                            methodCall(
-                                                                    builderTerm,
-                                                                    "append",
-                                                                    NULL_STR_LITERAL)));
-                        })
-                .stmt(methodCall(builderTerm, "append", strLiteral("]")))
+                                                                            NULL_STR_LITERAL)));
+                                })
+                        .stmt(methodCall(builderTerm, "append", strLiteral("]")));
+        return CharVarCharTrimPadCastRule.padAndTrimStringIfNeeded(
+                        writer,
+                        targetLogicalType,
+                        context.legacyBehaviour(),
+                        length,
+                        resultStringTerm,
+                        builderTerm)
                 // Assign the result value
                 .assignStmt(
                         returnVariable,
                         CastRuleUtils.staticCall(
-                                BINARY_STRING_DATA_FROM_STRING(),
-                                methodCall(builderTerm, "toString")))
+                                BINARY_STRING_DATA_FROM_STRING(), resultStringTerm))
                 .toString();
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
index fd95948..126e3c0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
@@ -25,6 +25,7 @@ import java.nio.charset.StandardCharsets;
 
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.accessStaticField;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
+import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
 
 /**
  * {@link LogicalTypeFamily#BINARY_STRING} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule.
@@ -37,7 +38,7 @@ class BinaryToStringCastRule extends AbstractCharacterFamilyTargetRule<byte[]> {
         super(
                 CastRulePredicate.builder()
                         .input(LogicalTypeFamily.BINARY_STRING)
-                        .target(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(STRING_TYPE)
                         .build());
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BooleanToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BooleanToStringCastRule.java
index ae95571..0d3ab13 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BooleanToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BooleanToStringCastRule.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.EMPTY_STR_LITERAL;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.stringConcat;
+import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
 
 /** {@link LogicalTypeRoot#BOOLEAN} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
 class BooleanToStringCastRule extends AbstractCharacterFamilyTargetRule<Object> {
@@ -34,7 +35,7 @@ class BooleanToStringCastRule extends AbstractCharacterFamilyTargetRule<Object>
         super(
                 CastRulePredicate.builder()
                         .input(LogicalTypeRoot.BOOLEAN)
-                        .target(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(STRING_TYPE)
                         .build());
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRulePredicate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRulePredicate.java
index 40555a8..3b3c67f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRulePredicate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRulePredicate.java
@@ -35,16 +35,17 @@ import java.util.function.BiPredicate;
  * of input and target type using this class. In particular, a rule is applied if:
  *
  * <ol>
- *   <li>{@link #getTargetTypes()} includes the {@link LogicalTypeRoot} of target type and either
+ *   <li>{@link #getTargetTypeRoots()} includes the {@link LogicalTypeRoot} of target type and
+ *       either
  *       <ol>
- *         <li>{@link #getInputTypes()} includes the {@link LogicalTypeRoot} of input type or
+ *         <li>{@link #getInputTypeRoots()} includes the {@link LogicalTypeRoot} of input type or
  *         <li>{@link #getInputTypeFamilies()} includes one of the {@link LogicalTypeFamily} of
  *             input type
  *       </ol>
  *   <li>Or {@link #getTargetTypeFamilies()} includes one of the {@link LogicalTypeFamily} of target
  *       type and either
  *       <ol>
- *         <li>{@link #getInputTypes()} includes the {@link LogicalTypeRoot} of input type or
+ *         <li>{@link #getInputTypeRoots()} includes the {@link LogicalTypeRoot} of input type or
  *         <li>{@link #getInputTypeFamilies()} includes one of the {@link LogicalTypeFamily} of
  *             input type
  *       </ol>
@@ -59,8 +60,10 @@ import java.util.function.BiPredicate;
 @Internal
 public class CastRulePredicate {
 
-    private final Set<LogicalTypeRoot> inputTypes;
-    private final Set<LogicalTypeRoot> targetTypes;
+    private final Set<LogicalType> targetTypes;
+
+    private final Set<LogicalTypeRoot> inputTypeRoots;
+    private final Set<LogicalTypeRoot> targetTypeRoots;
 
     private final Set<LogicalTypeFamily> inputTypeFamilies;
     private final Set<LogicalTypeFamily> targetTypeFamilies;
@@ -68,24 +71,30 @@ public class CastRulePredicate {
     private final BiPredicate<LogicalType, LogicalType> customPredicate;
 
     private CastRulePredicate(
-            Set<LogicalTypeRoot> inputTypes,
-            Set<LogicalTypeRoot> targetTypes,
+            Set<LogicalType> targetTypes,
+            Set<LogicalTypeRoot> inputTypeRoots,
+            Set<LogicalTypeRoot> targetTypeRoots,
             Set<LogicalTypeFamily> inputTypeFamilies,
             Set<LogicalTypeFamily> targetTypeFamilies,
             BiPredicate<LogicalType, LogicalType> customPredicate) {
-        this.inputTypes = inputTypes;
         this.targetTypes = targetTypes;
+        this.inputTypeRoots = inputTypeRoots;
+        this.targetTypeRoots = targetTypeRoots;
         this.inputTypeFamilies = inputTypeFamilies;
         this.targetTypeFamilies = targetTypeFamilies;
         this.customPredicate = customPredicate;
     }
 
-    public Set<LogicalTypeRoot> getInputTypes() {
-        return inputTypes;
+    public Set<LogicalType> getTargetTypes() {
+        return targetTypes;
     }
 
-    public Set<LogicalTypeRoot> getTargetTypes() {
-        return targetTypes;
+    public Set<LogicalTypeRoot> getInputTypeRoots() {
+        return inputTypeRoots;
+    }
+
+    public Set<LogicalTypeRoot> getTargetTypeRoots() {
+        return targetTypeRoots;
     }
 
     public Set<LogicalTypeFamily> getInputTypeFamilies() {
@@ -106,20 +115,26 @@ public class CastRulePredicate {
 
     /** Builder for the {@link CastRulePredicate}. */
     public static class Builder {
-        private final Set<LogicalTypeRoot> inputTypes = new HashSet<>();
-        private final Set<LogicalTypeRoot> targetTypes = new HashSet<>();
+        private final Set<LogicalTypeRoot> inputTypeRoots = new HashSet<>();
+        private final Set<LogicalTypeRoot> targetTypeRoots = new HashSet<>();
+        private final Set<LogicalType> targetTypes = new HashSet<>();
 
         private final Set<LogicalTypeFamily> inputTypeFamilies = new HashSet<>();
         private final Set<LogicalTypeFamily> targetTypeFamilies = new HashSet<>();
 
         private BiPredicate<LogicalType, LogicalType> customPredicate;
 
-        public Builder input(LogicalTypeRoot inputType) {
-            inputTypes.add(inputType);
+        public Builder input(LogicalTypeRoot inputTypeRoot) {
+            inputTypeRoots.add(inputTypeRoot);
+            return this;
+        }
+
+        public Builder target(LogicalTypeRoot outputTypeRoot) {
+            targetTypeRoots.add(outputTypeRoot);
             return this;
         }
 
-        public Builder target(LogicalTypeRoot outputType) {
+        public Builder target(LogicalType outputType) {
             targetTypes.add(outputType);
             return this;
         }
@@ -141,8 +156,9 @@ public class CastRulePredicate {
 
         public CastRulePredicate build() {
             return new CastRulePredicate(
-                    Collections.unmodifiableSet(inputTypes),
                     Collections.unmodifiableSet(targetTypes),
+                    Collections.unmodifiableSet(inputTypeRoots),
+                    Collections.unmodifiableSet(targetTypeRoots),
                     Collections.unmodifiableSet(inputTypeFamilies),
                     Collections.unmodifiableSet(targetTypeFamilies),
                     customPredicate);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
index afb91b7..b2d5fd1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
@@ -78,6 +78,7 @@ public class CastRuleProvider {
                 .addRule(ArrayToArrayCastRule.INSTANCE)
                 .addRule(RowToRowCastRule.INSTANCE)
                 // Special rules
+                .addRule(CharVarCharTrimPadCastRule.INSTANCE)
                 .addRule(NullToStringCastRule.INSTANCE)
                 .addRule(IdentityCastRule.INSTANCE);
     }
@@ -148,10 +149,20 @@ public class CastRuleProvider {
     private CastRuleProvider addRule(CastRule<?, ?> rule) {
         CastRulePredicate predicate = rule.getPredicateDefinition();
 
-        for (LogicalTypeRoot targetTypeRoot : predicate.getTargetTypes()) {
+        for (LogicalType targetType : predicate.getTargetTypes()) {
+            final Map<Object, CastRule<?, ?>> map =
+                    rules.computeIfAbsent(targetType, k -> new HashMap<>());
+            for (LogicalTypeRoot inputTypeRoot : predicate.getInputTypeRoots()) {
+                map.put(inputTypeRoot, rule);
+            }
+            for (LogicalTypeFamily inputTypeFamily : predicate.getInputTypeFamilies()) {
+                map.put(inputTypeFamily, rule);
+            }
+        }
+        for (LogicalTypeRoot targetTypeRoot : predicate.getTargetTypeRoots()) {
             final Map<Object, CastRule<?, ?>> map =
                     rules.computeIfAbsent(targetTypeRoot, k -> new HashMap<>());
-            for (LogicalTypeRoot inputTypeRoot : predicate.getInputTypes()) {
+            for (LogicalTypeRoot inputTypeRoot : predicate.getInputTypeRoots()) {
                 map.put(inputTypeRoot, rule);
             }
             for (LogicalTypeFamily inputTypeFamily : predicate.getInputTypeFamilies()) {
@@ -161,7 +172,7 @@ public class CastRuleProvider {
         for (LogicalTypeFamily targetTypeFamily : predicate.getTargetTypeFamilies()) {
             final Map<Object, CastRule<?, ?>> map =
                     rules.computeIfAbsent(targetTypeFamily, k -> new HashMap<>());
-            for (LogicalTypeRoot inputTypeRoot : predicate.getInputTypes()) {
+            for (LogicalTypeRoot inputTypeRoot : predicate.getInputTypeRoots()) {
                 map.put(inputTypeRoot, rule);
             }
             for (LogicalTypeFamily inputTypeFamily : predicate.getInputTypeFamilies()) {
@@ -182,8 +193,10 @@ public class CastRuleProvider {
 
         final Iterator<Object> targetTypeRootFamilyIterator =
                 Stream.<Object>concat(
-                                Stream.of(targetType.getTypeRoot()),
-                                targetType.getTypeRoot().getFamilies().stream())
+                                Stream.of(targetType),
+                                Stream.<Object>concat(
+                                        Stream.of(targetType.getTypeRoot()),
+                                        targetType.getTypeRoot().getFamilies().stream()))
                         .iterator();
 
         // Try lookup by target type root/type families
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CharVarCharTrimPadCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CharVarCharTrimPadCastRule.java
new file mode 100644
index 0000000..9d87074
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CharVarCharTrimPadCastRule.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
+
+/**
+ * Any source type to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule.
+ *
+ * <p>This rule is used for casting from any of the {@link LogicalTypeFamily#PREDEFINED} types to
+ * {@link LogicalTypeRoot#CHAR} or {@link LogicalTypeRoot#VARCHAR}. It calls the underlying concrete
+ * matching rule, i.e.: {@link NumericToStringCastRule} to do the actual conversion and then
+ * performs any necessary trimming or padding so that the length of the result string value matches
+ * the one specified by the length of the target {@link LogicalTypeRoot#CHAR} or {@link
+ * LogicalTypeRoot#VARCHAR} type.
+ */
+class CharVarCharTrimPadCastRule
+        extends AbstractNullAwareCodeGeneratorCastRule<Object, StringData> {
+
+    static final CharVarCharTrimPadCastRule INSTANCE = new CharVarCharTrimPadCastRule();
+
+    private CharVarCharTrimPadCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                (inputType, targetType) ->
+                                        targetType.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && !targetType.equals(STRING_TYPE))
+                        .build());
+    }
+
+    /* Example generated code for STRING() -> CHAR(6) cast
+
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        if (_myInput.numChars() > 6) {
+            result$1 = _myInput.substring(0, 6);
+        } else {
+            if (_myInput.numChars() < 6) {
+                int padLength$1;
+                padLength$1 = 6 - _myInput.numChars();
+                org.apache.flink.table.data.binary.BinaryStringData padString$2;
+                padString$2 = org.apache.flink.table.data.binary.BinaryStringData.blankString(padLength$1);
+                result$1 = org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(_myInput, padString$2);
+            } else {
+                result$1 = _myInput;
+            }
+        }
+        isNull$0 = result$1 == null;
+    } else {
+        result$1 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+    */
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final int length = LogicalTypeChecks.getLength(targetLogicalType);
+        CastRule<?, ?> castRule =
+                CastRuleProvider.resolve(inputLogicalType, VarCharType.STRING_TYPE);
+
+        // Only used for non-Constructed types - for constructed type and RAW, the trimming/padding
+        // is applied on each individual rule, i.e.: ArrayToStringCastRule, RawToStringCastRule
+        if (castRule instanceof ExpressionCodeGeneratorCastRule) {
+            @SuppressWarnings("rawtypes")
+            final String stringExpr =
+                    ((ExpressionCodeGeneratorCastRule) castRule)
+                            .generateExpression(
+                                    context, inputTerm, inputLogicalType, targetLogicalType);
+
+            final CastRuleUtils.CodeWriter writer = new CastRuleUtils.CodeWriter();
+            if (context.legacyBehaviour()
+                    || !(couldTrim(length) || couldPad(targetLogicalType, length))) {
+                return writer.assignStmt(returnVariable, stringExpr).toString();
+            }
+            return writer.ifStmt(
+                            methodCall(stringExpr, "numChars") + " > " + length,
+                            thenWriter ->
+                                    thenWriter.assignStmt(
+                                            returnVariable,
+                                            methodCall(stringExpr, "substring", 0, length)),
+                            elseWriter -> {
+                                if (couldPad(targetLogicalType, length)) {
+                                    final String padLength = newName("padLength");
+                                    final String padString = newName("padString");
+                                    elseWriter.ifStmt(
+                                            methodCall(stringExpr, "numChars") + " < " + length,
+                                            thenInnerWriter ->
+                                                    thenInnerWriter
+                                                            .declStmt(int.class, padLength)
+                                                            .assignStmt(
+                                                                    padLength,
+                                                                    length
+                                                                            + " - "
+                                                                            + methodCall(
+                                                                                    stringExpr,
+                                                                                    "numChars"))
+                                                            .declStmt(
+                                                                    BinaryStringData.class,
+                                                                    padString)
+                                                            .assignStmt(
+                                                                    padString,
+                                                                    staticCall(
+                                                                            BinaryStringData.class,
+                                                                            "blankString",
+                                                                            padLength))
+                                                            .assignStmt(
+                                                                    returnVariable,
+                                                                    staticCall(
+                                                                            BinaryStringDataUtil
+                                                                                    .class,
+                                                                            "concat",
+                                                                            stringExpr,
+                                                                            padString)),
+                                            elseInnerWriter ->
+                                                    elseInnerWriter.assignStmt(
+                                                            returnVariable, stringExpr));
+                                } else {
+                                    elseWriter.assignStmt(returnVariable, stringExpr);
+                                }
+                            })
+                    .toString();
+        } else {
+            throw new IllegalStateException("This is a bug. Please file an issue.");
+        }
+    }
+
+    // ---------------
+    // Shared methods
+    // ---------------
+
+    static String stringExceedsLength(String strTerm, int targetLength) {
+        return methodCall(strTerm, "length") + " > " + targetLength;
+    }
+
+    static String stringShouldPad(String strTerm, int targetLength) {
+        return methodCall(strTerm, "length") + " < " + targetLength;
+    }
+
+    static boolean couldTrim(int targetLength) {
+        return targetLength < VarCharType.MAX_LENGTH;
+    }
+
+    static boolean couldPad(LogicalType targetType, int targetLength) {
+        return targetType.is(LogicalTypeRoot.CHAR) && targetLength < VarCharType.MAX_LENGTH;
+    }
+
+    static CastRuleUtils.CodeWriter padAndTrimStringIfNeeded(
+            CastRuleUtils.CodeWriter writer,
+            LogicalType targetType,
+            boolean legacyBehaviour,
+            int length,
+            String resultStringTerm,
+            String builderTerm) {
+        writer.declStmt(String.class, resultStringTerm)
+                .assignStmt(resultStringTerm, methodCall(builderTerm, "toString"));
+
+        // Trim and Pad if needed
+        if (!legacyBehaviour && (couldTrim(length) || couldPad(targetType, length))) {
+            writer.ifStmt(
+                    stringExceedsLength(builderTerm, length),
+                    thenWriter ->
+                            thenWriter.assignStmt(
+                                    resultStringTerm,
+                                    methodCall(
+                                            builderTerm,
+                                            "substring",
+                                            0,
+                                            staticCall(
+                                                    Math.class,
+                                                    "min",
+                                                    methodCall(builderTerm, "length"),
+                                                    length))),
+                    elseWriter ->
+                            padStringIfNeeded(
+                                    elseWriter,
+                                    targetType,
+                                    legacyBehaviour,
+                                    length,
+                                    resultStringTerm));
+        }
+        return writer;
+    }
+
+    static void padStringIfNeeded(
+            CastRuleUtils.CodeWriter writer,
+            LogicalType targetType,
+            boolean legacyBehaviour,
+            int length,
+            String returnTerm) {
+
+        // Pad if needed
+        if (!legacyBehaviour && couldPad(targetType, length)) {
+            final String padLength = newName("padLength");
+            final String sbPadding = newName("sbPadding");
+            writer.ifStmt(
+                    stringShouldPad(returnTerm, length),
+                    thenWriter ->
+                            thenWriter
+                                    .declStmt(int.class, padLength)
+                                    .assignStmt(
+                                            padLength,
+                                            length + " - " + methodCall(returnTerm, "length"))
+                                    .declStmt(StringBuilder.class, sbPadding)
+                                    .assignStmt(sbPadding, constructorCall(StringBuilder.class))
+                                    .forStmt(
+                                            padLength,
+                                            (idx, loopWriter) ->
+                                                    loopWriter.stmt(
+                                                            methodCall(
+                                                                    sbPadding, "append", "\" \"")))
+                                    .assignStmt(
+                                            returnTerm,
+                                            returnTerm
+                                                    + " + "
+                                                    + methodCall(sbPadding, "toString")));
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/DateToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/DateToStringCastRule.java
index d4ab3b8..949b1af 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/DateToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/DateToStringCastRule.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.UNIX_DATE_TO_STRING;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
 
 /** {@link LogicalTypeRoot#DATE} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
 class DateToStringCastRule extends AbstractCharacterFamilyTargetRule<Long> {
@@ -31,11 +32,7 @@ class DateToStringCastRule extends AbstractCharacterFamilyTargetRule<Long> {
     static final DateToStringCastRule INSTANCE = new DateToStringCastRule();
 
     private DateToStringCastRule() {
-        super(
-                CastRulePredicate.builder()
-                        .input(LogicalTypeRoot.DATE)
-                        .target(LogicalTypeFamily.CHARACTER_STRING)
-                        .build());
+        super(CastRulePredicate.builder().input(LogicalTypeRoot.DATE).target(STRING_TYPE).build());
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/IntervalToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/IntervalToStringCastRule.java
index 8773a93..ba16178 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/IntervalToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/IntervalToStringCastRule.java
@@ -27,6 +27,7 @@ import java.lang.reflect.Method;
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.INTERVAL_DAY_TIME_TO_STRING;
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.INTERVAL_YEAR_MONTH_TO_STRING;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
 
 /** {@link LogicalTypeFamily#INTERVAL} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
 class IntervalToStringCastRule extends AbstractCharacterFamilyTargetRule<Object> {
@@ -37,7 +38,7 @@ class IntervalToStringCastRule extends AbstractCharacterFamilyTargetRule<Object>
         super(
                 CastRulePredicate.builder()
                         .input(LogicalTypeFamily.INTERVAL)
-                        .target(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(STRING_TYPE)
                         .build());
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapAndMultisetToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapAndMultisetToStringCastRule.java
index 25a4478..dd03c51 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapAndMultisetToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapAndMultisetToStringCastRule.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
 import java.util.function.Consumer;
 
@@ -36,6 +37,9 @@ import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.NUL
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.strLiteral;
+import static org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.couldTrim;
+import static org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.stringExceedsLength;
+import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
 
 /**
  * {@link LogicalTypeRoot#MAP} and {@link LogicalTypeRoot#MULTISET} to {@link
@@ -64,7 +68,7 @@ class MapAndMultisetToStringCastRule
                                         ((MultisetType) input).getElementType(), target)));
     }
 
-    /* Example generated code for MAP<STRING, INTERVAL MONTH>:
+    /* Example generated code for MAP<STRING, INTERVAL MONTH> -> CHAR(12):
 
     isNull$0 = _myInputIsNull;
     if (!isNull$0) {
@@ -72,31 +76,57 @@ class MapAndMultisetToStringCastRule
         org.apache.flink.table.data.ArrayData values$3 = _myInput.valueArray();
         builder$1.setLength(0);
         builder$1.append("{");
-        for (int i$4 = 0; i$4 < _myInput.size(); i$4++) {
-            if (i$4 != 0) {
+        for (int i$5 = 0; i$5 < _myInput.size(); i$5++) {
+            if (builder$1.length() > 12) {
+                break;
+            }
+            if (i$5 != 0) {
                 builder$1.append(", ");
             }
-            org.apache.flink.table.data.binary.BinaryStringData key$5 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
-            boolean keyIsNull$6 = keys$2.isNullAt(i$4);
-            int value$7 = -1;
-            boolean valueIsNull$8 = values$3.isNullAt(i$4);
-            if (!keyIsNull$6) {
-                key$5 = ((org.apache.flink.table.data.binary.BinaryStringData) keys$2.getString(i$4));
-                builder$1.append(key$5);
+            org.apache.flink.table.data.binary.BinaryStringData key$6 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+            boolean keyIsNull$7 = keys$2.isNullAt(i$5);
+            int value$8 = -1;
+            boolean valueIsNull$9 = values$3.isNullAt(i$5);
+            if (!keyIsNull$7) {
+                key$6 = ((org.apache.flink.table.data.binary.BinaryStringData) keys$2.getString(i$5));
+                builder$1.append(key$6);
             } else {
                 builder$1.append("null");
             }
             builder$1.append("=");
-            if (!valueIsNull$8) {
-                value$7 = values$3.getInt(i$4);
-                result$2 = org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.flink.table.utils.DateTimeUtils.intervalYearMonthToString(value$7));
-                builder$1.append(result$2);
+            if (!valueIsNull$9) {
+                value$8 = values$3.getInt(i$5);
+                isNull$2 = valueIsNull$9;
+                if (!isNull$2) {
+                    result$3 = org.apache.flink.table.data.binary.BinaryStringData.fromString("" + value$8);
+                    isNull$2 = result$3 == null;
+                } else {
+                    result$3 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+                }
+                builder$1.append(result$3);
             } else {
                 builder$1.append("null");
             }
         }
         builder$1.append("}");
-        result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+        java.lang.String resultString$4;
+        resultString$4 = builder$1.toString();
+        if (builder$1.length() > 12) {
+            resultString$4 = builder$1.substring(0, java.lang.Math.min(builder$1.length(), 12));
+        } else {
+            if (resultString$4.length() < 12) {
+                int padLength$10;
+                padLength$10 = 12 - resultString$4.length();
+                java.lang.StringBuilder sbPadding$11;
+                sbPadding$11 = new java.lang.StringBuilder();
+                for (int i$12 = 0; i$12 < padLength$10; i$12++) {
+                    sbPadding$11.append(" ");
+                }
+                resultString$4 = resultString$4 + sbPadding$11.toString();
+            }
+        }
+        result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$4);
+        isNull$0 = result$1 == null;
     } else {
         result$1 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
     }
@@ -125,122 +155,154 @@ class MapAndMultisetToStringCastRule
         final String keyArrayTerm = newName("keys");
         final String valueArrayTerm = newName("values");
 
-        return new CastRuleUtils.CodeWriter()
-                .declStmt(ArrayData.class, keyArrayTerm, methodCall(inputTerm, "keyArray"))
-                .declStmt(ArrayData.class, valueArrayTerm, methodCall(inputTerm, "valueArray"))
-                .stmt(methodCall(builderTerm, "setLength", 0))
-                .stmt(methodCall(builderTerm, "append", strLiteral("{")))
-                .forStmt(
-                        methodCall(inputTerm, "size"),
-                        (indexTerm, loopBodyWriter) -> {
-                            String keyTerm = newName("key");
-                            String keyIsNullTerm = newName("keyIsNull");
-                            String valueTerm = newName("value");
-                            String valueIsNullTerm = newName("valueIsNull");
-
-                            CastCodeBlock keyCast =
-                                    CastRuleProvider.generateCodeBlock(
-                                            context,
-                                            keyTerm,
-                                            keyIsNullTerm,
-                                            // Null check is done at the key array access level
-                                            keyType.copy(false),
-                                            targetLogicalType);
-                            CastCodeBlock valueCast =
-                                    CastRuleProvider.generateCodeBlock(
-                                            context,
-                                            valueTerm,
-                                            valueIsNullTerm,
-                                            // Null check is done at the value array access level
-                                            valueType.copy(false),
-                                            targetLogicalType);
-
-                            Consumer<CastRuleUtils.CodeWriter> appendNonNullValue =
-                                    bodyWriter ->
-                                            bodyWriter
-                                                    // If value not null, extract it and
-                                                    // execute the cast
-                                                    .assignStmt(
-                                                            valueTerm,
-                                                            rowFieldReadAccess(
-                                                                    indexTerm,
-                                                                    valueArrayTerm,
-                                                                    valueType))
-                                                    .append(valueCast)
-                                                    .stmt(
-                                                            methodCall(
-                                                                    builderTerm,
-                                                                    "append",
-                                                                    valueCast.getReturnTerm()));
-                            loopBodyWriter
-                                    // Write the comma
-                                    .ifStmt(
-                                            indexTerm + " != 0",
-                                            thenBodyWriter ->
-                                                    thenBodyWriter.stmt(
-                                                            methodCall(
-                                                                    builderTerm,
-                                                                    "append",
-                                                                    strLiteral(", "))))
-                                    // Declare key and values variables
-                                    .declPrimitiveStmt(keyType, keyTerm)
-                                    .declStmt(
-                                            boolean.class,
-                                            keyIsNullTerm,
-                                            methodCall(keyArrayTerm, "isNullAt", indexTerm))
-                                    .declPrimitiveStmt(valueType, valueTerm)
-                                    .declStmt(
-                                            boolean.class,
-                                            valueIsNullTerm,
-                                            methodCall(valueArrayTerm, "isNullAt", indexTerm))
-                                    // Execute casting if inner key/value not null
-                                    .ifStmt(
-                                            "!" + keyIsNullTerm,
-                                            thenBodyWriter ->
-                                                    thenBodyWriter
-                                                            // If key not null, extract it and
+        final String resultStringTerm = newName("resultString");
+        final int length = LogicalTypeChecks.getLength(targetLogicalType);
+
+        CastRuleUtils.CodeWriter writer =
+                new CastRuleUtils.CodeWriter()
+                        .declStmt(ArrayData.class, keyArrayTerm, methodCall(inputTerm, "keyArray"))
+                        .declStmt(
+                                ArrayData.class,
+                                valueArrayTerm,
+                                methodCall(inputTerm, "valueArray"))
+                        .stmt(methodCall(builderTerm, "setLength", 0))
+                        .stmt(methodCall(builderTerm, "append", strLiteral("{")))
+                        .forStmt(
+                                methodCall(inputTerm, "size"),
+                                (indexTerm, loopBodyWriter) -> {
+                                    String keyTerm = newName("key");
+                                    String keyIsNullTerm = newName("keyIsNull");
+                                    String valueTerm = newName("value");
+                                    String valueIsNullTerm = newName("valueIsNull");
+
+                                    CastCodeBlock keyCast =
+                                            CastRuleProvider.generateCodeBlock(
+                                                    context,
+                                                    keyTerm,
+                                                    keyIsNullTerm,
+                                                    // Null check is done at the key array
+                                                    // access level
+                                                    keyType.copy(false),
+                                                    STRING_TYPE);
+                                    CastCodeBlock valueCast =
+                                            CastRuleProvider.generateCodeBlock(
+                                                    context,
+                                                    valueTerm,
+                                                    valueIsNullTerm,
+                                                    // Null check is done at the value array
+                                                    // access level
+                                                    valueType.copy(false),
+                                                    STRING_TYPE);
+
+                                    Consumer<CastRuleUtils.CodeWriter> appendNonNullValue =
+                                            bodyWriter ->
+                                                    bodyWriter
+                                                            // If value not null, extract it
+                                                            // and
                                                             // execute the cast
                                                             .assignStmt(
-                                                                    keyTerm,
+                                                                    valueTerm,
                                                                     rowFieldReadAccess(
                                                                             indexTerm,
-                                                                            keyArrayTerm,
-                                                                            keyType))
-                                                            .append(keyCast)
+                                                                            valueArrayTerm,
+                                                                            valueType))
+                                                            .append(valueCast)
                                                             .stmt(
                                                                     methodCall(
                                                                             builderTerm,
                                                                             "append",
-                                                                            keyCast
-                                                                                    .getReturnTerm())),
-                                            elseBodyWriter ->
-                                                    elseBodyWriter.stmt(
-                                                            methodCall(
-                                                                    builderTerm,
-                                                                    "append",
-                                                                    NULL_STR_LITERAL)))
-                                    .stmt(methodCall(builderTerm, "append", strLiteral("=")));
-                            if (inputLogicalType.is(LogicalTypeRoot.MULTISET)) {
-                                appendNonNullValue.accept(loopBodyWriter);
-                            } else {
-                                loopBodyWriter.ifStmt(
-                                        "!" + valueIsNullTerm,
-                                        appendNonNullValue,
-                                        elseBodyWriter ->
-                                                elseBodyWriter.stmt(
-                                                        methodCall(
-                                                                builderTerm,
-                                                                "append",
-                                                                NULL_STR_LITERAL)));
-                            }
-                        })
-                .stmt(methodCall(builderTerm, "append", strLiteral("}")))
+                                                                            valueCast
+                                                                                    .getReturnTerm()));
+                                    if (!context.legacyBehaviour() && couldTrim(length)) {
+                                        loopBodyWriter
+                                                // Break if the target length is already
+                                                // exceeded
+                                                .ifStmt(
+                                                stringExceedsLength(builderTerm, length),
+                                                thenBodyWriter -> thenBodyWriter.stmt("break"));
+                                    }
+                                    loopBodyWriter
+                                            // Write the comma
+                                            .ifStmt(
+                                                    indexTerm + " != 0",
+                                                    thenBodyWriter ->
+                                                            thenBodyWriter.stmt(
+                                                                    methodCall(
+                                                                            builderTerm,
+                                                                            "append",
+                                                                            strLiteral(", "))))
+                                            // Declare key and values variables
+                                            .declPrimitiveStmt(keyType, keyTerm)
+                                            .declStmt(
+                                                    boolean.class,
+                                                    keyIsNullTerm,
+                                                    methodCall(keyArrayTerm, "isNullAt", indexTerm))
+                                            .declPrimitiveStmt(valueType, valueTerm)
+                                            .declStmt(
+                                                    boolean.class,
+                                                    valueIsNullTerm,
+                                                    methodCall(
+                                                            valueArrayTerm, "isNullAt", indexTerm))
+                                            // Execute casting if inner key/value not null
+                                            .ifStmt(
+                                                    "!" + keyIsNullTerm,
+                                                    thenBodyWriter ->
+                                                            thenBodyWriter
+                                                                    // If key not null,
+                                                                    // extract it and
+                                                                    // execute the cast
+                                                                    .assignStmt(
+                                                                            keyTerm,
+                                                                            rowFieldReadAccess(
+                                                                                    indexTerm,
+                                                                                    keyArrayTerm,
+                                                                                    keyType))
+                                                                    .append(keyCast)
+                                                                    .stmt(
+                                                                            methodCall(
+                                                                                    builderTerm,
+                                                                                    "append",
+                                                                                    keyCast
+                                                                                            .getReturnTerm())),
+                                                    elseBodyWriter ->
+                                                            elseBodyWriter.stmt(
+                                                                    methodCall(
+                                                                            builderTerm,
+                                                                            "append",
+                                                                            NULL_STR_LITERAL)))
+                                            .stmt(
+                                                    methodCall(
+                                                            builderTerm,
+                                                            "append",
+                                                            strLiteral("=")));
+                                    if (inputLogicalType.is(LogicalTypeRoot.MULTISET)) {
+                                        appendNonNullValue.accept(loopBodyWriter);
+                                    } else {
+                                        loopBodyWriter.ifStmt(
+                                                "!" + valueIsNullTerm,
+                                                appendNonNullValue,
+                                                elseBodyWriter ->
+                                                        elseBodyWriter.stmt(
+                                                                methodCall(
+                                                                        builderTerm,
+                                                                        "append",
+                                                                        NULL_STR_LITERAL)));
+                                    }
+                                })
+                        .stmt(methodCall(builderTerm, "append", strLiteral("}")));
+
+        return CharVarCharTrimPadCastRule.padAndTrimStringIfNeeded(
+                        writer,
+                        targetLogicalType,
+                        context.legacyBehaviour(),
+                        length,
+                        resultStringTerm,
+                        builderTerm)
                 // Assign the result value
                 .assignStmt(
                         returnVariable,
                         CastRuleUtils.staticCall(
-                                BINARY_STRING_DATA_FROM_STRING(),
-                                methodCall(builderTerm, "toString")))
+                                BINARY_STRING_DATA_FROM_STRING(), resultStringTerm))
                 .toString();
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericToStringCastRule.java
index d83741e..c6a03c2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/NumericToStringCastRule.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.types.logical.LogicalTypeFamily;
 
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.EMPTY_STR_LITERAL;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.stringConcat;
+import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
 
 /** {@link LogicalTypeFamily#NUMERIC} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
 class NumericToStringCastRule extends AbstractCharacterFamilyTargetRule<Object> {
@@ -33,7 +34,7 @@ class NumericToStringCastRule extends AbstractCharacterFamilyTargetRule<Object>
         super(
                 CastRulePredicate.builder()
                         .input(LogicalTypeFamily.NUMERIC)
-                        .target(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(STRING_TYPE)
                         .build());
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RawToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RawToStringCastRule.java
index 3301f5f..454a7ab 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RawToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RawToStringCastRule.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.table.planner.functions.casting;
 
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
 import static org.apache.flink.table.codesplit.CodeSplitUtil.newName;
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
@@ -39,6 +41,38 @@ class RawToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<Object,
                         .build());
     }
 
+    /* Example RAW(LocalDateTime.class) -> CHAR(12)
+
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        java.lang.Object deserializedObj$0 = _myInput.toObject(typeSerializer$2);
+        if (deserializedObj$0 != null) {
+            java.lang.String resultString$1;
+            resultString$1 = deserializedObj$0.toString().toString();
+            if (deserializedObj$0.toString().length() > 12) {
+                resultString$1 = deserializedObj$0.toString().substring(0, java.lang.Math.min(deserializedObj$0.toString().length(), 12));
+            } else {
+                if (resultString$1.length() < 12) {
+                    int padLength$2;
+                    padLength$2 = 12 - resultString$1.length();
+                    java.lang.StringBuilder sbPadding$3;
+                    sbPadding$3 = new java.lang.StringBuilder();
+                    for (int i$4 = 0; i$4 < padLength$2; i$4++) {
+                        sbPadding$3.append(" ");
+                    }
+                    resultString$1 = resultString$1 + sbPadding$3.toString();
+                }
+            }
+            result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$1);
+        } else {
+            result$1 = null;
+        }
+        isNull$0 = result$1 == null;
+    } else {
+        result$1 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+     */
     @Override
     protected String generateCodeBlockInternal(
             CodeGeneratorCastRule.Context context,
@@ -49,6 +83,9 @@ class RawToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<Object,
         final String typeSerializer = context.declareTypeSerializer(inputLogicalType);
         final String deserializedObjTerm = newName("deserializedObj");
 
+        final String resultStringTerm = CodeGenUtils.newName("resultString");
+        final int length = LogicalTypeChecks.getLength(targetLogicalType);
+
         return new CastRuleUtils.CodeWriter()
                 .declStmt(
                         Object.class,
@@ -57,11 +94,18 @@ class RawToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<Object,
                 .ifStmt(
                         deserializedObjTerm + " != null",
                         thenWriter ->
-                                thenWriter.assignStmt(
-                                        returnVariable,
-                                        CastRuleUtils.staticCall(
-                                                BINARY_STRING_DATA_FROM_STRING(),
-                                                methodCall(deserializedObjTerm, "toString"))),
+                                CharVarCharTrimPadCastRule.padAndTrimStringIfNeeded(
+                                                thenWriter,
+                                                targetLogicalType,
+                                                context.legacyBehaviour(),
+                                                length,
+                                                resultStringTerm,
+                                                methodCall(deserializedObjTerm, "toString"))
+                                        .assignStmt(
+                                                returnVariable,
+                                                CastRuleUtils.staticCall(
+                                                        BINARY_STRING_DATA_FROM_STRING(),
+                                                        resultStringTerm)),
                         elseWriter -> elseWriter.assignStmt(returnVariable, "null"))
                 .toString();
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
index 3c38f5d..d206618 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
 import java.util.List;
@@ -54,32 +55,55 @@ class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayDa
                         .allMatch(fieldType -> CastRuleProvider.exists(fieldType, target));
     }
 
-    /* Example generated code for ROW<`f0` INT, `f1` STRING>:
+    /* Example generated code for ROW<`f0` INT, `f1` STRING> -> CHAR(12):
 
     isNull$0 = _myInputIsNull;
     if (!isNull$0) {
         builder$1.setLength(0);
         builder$1.append("(");
-        int f0Value$2 = -1;
-        boolean f0IsNull$3 = _myInput.isNullAt(0);
-        if (!f0IsNull$3) {
-            f0Value$2 = _myInput.getInt(0);
-            result$2 = org.apache.flink.table.data.binary.BinaryStringData.fromString("" + f0Value$2);
-            builder$1.append(result$2);
+        int f0Value$3 = -1;
+        boolean f0IsNull$4 = _myInput.isNullAt(0);
+        if (!f0IsNull$4) {
+            f0Value$3 = _myInput.getInt(0);
+            isNull$2 = f0IsNull$4;
+            if (!isNull$2) {
+                result$3 = org.apache.flink.table.data.binary.BinaryStringData.fromString("" + f0Value$3);
+                isNull$2 = result$3 == null;
+            } else {
+                result$3 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+            }
+            builder$1.append(result$3);
         } else {
             builder$1.append("null");
         }
-        builder$1.append(",");
-        org.apache.flink.table.data.binary.BinaryStringData f1Value$4 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
-        boolean f1IsNull$5 = _myInput.isNullAt(1);
-        if (!f1IsNull$5) {
-            f1Value$4 = ((org.apache.flink.table.data.binary.BinaryStringData) _myInput.getString(1));
-            builder$1.append(f1Value$4);
+        builder$1.append(", ");
+        org.apache.flink.table.data.binary.BinaryStringData f1Value$5 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+        boolean f1IsNull$6 = _myInput.isNullAt(1);
+        if (!f1IsNull$6) {
+            f1Value$5 = ((org.apache.flink.table.data.binary.BinaryStringData) _myInput.getString(1));
+            builder$1.append(f1Value$5);
         } else {
             builder$1.append("null");
         }
         builder$1.append(")");
-        result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+        java.lang.String resultString$2;
+        resultString$2 = builder$1.toString();
+        if (builder$1.length() > 12) {
+            resultString$2 = builder$1.substring(0, java.lang.Math.min(builder$1.length(), 12));
+        } else {
+            if (resultString$2.length() < 12) {
+                int padLength$7;
+                padLength$7 = 12 - resultString$2.length();
+                java.lang.StringBuilder sbPadding$8;
+                sbPadding$8 = new java.lang.StringBuilder();
+                for (int i$9 = 0; i$9 < padLength$7; i$9++) {
+                    sbPadding$8.append(" ");
+                }
+                resultString$2 = resultString$2 + sbPadding$8.toString();
+            }
+        }
+        result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$2);
+        isNull$0 = result$1 == null;
     } else {
         result$1 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
     }
@@ -98,6 +122,13 @@ class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayDa
         context.declareClassField(
                 className(StringBuilder.class), builderTerm, constructorCall(StringBuilder.class));
 
+        final String resultStringTerm = newName("resultString");
+        final int length = LogicalTypeChecks.getLength(targetLogicalType);
+        final LogicalType targetTypeForElementCast =
+                targetLogicalType.is(LogicalTypeFamily.CHARACTER_STRING)
+                        ? VarCharType.STRING_TYPE
+                        : targetLogicalType;
+
         final CastRuleUtils.CodeWriter writer =
                 new CastRuleUtils.CodeWriter()
                         .stmt(methodCall(builderTerm, "setLength", 0))
@@ -117,7 +148,7 @@ class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayDa
                             fieldIsNullTerm,
                             // Null check is done at the row access level
                             fieldType.copy(false),
-                            targetLogicalType);
+                            targetTypeForElementCast);
 
             // Write the comma
             if (fieldIndex != 0) {
@@ -154,18 +185,23 @@ class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayDa
                                             methodCall(builderTerm, "append", NULL_STR_LITERAL)));
         }
 
-        writer.stmt(methodCall(builderTerm, "append", strLiteral(")")))
+        writer.stmt(methodCall(builderTerm, "append", strLiteral(")")));
+        return CharVarCharTrimPadCastRule.padAndTrimStringIfNeeded(
+                        writer,
+                        targetLogicalType,
+                        context.legacyBehaviour(),
+                        length,
+                        resultStringTerm,
+                        builderTerm)
                 // Assign the result value
                 .assignStmt(
                         returnVariable,
                         CastRuleUtils.staticCall(
-                                BINARY_STRING_DATA_FROM_STRING(),
-                                methodCall(builderTerm, "toString")));
-
-        return writer.toString();
+                                BINARY_STRING_DATA_FROM_STRING(), resultStringTerm))
+                .toString();
     }
 
-    private String getDelimiter(CodeGeneratorCastRule.Context context) {
+    private static String getDelimiter(CodeGeneratorCastRule.Context context) {
         final String comma;
         if (context.legacyBehaviour()) {
             comma = strLiteral(",");
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimeToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimeToStringCastRule.java
index cc07a79..e9bdc6b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimeToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimeToStringCastRule.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.UNIX_TIME_TO_STRING;
+import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
 
 /**
  * {@link LogicalTypeRoot#TIME_WITHOUT_TIME_ZONE} to {@link LogicalTypeFamily#CHARACTER_STRING} cast
@@ -37,7 +38,7 @@ class TimeToStringCastRule extends AbstractCharacterFamilyTargetRule<Long> {
         super(
                 CastRulePredicate.builder()
                         .input(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE)
-                        .target(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(STRING_TYPE)
                         .build());
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimestampToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimestampToStringCastRule.java
index 965b378..0385d44 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimestampToStringCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimestampToStringCastRule.java
@@ -29,6 +29,7 @@ import org.apache.calcite.avatica.util.DateTimeUtils;
 import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.TIMESTAMP_TO_STRING_TIME_ZONE;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.accessStaticField;
 import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
 
 /** {@link LogicalTypeFamily#TIMESTAMP} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
 class TimestampToStringCastRule extends AbstractCharacterFamilyTargetRule<TimestampData> {
@@ -39,7 +40,7 @@ class TimestampToStringCastRule extends AbstractCharacterFamilyTargetRule<Timest
         super(
                 CastRulePredicate.builder()
                         .input(LogicalTypeFamily.TIMESTAMP)
-                        .target(LogicalTypeFamily.CHARACTER_STRING)
+                        .target(STRING_TYPE)
                         .build());
     }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
index 5fe1dd1..9eec6b5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.codegen.calls
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, primitiveDefaultValue, primitiveTypeTermForType}
 import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens.toCodegenCastContext
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
-import org.apache.flink.table.planner.functions.casting.{CastRuleProvider, ExpressionCodeGeneratorCastRule}
+import org.apache.flink.table.planner.functions.casting.{CastCodeBlock, CastRuleProvider, CodeGeneratorCastRule, ExpressionCodeGeneratorCastRule}
 import org.apache.flink.table.types.logical.LogicalType
 
 /**
@@ -54,18 +54,20 @@ class IfCallGen() extends CallGenerator {
     val resultCode =
       s"""
          |// --- Start code generated by ${className[IfCallGen]}
+         |${castedResultTerm1.getCode}
+         |${castedResultTerm2.getCode}
          |${operands.head.code}
          |$resultTerm = $resultDefault;
          |if (${operands.head.resultTerm}) {
          |  ${operands(1).code}
          |  if (!${operands(1).nullTerm}) {
-         |    $resultTerm = $castedResultTerm1;
+         |    $resultTerm = ${castedResultTerm1.getReturnTerm};
          |  }
          |  $nullTerm = ${operands(1).nullTerm};
          |} else {
          |  ${operands(2).code}
          |  if (!${operands(2).nullTerm}) {
-         |    $resultTerm = $castedResultTerm2;
+         |    $resultTerm = ${castedResultTerm2.getReturnTerm};
          |  }
          |  $nullTerm = ${operands(2).nullTerm};
          |}
@@ -80,13 +82,24 @@ class IfCallGen() extends CallGenerator {
    * or null if no casting can be performed
    */
   private def normalizeArgument(
-    ctx: CodeGeneratorContext, expr: GeneratedExpression, targetType: LogicalType): String = {
+      ctx: CodeGeneratorContext,
+      expr: GeneratedExpression,
+      targetType: LogicalType): CastCodeBlock = {
+
       val rule = CastRuleProvider.resolve(expr.resultType, targetType)
       rule match {
         case codeGeneratorCastRule: ExpressionCodeGeneratorCastRule[_, _] =>
-          codeGeneratorCastRule.generateExpression(
+          CastCodeBlock.withoutCode(codeGeneratorCastRule.generateExpression(
+            toCodegenCastContext(ctx),
+            expr.resultTerm,
+            expr.resultType,
+            targetType
+          ), expr.nullTerm)
+        case codeGeneratorCastRule: CodeGeneratorCastRule[_, _] =>
+          codeGeneratorCastRule.generateCodeBlock(
             toCodegenCastContext(ctx),
             expr.resultTerm,
+            expr.nullTerm,
             expr.resultType,
             targetType
           )
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
index f21a26d..ae5481b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.functions;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.types.AbstractDataType;
@@ -70,6 +71,7 @@ import static org.apache.flink.table.api.DataTypes.VARBINARY;
 import static org.apache.flink.table.api.DataTypes.VARCHAR;
 import static org.apache.flink.table.api.DataTypes.YEAR;
 import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.LegacyCastBehaviour;
 
 /** Tests for {@link BuiltInFunctionDefinitions#CAST}. */
 public class CastFunctionITCase extends BuiltInFunctionTestBase {
@@ -107,7 +109,11 @@ public class CastFunctionITCase extends BuiltInFunctionTestBase {
 
     @Override
     protected Configuration configuration() {
-        return super.configuration().set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId());
+        return super.configuration()
+                .set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId())
+                .set(
+                        ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR,
+                        LegacyCastBehaviour.DISABLED);
     }
 
     @Parameterized.Parameters(name = "{index}: {0}")
@@ -125,27 +131,22 @@ public class CastFunctionITCase extends BuiltInFunctionTestBase {
                 CastTestSpecBuilder.testCastTo(CHAR(3))
                         .fromCase(CHAR(5), null, null)
                         .fromCase(CHAR(3), "foo", "foo")
-                        .fromCase(CHAR(4), "foo", "foo ")
-                        .fromCase(CHAR(4), "foo ", "foo ")
                         .fromCase(VARCHAR(3), "foo", "foo")
                         .fromCase(VARCHAR(5), "foo", "foo")
-                        .fromCase(VARCHAR(5), "foo ", "foo ")
-                        // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
-                        // in this case down to 3 chars
-                        .fromCase(STRING(), "abcdef", "abcdef") // "abc"
-                        .fromCase(DATE(), DEFAULT_DATE, "2021-09-24") // "202"
+                        .fromCase(STRING(), "abcdef", "abc")
+                        .fromCase(DATE(), DEFAULT_DATE, "202")
+                        .build(),
+                CastTestSpecBuilder.testCastTo(CHAR(5))
+                        .fromCase(CHAR(5), null, null)
+                        .fromCase(CHAR(3), "foo", "foo  ")
                         .build(),
                 CastTestSpecBuilder.testCastTo(VARCHAR(3))
                         .fromCase(VARCHAR(5), null, null)
                         .fromCase(CHAR(3), "foo", "foo")
-                        .fromCase(CHAR(4), "foo", "foo ")
-                        .fromCase(CHAR(4), "foo ", "foo ")
+                        .fromCase(CHAR(4), "foo", "foo")
                         .fromCase(VARCHAR(3), "foo", "foo")
                         .fromCase(VARCHAR(5), "foo", "foo")
-                        .fromCase(VARCHAR(5), "foo ", "foo ")
-                        // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
-                        // in this case down to 3 chars
-                        .fromCase(STRING(), "abcdef", "abcdef")
+                        .fromCase(STRING(), "abcdef", "abc")
                         .build(),
                 CastTestSpecBuilder.testCastTo(STRING())
                         .fromCase(STRING(), null, null)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRuleProviderTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRuleProviderTest.java
index 5ee289a..97778b9 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRuleProviderTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRuleProviderTest.java
@@ -20,13 +20,16 @@ package org.apache.flink.table.planner.functions.casting;
 
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
 
 import org.junit.jupiter.api.Test;
 
 import static org.apache.flink.table.api.DataTypes.BIGINT;
 import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class CastRuleProviderTest {
@@ -58,4 +61,20 @@ class CastRuleProviderTest {
         assertThat(CastRuleProvider.resolve(new ArrayType(INT), new ArrayType(DISTINCT_BIG_INT)))
                 .isSameAs(ArrayToArrayCastRule.INSTANCE);
     }
+
+    @Test
+    void testResolvePredefinedToString() {
+        assertThat(CastRuleProvider.resolve(INT, new VarCharType(10)))
+                .isSameAs(CharVarCharTrimPadCastRule.INSTANCE);
+        assertThat(CastRuleProvider.resolve(INT, new CharType(10)))
+                .isSameAs(CharVarCharTrimPadCastRule.INSTANCE);
+        assertThat(CastRuleProvider.resolve(INT, STRING_TYPE))
+                .isSameAs(NumericToStringCastRule.INSTANCE);
+    }
+
+    @Test
+    void testResolveConstructedToString() {
+        assertThat(CastRuleProvider.resolve(new ArrayType(INT), new VarCharType(10)))
+                .isSameAs(ArrayToStringCastRule.INSTANCE);
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index 99cbfb7..76cd6d9 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.functions.casting;
 
+import org.apache.flink.api.common.typeutils.base.LocalDateSerializer;
 import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.GenericArrayData;
@@ -88,6 +89,7 @@ import static org.apache.flink.table.api.DataTypes.VARCHAR;
 import static org.apache.flink.table.api.DataTypes.YEAR;
 import static org.apache.flink.table.data.DecimalData.fromBigDecimal;
 import static org.apache.flink.table.data.StringData.fromString;
+import static org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -101,6 +103,8 @@ class CastRulesTest {
 
     private static final CastRule.Context CET_CONTEXT =
             CastRule.Context.create(false, CET, Thread.currentThread().getContextClassLoader());
+    private static final CastRule.Context CET_CONTEXT_LEGACY =
+            CastRule.Context.create(true, CET, Thread.currentThread().getContextClassLoader());
 
     private static final byte DEFAULT_POSITIVE_TINY_INT = (byte) 5;
     private static final byte DEFAULT_NEGATIVE_TINY_INT = (byte) -5;
@@ -606,6 +610,334 @@ class CastRulesTest {
                                                     fromString("c")
                                                 })),
                                 fromString("(10, null, 12:34:56.123, [a, b, c])")),
+                CastTestSpecBuilder.testCastTo(CHAR(6))
+                        .fromCase(STRING(), null, EMPTY_UTF8, false)
+                        .fromCase(STRING(), null, EMPTY_UTF8, true)
+                        .fromCase(CHAR(6), fromString("Apache"), fromString("Apache"), false)
+                        .fromCase(CHAR(6), fromString("Apache"), fromString("Apache"), true)
+                        .fromCase(VARCHAR(5), fromString("Flink"), fromString("Flink "), false)
+                        .fromCase(VARCHAR(5), fromString("Flink"), fromString("Flink"), true)
+                        .fromCase(STRING(), fromString("foo"), fromString("foo   "), false)
+                        .fromCase(STRING(), fromString("foo"), fromString("foo"), true)
+                        .fromCase(BOOLEAN(), true, fromString("true  "), false)
+                        .fromCase(BOOLEAN(), true, fromString("true"), true)
+                        .fromCase(BOOLEAN(), false, fromString("false "), false)
+                        .fromCase(BOOLEAN(), false, fromString("false"), true)
+                        .fromCase(
+                                BINARY(3),
+                                new byte[] {0, 1, 2},
+                                fromString("\u0000\u0001\u0002   "),
+                                false)
+                        .fromCase(
+                                BINARY(3),
+                                new byte[] {0, 1, 2},
+                                fromString("\u0000\u0001\u0002"),
+                                true)
+                        .fromCase(
+                                VARBINARY(4),
+                                new byte[] {0, 1, 2, 3},
+                                fromString("\u0000\u0001\u0002\u0003  "),
+                                false)
+                        .fromCase(
+                                VARBINARY(4),
+                                new byte[] {0, 1, 2, 3},
+                                fromString("\u0000\u0001\u0002\u0003"),
+                                true)
+                        .fromCase(
+                                BYTES(),
+                                new byte[] {0, 1, 2, 3, 4},
+                                fromString("\u0000\u0001\u0002\u0003\u0004 "),
+                                false)
+                        .fromCase(
+                                BYTES(),
+                                new byte[] {0, 1, 2, 3, 4},
+                                fromString("\u0000\u0001\u0002\u0003\u0004"),
+                                true)
+                        .fromCase(TINYINT(), (byte) -125, fromString("-125  "), false)
+                        .fromCase(TINYINT(), (byte) -125, fromString("-125"), true)
+                        .fromCase(SMALLINT(), (short) 32767, fromString("32767 "), false)
+                        .fromCase(SMALLINT(), (short) 32767, fromString("32767"), true)
+                        .fromCase(INT(), -1234, fromString("-1234 "), false)
+                        .fromCase(INT(), -1234, fromString("-1234"), true)
+                        .fromCase(BIGINT(), 12345L, fromString("12345 "), false)
+                        .fromCase(BIGINT(), 12345L, fromString("12345"), true)
+                        .fromCase(FLOAT(), -1.23f, fromString("-1.23 "), false)
+                        .fromCase(FLOAT(), -1.23f, fromString("-1.23"), true)
+                        .fromCase(DOUBLE(), 123.4d, fromString("123.4 "), false)
+                        .fromCase(DOUBLE(), 123.4d, fromString("123.4"), true)
+                        .fromCase(INTERVAL(YEAR()), 84, fromString("+7-00 "), false)
+                        .fromCase(INTERVAL(YEAR()), 84, fromString("+7-00"), true)
+                        .fromCase(INTERVAL(MONTH()), 5, fromString("+0-05 "), false)
+                        .fromCase(INTERVAL(MONTH()), 5, fromString("+0-05"), true),
+                CastTestSpecBuilder.testCastTo(CHAR(12))
+                        .fromCase(
+                                ARRAY(INT()),
+                                new GenericArrayData(new int[] {-1, 2, 3}),
+                                fromString("[-1, 2, 3]  "),
+                                false)
+                        .fromCase(
+                                ARRAY(INT()),
+                                new GenericArrayData(new int[] {-1, 2, 3}),
+                                fromString("[-1, 2, 3]"),
+                                true)
+                        .fromCase(ARRAY(INT()).nullable(), null, EMPTY_UTF8, false)
+                        .fromCase(ARRAY(INT()).nullable(), null, EMPTY_UTF8, true)
+                        .fromCase(
+                                MAP(STRING(), INT()),
+                                mapData(entry(fromString("a"), 1), entry(fromString("b"), 8)),
+                                fromString("{a=1, b=8}  "),
+                                false)
+                        .fromCase(
+                                MAP(STRING(), INT()),
+                                mapData(entry(fromString("a"), 1), entry(fromString("b"), 8)),
+                                fromString("{a=1, b=8}"),
+                                true)
+                        .fromCase(
+                                MAP(STRING(), INTERVAL(MONTH())).nullable(), null, EMPTY_UTF8, true)
+                        .fromCase(
+                                MAP(STRING(), INTERVAL(MONTH())).nullable(),
+                                null,
+                                EMPTY_UTF8,
+                                false)
+                        .fromCase(
+                                MULTISET(STRING()),
+                                mapData(entry(fromString("a"), 1), entry(fromString("b"), 1)),
+                                fromString("{a=1, b=1}  "),
+                                false)
+                        .fromCase(
+                                MULTISET(STRING()),
+                                mapData(entry(fromString("a"), 1), entry(fromString("b"), 1)),
+                                fromString("{a=1, b=1}"),
+                                true)
+                        .fromCase(MULTISET(STRING()).nullable(), null, EMPTY_UTF8, false)
+                        .fromCase(MULTISET(STRING()), null, EMPTY_UTF8, true)
+                        .fromCase(
+                                ROW(FIELD("f0", INT()), FIELD("f1", STRING())),
+                                GenericRowData.of(123, fromString("foo")),
+                                fromString("(123, foo)  "),
+                                false)
+                        .fromCase(
+                                ROW(FIELD("f0", INT()), FIELD("f1", STRING())),
+                                GenericRowData.of(123, fromString("foo")),
+                                fromString("(123,foo)"),
+                                true)
+                        .fromCase(
+                                ROW(FIELD("f0", STRING()), FIELD("f1", STRING())).nullable(),
+                                null,
+                                EMPTY_UTF8,
+                                false)
+                        .fromCase(
+                                ROW(FIELD("f0", STRING()), FIELD("f1", STRING())).nullable(),
+                                null,
+                                EMPTY_UTF8,
+                                true)
+                        .fromCase(
+                                RAW(LocalDate.class, new LocalDateSerializer()),
+                                RawValueData.fromObject(LocalDate.parse("2020-12-09")),
+                                fromString("2020-12-09  "),
+                                false)
+                        .fromCase(
+                                RAW(LocalDate.class, new LocalDateSerializer()),
+                                RawValueData.fromObject(LocalDate.parse("2020-12-09")),
+                                fromString("2020-12-09"),
+                                true)
+                        .fromCase(
+                                RAW(LocalDateTime.class, new LocalDateTimeSerializer()).nullable(),
+                                null,
+                                EMPTY_UTF8,
+                                false)
+                        .fromCase(
+                                RAW(LocalDateTime.class, new LocalDateTimeSerializer()).nullable(),
+                                null,
+                                EMPTY_UTF8,
+                                true),
+                CastTestSpecBuilder.testCastTo(VARCHAR(3))
+                        .fromCase(STRING(), null, EMPTY_UTF8, false)
+                        .fromCase(STRING(), null, EMPTY_UTF8, true)
+                        .fromCase(CHAR(6), fromString("Apache"), fromString("Apa"), false)
+                        .fromCase(CHAR(6), fromString("Apache"), fromString("Apache"), true)
+                        .fromCase(VARCHAR(5), fromString("Flink"), fromString("Fli"), false)
+                        .fromCase(VARCHAR(5), fromString("Flink"), fromString("Flink"), true)
+                        .fromCase(STRING(), fromString("Apache Flink"), fromString("Apa"), false)
+                        .fromCase(
+                                STRING(),
+                                fromString("Apache Flink"),
+                                fromString("Apache Flink"),
+                                true)
+                        .fromCase(BOOLEAN(), true, fromString("tru"), false)
+                        .fromCase(BOOLEAN(), true, fromString("true"), true)
+                        .fromCase(BOOLEAN(), false, fromString("fal"), false)
+                        .fromCase(BOOLEAN(), false, fromString("false"), true)
+                        .fromCase(
+                                BINARY(5),
+                                new byte[] {0, 1, 2, 3, 4},
+                                fromString("\u0000\u0001\u0002"),
+                                false)
+                        .fromCase(
+                                BINARY(5),
+                                new byte[] {0, 1, 2, 3, 4},
+                                fromString("\u0000\u0001\u0002\u0003\u0004"),
+                                true)
+                        .fromCase(
+                                VARBINARY(5),
+                                new byte[] {0, 1, 2, 3, 4},
+                                fromString("\u0000\u0001\u0002"),
+                                false)
+                        .fromCase(
+                                VARBINARY(5),
+                                new byte[] {0, 1, 2, 3, 4},
+                                fromString("\u0000\u0001\u0002\u0003\u0004"),
+                                true)
+                        .fromCase(
+                                BYTES(),
+                                new byte[] {0, 1, 2, 3, 4},
+                                fromString("\u0000\u0001\u0002"),
+                                false)
+                        .fromCase(
+                                BYTES(),
+                                new byte[] {0, 1, 2, 3, 4},
+                                fromString("\u0000\u0001\u0002\u0003\u0004"),
+                                true)
+                        .fromCase(
+                                DECIMAL(4, 3),
+                                fromBigDecimal(new BigDecimal("9.8765"), 5, 4),
+                                fromString("9.8"),
+                                false)
+                        .fromCase(
+                                DECIMAL(4, 3),
+                                fromBigDecimal(new BigDecimal("9.8765"), 5, 4),
+                                fromString("9.8765"),
+                                true)
+                        .fromCase(TINYINT(), (byte) -125, fromString("-12"), false)
+                        .fromCase(TINYINT(), (byte) -125, fromString("-125"), true)
+                        .fromCase(SMALLINT(), (short) 32767, fromString("327"), false)
+                        .fromCase(SMALLINT(), (short) 32767, fromString("32767"), true)
+                        .fromCase(INT(), -12345678, fromString("-12"), false)
+                        .fromCase(INT(), -12345678, fromString("-12345678"), true)
+                        .fromCase(BIGINT(), 1234567891234L, fromString("123"), false)
+                        .fromCase(BIGINT(), 1234567891234L, fromString("1234567891234"), true)
+                        .fromCase(FLOAT(), -123.456f, fromString("-12"), false)
+                        .fromCase(FLOAT(), -123.456f, fromString("-123.456"), true)
+                        .fromCase(DOUBLE(), 12345.678901d, fromString("123"), false)
+                        .fromCase(DOUBLE(), 12345.678901d, fromString("12345.678901"), true)
+                        .fromCase(FLOAT(), Float.MAX_VALUE, fromString("3.4"), false)
+                        .fromCase(
+                                FLOAT(),
+                                Float.MAX_VALUE,
+                                fromString(String.valueOf(Float.MAX_VALUE)),
+                                true)
+                        .fromCase(DOUBLE(), Double.MAX_VALUE, fromString("1.7"), false)
+                        .fromCase(
+                                DOUBLE(),
+                                Double.MAX_VALUE,
+                                fromString(String.valueOf(Double.MAX_VALUE)),
+                                true)
+                        .fromCase(TIMESTAMP(), TIMESTAMP, fromString("202"), false)
+                        .fromCase(TIMESTAMP(), TIMESTAMP, TIMESTAMP_STRING, true)
+                        .fromCase(TIMESTAMP_LTZ(), CET_CONTEXT, TIMESTAMP, fromString("202"))
+                        .fromCase(
+                                TIMESTAMP_LTZ(),
+                                CET_CONTEXT_LEGACY,
+                                TIMESTAMP,
+                                TIMESTAMP_STRING_CET)
+                        .fromCase(DATE(), DATE, fromString("202"), false)
+                        .fromCase(DATE(), DATE, DATE_STRING, true)
+                        .fromCase(TIME(5), TIME, fromString("12:"), false)
+                        .fromCase(TIME(5), TIME, TIME_STRING, true)
+                        .fromCase(INTERVAL(YEAR()), 84, fromString("+7-"), false)
+                        .fromCase(INTERVAL(YEAR()), 84, fromString("+7-00"), true)
+                        .fromCase(INTERVAL(MONTH()), 5, fromString("+0-"), false)
+                        .fromCase(INTERVAL(MONTH()), 5, fromString("+0-05"), true)
+                        .fromCase(INTERVAL(DAY()), 10L, fromString("+0 "), false)
+                        .fromCase(INTERVAL(DAY()), 10L, fromString("+0 00:00:00.010"), true)
+                        .fromCase(
+                                ARRAY(INT()),
+                                new GenericArrayData(new int[] {-123, 456}),
+                                fromString("[-1"),
+                                false)
+                        .fromCase(
+                                ARRAY(INT()),
+                                new GenericArrayData(new int[] {-123, 456}),
+                                fromString("[-123, 456]"),
+                                true)
+                        .fromCase(ARRAY(INT()).nullable(), null, EMPTY_UTF8, false)
+                        .fromCase(ARRAY(INT()).nullable(), null, EMPTY_UTF8, true)
+                        .fromCase(
+                                MAP(STRING(), INTERVAL(MONTH())),
+                                mapData(entry(fromString("a"), -123), entry(fromString("b"), 123)),
+                                fromString("{a="),
+                                false)
+                        .fromCase(
+                                MAP(STRING(), INTERVAL(MONTH())),
+                                mapData(entry(fromString("a"), -123), entry(fromString("b"), 123)),
+                                fromString("{a=-10-03, b=+10-03}"),
+                                true)
+                        .fromCase(
+                                MAP(STRING(), INTERVAL(MONTH())).nullable(),
+                                null,
+                                EMPTY_UTF8,
+                                false)
+                        .fromCase(
+                                MAP(STRING(), INTERVAL(MONTH())).nullable(), null, EMPTY_UTF8, true)
+                        .fromCase(
+                                MAP(STRING(), INTERVAL(MONTH())).nullable(),
+                                null,
+                                EMPTY_UTF8,
+                                false)
+                        .fromCase(
+                                MULTISET(STRING()),
+                                mapData(entry(fromString("a"), 1), entry(fromString("b"), 1)),
+                                fromString("{a="),
+                                false)
+                        .fromCase(
+                                MULTISET(STRING()),
+                                mapData(entry(fromString("a"), 1), entry(fromString("b"), 1)),
+                                fromString("{a=1, b=1}"),
+                                true)
+                        .fromCase(MULTISET(STRING()).nullable(), null, EMPTY_UTF8, false)
+                        .fromCase(MULTISET(STRING()), null, EMPTY_UTF8, true)
+                        .fromCase(
+                                ROW(FIELD("f0", INT()), FIELD("f1", STRING())),
+                                GenericRowData.of(123, fromString("abc")),
+                                fromString("(12"),
+                                false)
+                        .fromCase(
+                                ROW(FIELD("f0", INT()), FIELD("f1", STRING())),
+                                GenericRowData.of(123, fromString("abc")),
+                                fromString("(123,abc)"),
+                                true)
+                        .fromCase(
+                                ROW(FIELD("f0", STRING()), FIELD("f1", STRING())).nullable(),
+                                null,
+                                EMPTY_UTF8,
+                                false)
+                        .fromCase(
+                                ROW(FIELD("f0", STRING()), FIELD("f1", STRING())).nullable(),
+                                null,
+                                EMPTY_UTF8,
+                                true)
+                        .fromCase(
+                                RAW(LocalDateTime.class, new LocalDateTimeSerializer()),
+                                RawValueData.fromObject(
+                                        LocalDateTime.parse("2020-11-11T18:08:01.123")),
+                                fromString("202"),
+                                false)
+                        .fromCase(
+                                RAW(LocalDateTime.class, new LocalDateTimeSerializer()),
+                                RawValueData.fromObject(
+                                        LocalDateTime.parse("2020-11-11T18:08:01.123")),
+                                fromString("2020-11-11T18:08:01.123"),
+                                true)
+                        .fromCase(
+                                RAW(LocalDateTime.class, new LocalDateTimeSerializer()).nullable(),
+                                null,
+                                EMPTY_UTF8,
+                                false)
+                        .fromCase(
+                                RAW(LocalDateTime.class, new LocalDateTimeSerializer()).nullable(),
+                                null,
+                                EMPTY_UTF8,
+                                true),
                 CastTestSpecBuilder.testCastTo(BOOLEAN())
                         .fromCase(BOOLEAN(), null, null)
                         .fail(CHAR(3), fromString("foo"), TableException.class)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 396ebca..410b5ed 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -4303,10 +4303,10 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     val url = "CAST('http://user:pass@host' AS VARCHAR(50))"
     val base64 = "CAST('aGVsbG8gd29ybGQ=' AS VARCHAR(20))"
 
-    testSqlApi(s"IFNULL(SUBSTR($str1, 2, 3), $str2)", "ell")
-    testSqlApi(s"IFNULL(SUBSTRING($str1, 2, 3), $str2)", "ell")
-    testSqlApi(s"IFNULL(LEFT($str1, 3), $str2)", "Hel")
-    testSqlApi(s"IFNULL(RIGHT($str1, 3), $str2)", "llo")
+    testSqlApi(s"IFNULL(SUBSTR($str1, 2, 3), $str2)", "el")
+    testSqlApi(s"IFNULL(SUBSTRING($str1, 2, 3), $str2)", "el")
+    testSqlApi(s"IFNULL(LEFT($str1, 3), $str2)", "He")
+    testSqlApi(s"IFNULL(RIGHT($str1, 3), $str2)", "ll")
     testSqlApi(s"IFNULL(REGEXP_EXTRACT($str1, 'H(.+?)l(.+?)$$', 2), $str2)", "lo")
     testSqlApi(s"IFNULL(REGEXP_REPLACE($str1, 'e.l', 'EXL'), $str2)", "HEXLo")
     testSqlApi(s"IFNULL(UPPER($str1), $str2)", "HELLO")
@@ -4316,9 +4316,9 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     testSqlApi(s"IFNULL(LPAD($str1, 7, $str3), $str2)", "heHello")
     testSqlApi(s"IFNULL(RPAD($str1, 7, $str3), $str2)", "Hellohe")
     testSqlApi(s"IFNULL(REPEAT($str1, 2), $str2)", "HelloHello")
-    testSqlApi(s"IFNULL(REVERSE($str1), $str2)", "olleH")
+    testSqlApi(s"IFNULL(REVERSE($str1), $str2)", "ol")
     testSqlApi(s"IFNULL(REPLACE($str3, ' ', '_'), $str2)", "hello_world")
-    testSqlApi(s"IFNULL(SPLIT_INDEX($str3, ' ', 1), $str2)", "world")
+    testSqlApi(s"IFNULL(SPLIT_INDEX($str3, ' ', 1), $str2)", "wo")
     testSqlApi(s"IFNULL(MD5($str1), $str2)", "8b1a9953c4611296a827abf8c47804d7")
     testSqlApi(s"IFNULL(SHA1($str1), $str2)", "f7ff9e8b7bb2e09b70935a5d785e0cc5d9d0abf0")
     testSqlApi(
@@ -4338,7 +4338,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     testSqlApi(
       s"IFNULL(SHA2($str1, 256), $str2)",
       "185f8db32271fe25f561a6fc938b2e264306ec304eda518007d1764826381969")
-    testSqlApi(s"IFNULL(PARSE_URL($url, 'HOST'), $str2)", "host")
+    testSqlApi(s"IFNULL(PARSE_URL($url, 'HOST'), $str2)", "ho")
     testSqlApi(s"IFNULL(FROM_BASE64($base64), $str2)", "hello world")
     testSqlApi(s"IFNULL(TO_BASE64($str3), $str2)", "aGVsbG8gd29ybGQ=")
     testSqlApi(s"IFNULL(CHR(65), $str2)", "A")
@@ -4350,7 +4350,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     testSqlApi(s"IFNULL(RTRIM($str4), $str2)", " hello")
     testSqlApi(s"IFNULL($str1 || $str2, $str2)", "HelloHi")
     testSqlApi(s"IFNULL(SUBSTRING(UUID(), 9, 1), $str2)", "-")
-    testSqlApi(s"IFNULL(DECODE(ENCODE($str1, 'utf-8'), 'utf-8'), $str2)", "Hello")
+    testSqlApi(s"IFNULL(DECODE(ENCODE($str1, 'utf-8'), 'utf-8'), $str2)", "He")
 
     testSqlApi(s"IFNULL(CAST(DATE '2021-04-06' AS VARCHAR(10)), $str2)", "2021-04-06")
     testSqlApi(s"IFNULL(CAST(TIME '11:05:30' AS VARCHAR(8)), $str2)", "11:05:30")

[flink] 03/04: [hotfix][table-planner][tests] Minor fixes to remove IDE warnings.

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b0b68f1176888137c615902e0aa5dadf13c75ef9
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Fri Dec 10 09:49:40 2021 +0100

    [hotfix][table-planner][tests] Minor fixes to remove IDE warnings.
---
 .../planner/functions/CastFunctionITCase.java      | 61 ++++++++++------------
 1 file changed, 27 insertions(+), 34 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
index ae5481b..eceea3c 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
@@ -549,19 +549,19 @@ public class CastFunctionITCase extends BuiltInFunctionTestBase {
                         .fromCase(
                                 TINYINT(),
                                 DEFAULT_POSITIVE_TINY_INT,
-                                Integer.valueOf(DEFAULT_POSITIVE_TINY_INT))
+                                (int) DEFAULT_POSITIVE_TINY_INT)
                         .fromCase(
                                 TINYINT(),
                                 DEFAULT_NEGATIVE_TINY_INT,
-                                Integer.valueOf(DEFAULT_NEGATIVE_TINY_INT))
+                                (int) DEFAULT_NEGATIVE_TINY_INT)
                         .fromCase(
                                 SMALLINT(),
                                 DEFAULT_POSITIVE_SMALL_INT,
-                                Integer.valueOf(DEFAULT_POSITIVE_SMALL_INT))
+                                (int) DEFAULT_POSITIVE_SMALL_INT)
                         .fromCase(
                                 SMALLINT(),
                                 DEFAULT_NEGATIVE_SMALL_INT,
-                                Integer.valueOf(DEFAULT_NEGATIVE_SMALL_INT))
+                                (int) DEFAULT_NEGATIVE_SMALL_INT)
                         .fromCase(INT(), DEFAULT_POSITIVE_INT, DEFAULT_POSITIVE_INT)
                         .fromCase(INT(), DEFAULT_NEGATIVE_INT, DEFAULT_NEGATIVE_INT)
                         .fromCase(BIGINT(), 123, 123)
@@ -607,21 +607,21 @@ public class CastFunctionITCase extends BuiltInFunctionTestBase {
                         .fromCase(
                                 TINYINT(),
                                 DEFAULT_POSITIVE_TINY_INT,
-                                Long.valueOf(DEFAULT_POSITIVE_TINY_INT))
+                                (long) DEFAULT_POSITIVE_TINY_INT)
                         .fromCase(
                                 TINYINT(),
                                 DEFAULT_NEGATIVE_TINY_INT,
-                                Long.valueOf(DEFAULT_NEGATIVE_TINY_INT))
+                                (long) DEFAULT_NEGATIVE_TINY_INT)
                         .fromCase(
                                 SMALLINT(),
                                 DEFAULT_POSITIVE_SMALL_INT,
-                                Long.valueOf(DEFAULT_POSITIVE_SMALL_INT))
+                                (long) DEFAULT_POSITIVE_SMALL_INT)
                         .fromCase(
                                 SMALLINT(),
                                 DEFAULT_NEGATIVE_SMALL_INT,
-                                Long.valueOf(DEFAULT_NEGATIVE_SMALL_INT))
-                        .fromCase(INT(), DEFAULT_POSITIVE_INT, Long.valueOf(DEFAULT_POSITIVE_INT))
-                        .fromCase(INT(), DEFAULT_NEGATIVE_INT, Long.valueOf(DEFAULT_NEGATIVE_INT))
+                                (long) DEFAULT_NEGATIVE_SMALL_INT)
+                        .fromCase(INT(), DEFAULT_POSITIVE_INT, (long) DEFAULT_POSITIVE_INT)
+                        .fromCase(INT(), DEFAULT_NEGATIVE_INT, (long) DEFAULT_NEGATIVE_INT)
                         .fromCase(BIGINT(), DEFAULT_POSITIVE_BIGINT, DEFAULT_POSITIVE_BIGINT)
                         .fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT, DEFAULT_NEGATIVE_BIGINT)
                         .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123L)
@@ -667,29 +667,25 @@ public class CastFunctionITCase extends BuiltInFunctionTestBase {
                         .fromCase(
                                 TINYINT(),
                                 DEFAULT_POSITIVE_TINY_INT,
-                                Float.valueOf(DEFAULT_POSITIVE_TINY_INT))
+                                (float) DEFAULT_POSITIVE_TINY_INT)
                         .fromCase(
                                 TINYINT(),
                                 DEFAULT_NEGATIVE_TINY_INT,
-                                Float.valueOf(DEFAULT_NEGATIVE_TINY_INT))
+                                (float) DEFAULT_NEGATIVE_TINY_INT)
                         .fromCase(
                                 SMALLINT(),
                                 DEFAULT_POSITIVE_SMALL_INT,
-                                Float.valueOf(DEFAULT_POSITIVE_SMALL_INT))
+                                (float) DEFAULT_POSITIVE_SMALL_INT)
                         .fromCase(
                                 SMALLINT(),
                                 DEFAULT_NEGATIVE_SMALL_INT,
-                                Float.valueOf(DEFAULT_NEGATIVE_SMALL_INT))
-                        .fromCase(INT(), DEFAULT_POSITIVE_INT, Float.valueOf(DEFAULT_POSITIVE_INT))
-                        .fromCase(INT(), DEFAULT_NEGATIVE_INT, Float.valueOf(DEFAULT_NEGATIVE_INT))
+                                (float) DEFAULT_NEGATIVE_SMALL_INT)
+                        .fromCase(INT(), DEFAULT_POSITIVE_INT, (float) DEFAULT_POSITIVE_INT)
+                        .fromCase(INT(), DEFAULT_NEGATIVE_INT, (float) DEFAULT_NEGATIVE_INT)
                         .fromCase(
-                                BIGINT(),
-                                DEFAULT_POSITIVE_BIGINT,
-                                Float.valueOf(DEFAULT_POSITIVE_BIGINT))
+                                BIGINT(), DEFAULT_POSITIVE_BIGINT, (float) DEFAULT_POSITIVE_BIGINT)
                         .fromCase(
-                                BIGINT(),
-                                DEFAULT_NEGATIVE_BIGINT,
-                                Float.valueOf(DEFAULT_NEGATIVE_BIGINT))
+                                BIGINT(), DEFAULT_NEGATIVE_BIGINT, (float) DEFAULT_NEGATIVE_BIGINT)
                         .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, DEFAULT_POSITIVE_FLOAT)
                         .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, DEFAULT_NEGATIVE_FLOAT)
                         .fromCase(FLOAT(), 9234567891.12, 9234567891.12f)
@@ -734,29 +730,25 @@ public class CastFunctionITCase extends BuiltInFunctionTestBase {
                         .fromCase(
                                 TINYINT(),
                                 DEFAULT_POSITIVE_TINY_INT,
-                                Double.valueOf(DEFAULT_POSITIVE_TINY_INT))
+                                (double) DEFAULT_POSITIVE_TINY_INT)
                         .fromCase(
                                 TINYINT(),
                                 DEFAULT_NEGATIVE_TINY_INT,
-                                Double.valueOf(DEFAULT_NEGATIVE_TINY_INT))
+                                (double) DEFAULT_NEGATIVE_TINY_INT)
                         .fromCase(
                                 SMALLINT(),
                                 DEFAULT_POSITIVE_SMALL_INT,
-                                Double.valueOf(DEFAULT_POSITIVE_SMALL_INT))
+                                (double) DEFAULT_POSITIVE_SMALL_INT)
                         .fromCase(
                                 SMALLINT(),
                                 DEFAULT_NEGATIVE_SMALL_INT,
-                                Double.valueOf(DEFAULT_NEGATIVE_SMALL_INT))
-                        .fromCase(INT(), DEFAULT_POSITIVE_INT, Double.valueOf(DEFAULT_POSITIVE_INT))
-                        .fromCase(INT(), DEFAULT_NEGATIVE_INT, Double.valueOf(DEFAULT_NEGATIVE_INT))
+                                (double) DEFAULT_NEGATIVE_SMALL_INT)
+                        .fromCase(INT(), DEFAULT_POSITIVE_INT, (double) DEFAULT_POSITIVE_INT)
+                        .fromCase(INT(), DEFAULT_NEGATIVE_INT, (double) DEFAULT_NEGATIVE_INT)
                         .fromCase(
-                                BIGINT(),
-                                DEFAULT_POSITIVE_BIGINT,
-                                Double.valueOf(DEFAULT_POSITIVE_BIGINT))
+                                BIGINT(), DEFAULT_POSITIVE_BIGINT, (double) DEFAULT_POSITIVE_BIGINT)
                         .fromCase(
-                                BIGINT(),
-                                DEFAULT_NEGATIVE_BIGINT,
-                                Double.valueOf(DEFAULT_NEGATIVE_BIGINT))
+                                BIGINT(), DEFAULT_NEGATIVE_BIGINT, (double) DEFAULT_NEGATIVE_BIGINT)
                         .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123.456d)
                         .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123.456)
                         .fromCase(FLOAT(), 9234567891.12, 9234567891.12d)
@@ -1078,6 +1070,7 @@ public class CastFunctionITCase extends BuiltInFunctionTestBase {
                         .build());
     }
 
+    @SuppressWarnings("NumericOverflow")
     public static List<TestSpec> numericBounds() {
         return Arrays.asList(
                 CastTestSpecBuilder.testCastTo(TINYINT())

[flink] 02/04: [hotfix][table] Make use of VarCharType.STRING_TYPE

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b6ca017e095de4b055e58af195e1f0a00e312d6e
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Thu Dec 9 13:50:10 2021 +0100

    [hotfix][table] Make use of VarCharType.STRING_TYPE
    
    Replace occurrences of `new VarCharType(VarCharType.MAX_LENGTH)` with the new
    constant `VarCharType.STRING_TYPE`.
---
 .../flink/table/types/logical/utils/LogicalTypeParser.java |  2 +-
 .../java/org/apache/flink/table/types/DataTypesTest.java   |  2 +-
 .../apache/flink/table/types/LogicalCommonTypeTest.java    |  4 ++--
 .../apache/flink/table/types/LogicalTypeParserTest.java    |  2 +-
 .../table/types/extraction/DataTypeExtractorTest.java      |  9 +++------
 .../flink/table/planner/plan/type/FlinkReturnTypes.java    |  4 ++--
 .../flink/table/planner/codegen/calls/StringCallGen.scala  |  2 +-
 .../flink/table/planner/codegen/SortCodeGeneratorTest.java |  2 +-
 .../org/apache/flink/table/api/batch/ExplainTest.scala     |  2 +-
 .../org/apache/flink/table/api/stream/ExplainTest.scala    |  2 +-
 .../flink/table/planner/calcite/FlinkTypeFactoryTest.scala |  6 +++---
 .../flink/table/planner/codegen/agg/AggTestBase.scala      |  4 ++--
 .../table/planner/codegen/agg/batch/BatchAggTestBase.scala |  2 +-
 .../codegen/agg/batch/HashAggCodeGeneratorTest.scala       |  2 +-
 .../codegen/agg/batch/SortAggCodeGeneratorTest.scala       |  4 ++--
 .../planner/expressions/utils/ExpressionTestBase.scala     |  2 +-
 .../table/planner/plan/batch/sql/DagOptimizationTest.scala |  2 +-
 .../table/planner/plan/metadata/MetadataTestUtil.scala     |  6 +++---
 .../planner/plan/stream/sql/DagOptimizationTest.scala      |  2 +-
 .../table/planner/plan/stream/sql/LegacySinkTest.scala     |  2 +-
 .../plan/stream/sql/MiniBatchIntervalInferTest.scala       |  2 +-
 .../runtime/batch/sql/PartitionableSinkITCase.scala        |  2 +-
 .../table/planner/runtime/batch/sql/UnionITCase.scala      |  2 +-
 .../table/planner/runtime/stream/sql/CalcITCase.scala      |  4 ++--
 .../org/apache/flink/table/data/BinaryArrayDataTest.java   |  3 +--
 .../org/apache/flink/table/data/BinaryRowDataTest.java     |  3 +--
 .../apache/flink/table/data/DataFormatConvertersTest.java  |  4 ++--
 .../aggregate/window/SlicingWindowAggOperatorTest.java     |  3 +--
 .../deduplicate/ProcTimeDeduplicateFunctionTestBase.java   |  3 +--
 .../deduplicate/RowTimeDeduplicateFunctionTestBase.java    |  3 +--
 .../window/RowTimeWindowDeduplicateOperatorTest.java       |  3 +--
 .../operators/join/RandomSortMergeInnerJoinTest.java       |  6 +++---
 .../operators/join/String2HashJoinOperatorTest.java        | 14 ++++++--------
 .../operators/join/String2SortMergeJoinOperatorTest.java   | 12 +++++-------
 .../join/interval/TimeIntervalStreamJoinTestBase.java      |  6 +++---
 .../join/temporal/TemporalProcessTimeJoinOperatorTest.java |  6 +++---
 .../join/temporal/TemporalTimeJoinOperatorTestBase.java    | 12 +++++-------
 .../operators/join/window/WindowJoinOperatorTest.java      |  6 +++---
 .../over/ProcTimeRangeBoundedPrecedingFunctionTest.java    |  2 +-
 .../runtime/operators/over/RowTimeOverWindowTestBase.java  |  4 +---
 .../table/runtime/operators/rank/TopNFunctionTestBase.java |  8 ++------
 .../operators/rank/window/WindowRankOperatorTest.java      |  5 ++---
 .../runtime/operators/sort/ProcTimeSortOperatorTest.java   |  5 +----
 .../runtime/operators/sort/RowTimeSortOperatorTest.java    | 10 ++--------
 .../runtime/operators/sort/StreamSortOperatorTest.java     |  2 +-
 .../operators/window/WindowOperatorContractTest.java       |  3 +--
 .../table/runtime/operators/window/WindowOperatorTest.java | 11 ++++-------
 .../table/runtime/types/DataTypePrecisionFixerTest.java    |  2 +-
 .../table/runtime/typeutils/RowDataSerializerTest.java     |  6 +++---
 .../util/collections/binary/BytesHashMapTestBase.java      |  2 +-
 .../util/collections/binary/BytesMultiMapTestBase.java     |  4 ++--
 51 files changed, 93 insertions(+), 128 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
index 1c69d77..15b5daa 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
@@ -528,7 +528,7 @@ public final class LogicalTypeParser {
                 case VARCHAR:
                     return parseVarCharType();
                 case STRING:
-                    return new VarCharType(VarCharType.MAX_LENGTH);
+                    return VarCharType.STRING_TYPE;
                 case BOOLEAN:
                     return new BooleanType();
                 case BINARY:
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
index 9d5288e..a658285 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
@@ -118,7 +118,7 @@ public class DataTypesTest {
                         .expectLogicalType(new VarCharType(2))
                         .expectConversionClass(String.class),
                 TestSpec.forDataType(STRING())
-                        .expectLogicalType(new VarCharType(VarCharType.MAX_LENGTH))
+                        .expectLogicalType(VarCharType.STRING_TYPE)
                         .expectConversionClass(String.class),
                 TestSpec.forDataType(BOOLEAN())
                         .expectLogicalType(new BooleanType())
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java
index 29745ec..ad76ea6 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java
@@ -141,8 +141,8 @@ public class LogicalCommonTypeTest {
 
                     // VARCHAR types of different length
                     {
-                        Arrays.asList(new VarCharType(2), new VarCharType(VarCharType.MAX_LENGTH)),
-                        new VarCharType(VarCharType.MAX_LENGTH)
+                        Arrays.asList(new VarCharType(2), VarCharType.STRING_TYPE),
+                        VarCharType.STRING_TYPE
                     },
 
                     // mixed VARCHAR and CHAR types
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
index 1a58475..88758f2 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
@@ -91,7 +91,7 @@ public class LogicalTypeParserTest {
                 TestSpec.forString("CHAR(33)").expectType(new CharType(33)),
                 TestSpec.forString("VARCHAR").expectType(new VarCharType()),
                 TestSpec.forString("VARCHAR(33)").expectType(new VarCharType(33)),
-                TestSpec.forString("STRING").expectType(new VarCharType(VarCharType.MAX_LENGTH)),
+                TestSpec.forString("STRING").expectType(VarCharType.STRING_TYPE),
                 TestSpec.forString("BOOLEAN").expectType(new BooleanType()),
                 TestSpec.forString("BINARY").expectType(new BinaryType()),
                 TestSpec.forString("BINARY(33)").expectType(new BinaryType(33)),
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
index f023a50..dd58c82 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
@@ -619,8 +619,7 @@ public class DataTypeExtractorTest {
                         new StructuredAttribute("intField", new IntType(true)),
                         new StructuredAttribute("primitiveBooleanField", new BooleanType(false)),
                         new StructuredAttribute("primitiveIntField", new IntType(false)),
-                        new StructuredAttribute(
-                                "stringField", new VarCharType(VarCharType.MAX_LENGTH))));
+                        new StructuredAttribute("stringField", VarCharType.STRING_TYPE)));
         builder.setFinal(true);
         builder.setInstantiable(true);
         final StructuredType structuredType = builder.build();
@@ -641,9 +640,7 @@ public class DataTypeExtractorTest {
         builder.attributes(
                 Arrays.asList(
                         new StructuredAttribute(
-                                "mapField",
-                                new MapType(
-                                        new VarCharType(VarCharType.MAX_LENGTH), new IntType())),
+                                "mapField", new MapType(VarCharType.STRING_TYPE, new IntType())),
                         new StructuredAttribute(
                                 "simplePojoField",
                                 getSimplePojoDataType(simplePojoClass).getLogicalType()),
@@ -700,7 +697,7 @@ public class DataTypeExtractorTest {
         final StructuredType.Builder builder = StructuredType.newBuilder(Tuple2.class);
         builder.attributes(
                 Arrays.asList(
-                        new StructuredAttribute("f0", new VarCharType(VarCharType.MAX_LENGTH)),
+                        new StructuredAttribute("f0", VarCharType.STRING_TYPE),
                         new StructuredAttribute("f1", new BooleanType())));
         builder.setFinal(true);
         builder.setInstantiable(true);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java
index b495a98..1ff51b4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java
@@ -127,6 +127,6 @@ public class FlinkReturnTypes {
                             ((FlinkTypeFactory) factory)
                                     .createFieldTypeFromLogicalType(
                                             new MapType(
-                                                    new VarCharType(VarCharType.MAX_LENGTH),
-                                                    new VarCharType(VarCharType.MAX_LENGTH))));
+                                                    VarCharType.STRING_TYPE,
+                                                    VarCharType.STRING_TYPE)));
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index b605ddc..7619486 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -765,7 +765,7 @@ object StringCallGen {
       operands: Seq[GeneratedExpression]): GeneratedExpression = {
     val className = classOf[SqlFunctionUtils].getCanonicalName
     val t = new MapType(
-      new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH))
+      VarCharType.STRING_TYPE, VarCharType.STRING_TYPE)
     val converter = DataFormatConverters.getConverterForDataType(
       DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
     val converterTerm = ctx.addReusableObject(converter, "mapConverter")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
index 8a26c3e..931b0ec 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
@@ -105,7 +105,7 @@ public class SortCodeGeneratorTest {
                 new BigIntType(),
                 new FloatType(),
                 new DoubleType(),
-                new VarCharType(VarCharType.MAX_LENGTH),
+                VarCharType.STRING_TYPE,
                 new DecimalType(18, 2),
                 new DecimalType(38, 18),
                 new VarBinaryType(VarBinaryType.MAX_LENGTH),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index cd20033..8314f67 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -43,7 +43,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
   util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
   util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
 
-  val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+  val STRING = VarCharType.STRING_TYPE
   val LONG = new BigIntType()
   val INT = new IntType()
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index cadbaa5..a66db23 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -46,7 +46,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
   util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
   util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
 
-  val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+  val STRING = VarCharType.STRING_TYPE
   val LONG = new BigIntType()
   val INT = new IntType()
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala
index d8be0ea..d6c8c7e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala
@@ -69,7 +69,7 @@ class FlinkTypeFactoryTest {
     test(new NullType())
     test(new BooleanType())
     test(new TinyIntType())
-    test(new VarCharType(VarCharType.MAX_LENGTH))
+    test(VarCharType.STRING_TYPE)
     test(new DoubleType())
     test(new FloatType())
     test(new IntType())
@@ -82,8 +82,8 @@ class FlinkTypeFactoryTest {
     test(new LocalZonedTimestampType(3))
 
     test(new ArrayType(new DoubleType()))
-    test(new MapType(new DoubleType(), new VarCharType(VarCharType.MAX_LENGTH)))
-    test(RowType.of(new DoubleType(), new VarCharType(VarCharType.MAX_LENGTH)))
+    test(new MapType(new DoubleType(), VarCharType.STRING_TYPE))
+    test(RowType.of(new DoubleType(), VarCharType.STRING_TYPE))
     test(new RawType[DayOfWeek](
       classOf[DayOfWeek],
       new KryoSerializer[DayOfWeek](classOf[DayOfWeek], new ExecutionConfig)))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
index cd4d0d5..3fac689 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
@@ -57,8 +57,8 @@ abstract class AggTestBase(isBatchMode: Boolean) {
   private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
   val inputNames = Array("f0", "f1", "f2", "f3", "f4")
   val inputTypes: Array[LogicalType] = Array(
-    new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new DoubleType(), new BigIntType(),
-    new VarCharType(VarCharType.MAX_LENGTH))
+    VarCharType.STRING_TYPE, new BigIntType(), new DoubleType(), new BigIntType(),
+    VarCharType.STRING_TYPE)
   val inputType: RowType = RowType.of(inputTypes, inputNames)
 
   val relBuilder: RelBuilder = planner.getRelBuilder.values(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
index d71a2a3..c5bf13b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
@@ -43,7 +43,7 @@ abstract class BatchAggTestBase extends AggTestBase(isBatchMode = true) {
 
   val globalOutputType = RowType.of(
     Array[LogicalType](
-      new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE, VarCharType.STRING_TYPE,
       new BigIntType(),
       new DoubleType(),
       new BigIntType()),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
index 720263b..62ef130 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
@@ -35,7 +35,7 @@ class HashAggCodeGeneratorTest extends BatchAggTestBase {
 
   val localOutputType = RowType.of(
     Array[LogicalType](
-      new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE, VarCharType.STRING_TYPE,
       new BigIntType(), new BigIntType(),
       new DoubleType(), new BigIntType(),
       new BigIntType(), new BigIntType()),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala
index f69766f1..90f981c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala
@@ -32,7 +32,7 @@ class SortAggCodeGeneratorTest extends BatchAggTestBase {
 
   val localOutputType = RowType.of(
     Array[LogicalType](
-      new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE, VarCharType.STRING_TYPE,
       new BigIntType(), new BigIntType(),
       new DoubleType(), new BigIntType(),
       fromTypeInfoToLogicalType(imperativeAggFunc.getAccumulatorType)),
@@ -95,7 +95,7 @@ class SortAggCodeGeneratorTest extends BatchAggTestBase {
     : (CodeGenOperatorFactory[RowData], RowType, RowType) = {
     val localOutputType = RowType.of(
       Array[LogicalType](
-        new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH),
+        VarCharType.STRING_TYPE, VarCharType.STRING_TYPE,
         new BigIntType(), new BigIntType(),
         new DoubleType(), new BigIntType(),
         fromTypeInfoToLogicalType(imperativeAggFunc.getAccumulatorType)),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index b62f158..48898e6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -374,7 +374,7 @@ abstract class ExpressionTestBase {
 
     // generate code
     val resultType = RowType.of(Seq.fill(rexNodes.size)(
-      new VarCharType(VarCharType.MAX_LENGTH)): _*)
+      VarCharType.STRING_TYPE): _*)
 
     val exprs = stringTestExprs.map(exprGenerator.generateExpression)
     val genExpr = exprGenerator.generateResultExpression(exprs, resultType, classOf[BinaryRowData])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
index 50701b9..b6fc21e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
@@ -34,7 +34,7 @@ class DagOptimizationTest extends TableTestBase {
   util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
   util.addTableSource[(Int, Long, String)]("MyTable1", 'd, 'e, 'f)
 
-  val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+  val STRING = VarCharType.STRING_TYPE
   val LONG = new BigIntType()
   val INT = new IntType()
   val DOUBLE = new DoubleType()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
index f12993f..4759afa 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
@@ -198,7 +198,7 @@ object MetadataTestUtil {
     val fieldNames = Array("a", "b", "c", "proctime", "rowtime")
     val fieldTypes = Array[LogicalType](
       new BigIntType(),
-      new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE,
       new IntType(),
       new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
       new TimestampType(true, TimestampKind.ROWTIME, 3))
@@ -217,7 +217,7 @@ object MetadataTestUtil {
     val fieldNames = Array("a", "b", "c", "proctime", "rowtime")
     val fieldTypes = Array[LogicalType](
       new BigIntType(),
-      new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE,
       new IntType(),
       new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
       new TimestampType(true, TimestampKind.ROWTIME, 3))
@@ -238,7 +238,7 @@ object MetadataTestUtil {
     val fieldTypes = Array[LogicalType](
       new IntType(),
       new BigIntType(),
-      new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE,
       new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
       new TimestampType(true, TimestampKind.ROWTIME, 3))
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
index 4502232..414b044 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
@@ -32,7 +32,7 @@ class DagOptimizationTest extends TableTestBase {
   util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
   util.addTableSource[(Int, Long, String)]("MyTable1", 'd, 'e, 'f)
 
-  val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+  val STRING = VarCharType.STRING_TYPE
   val LONG = new BigIntType()
   val INT = new IntType()
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
index b952a9c..6244402 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
@@ -31,7 +31,7 @@ class LegacySinkTest extends TableTestBase {
   private val util = streamTestUtil()
   util.addDataStream[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-  val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+  val STRING = VarCharType.STRING_TYPE
   val LONG = new BigIntType()
   val INT = new IntType()
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
index eb2f7a3..0cf621c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -34,7 +34,7 @@ import org.junit.{Before, Test}
 class MiniBatchIntervalInferTest extends TableTestBase {
   private val util = streamTestUtil()
 
-  val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+  val STRING = VarCharType.STRING_TYPE
   val LONG = new BigIntType()
   val INT = new IntType()
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index bdba353..13bced6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -255,7 +255,7 @@ object PartitionableSinkITCase {
   }
 
   val fieldNames = Array("a", "b", "c")
-  val dataType = Array(new IntType(), new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH))
+  val dataType = Array(new IntType(), new BigIntType(), VarCharType.STRING_TYPE)
   val dataNullables = Array(true, true, true)
 
   val testData = Seq(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
index b7a4302..27451d4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
@@ -33,7 +33,7 @@ import scala.collection.Seq
 class UnionITCase extends BatchTestBase {
 
   val type6 = InternalTypeInfo.ofFields(
-    new IntType(), new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH))
+    new IntType(), new BigIntType(), VarCharType.STRING_TYPE)
 
   val data6 = Seq(
     binaryRow(type6.toRowFieldTypes, 1, 1L, fromString("Hi")),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 1d887a1..b5e55ae 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -152,8 +152,8 @@ class CalcITCase extends StreamingTestBase {
     tEnv.registerTable("MyTableRow", t)
 
     val outputType = InternalTypeInfo.ofFields(
-      new VarCharType(VarCharType.MAX_LENGTH),
-      new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE,
+      VarCharType.STRING_TYPE,
       new IntType())
 
     val result = tEnv.sqlQuery(sqlQuery).toAppendStream[RowData]
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java
index e7d5773..fb8a5fc 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java
@@ -501,8 +501,7 @@ public class BinaryArrayDataTest {
         writer.writeRow(
                 0,
                 GenericRowData.of(fromString("1"), 1),
-                new RowDataSerializer(
-                        RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType())));
+                new RowDataSerializer(RowType.of(VarCharType.STRING_TYPE, new IntType())));
         writer.setNullAt(1);
         writer.complete();
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
index c0c91a4..1a200e8 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
@@ -484,8 +484,7 @@ public class BinaryRowDataTest {
         writer.writeRow(
                 0,
                 GenericRowData.of(fromString("1"), 1),
-                new RowDataSerializer(
-                        RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType())));
+                new RowDataSerializer(RowType.of(VarCharType.STRING_TYPE, new IntType())));
         writer.setNullAt(1);
         writer.complete();
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
index de0c65b..852633f 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
@@ -189,10 +189,10 @@ public class DataFormatConvertersTest {
         test(new RowTypeInfo(simpleTypes), new Row(simpleTypes.length));
         test(new RowTypeInfo(simpleTypes), Row.ofKind(RowKind.DELETE, simpleValues));
         test(
-                InternalTypeInfo.ofFields(new VarCharType(VarCharType.MAX_LENGTH), new IntType()),
+                InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType()),
                 GenericRowData.of(StringData.fromString("hehe"), 111));
         test(
-                InternalTypeInfo.ofFields(new VarCharType(VarCharType.MAX_LENGTH), new IntType()),
+                InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType()),
                 GenericRowData.of(null, null));
 
         test(new DecimalDataTypeInfo(10, 5), null);
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
index 8995185..1989701 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
@@ -110,8 +110,7 @@ public class SlicingWindowAggOperatorTest {
 
     private static final RowDataHarnessAssertor ASSERTER =
             new RowDataHarnessAssertor(
-                    OUTPUT_TYPES,
-                    new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+                    OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
 
     @Test
     public void testEventTimeHoppingWindows() throws Exception {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
index 99771bc..cdac040 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
@@ -37,8 +37,7 @@ abstract class ProcTimeDeduplicateFunctionTestBase {
 
     Time minTime = Time.milliseconds(10);
     InternalTypeInfo<RowData> inputRowType =
-            InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new IntType());
+            InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new BigIntType(), new IntType());
 
     int rowKeyIdx = 1;
     RowDataKeySelector rowKeySelector =
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
index 2a6cc8f..bec4ba1 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
@@ -40,8 +40,7 @@ abstract class RowTimeDeduplicateFunctionTestBase {
     protected final long miniBatchSize = 4L;
     protected Time minTtlTime = Time.milliseconds(10);
     protected InternalTypeInfo inputRowType =
-            InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType());
+            InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType(), new BigIntType());
     protected TypeSerializer<RowData> serializer = inputRowType.toSerializer();
     protected int rowTimeIndex = 2;
     protected int rowKeyIndex = 0;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java
index 9c91345..386fd28 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java
@@ -84,8 +84,7 @@ public class RowTimeWindowDeduplicateOperatorTest {
 
     private static final RowDataHarnessAssertor ASSERTER =
             new RowDataHarnessAssertor(
-                    OUTPUT_TYPES,
-                    new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+                    OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
 
     private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
     private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
index 99f93fd..f73d224 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
@@ -261,13 +261,13 @@ public class RandomSortMergeInnerJoinTest {
             boolean input1First)
             throws Exception {
         InternalTypeInfo<RowData> typeInfo =
-                InternalTypeInfo.ofFields(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
+                InternalTypeInfo.ofFields(new IntType(), VarCharType.STRING_TYPE);
         InternalTypeInfo<RowData> joinedInfo =
                 InternalTypeInfo.ofFields(
                         new IntType(),
-                        new VarCharType(VarCharType.MAX_LENGTH),
+                        VarCharType.STRING_TYPE,
                         new IntType(),
-                        new VarCharType(VarCharType.MAX_LENGTH));
+                        VarCharType.STRING_TYPE);
         final TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData>
                 testHarness =
                         new TwoInputStreamTaskTestHarness<>(
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
index 88c06bc..31eb491 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
@@ -48,15 +48,13 @@ import java.util.concurrent.LinkedBlockingQueue;
 public class String2HashJoinOperatorTest implements Serializable {
 
     private InternalTypeInfo<RowData> typeInfo =
-            InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+            InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE);
     private InternalTypeInfo<RowData> joinedInfo =
             InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE);
     private transient TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData>
             testHarness;
     private ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -354,6 +352,6 @@ public class String2HashJoinOperatorTest implements Serializable {
                 20,
                 10000,
                 10000,
-                RowType.of(new VarCharType(VarCharType.MAX_LENGTH)));
+                RowType.of(VarCharType.STRING_TYPE));
     }
 }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
index 6a6c5060..719cecf 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
@@ -60,15 +60,13 @@ public class String2SortMergeJoinOperatorTest {
 
     private boolean leftIsSmall;
     InternalTypeInfo<RowData> typeInfo =
-            InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+            InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE);
     private InternalTypeInfo<RowData> joinedInfo =
             InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE);
 
     public String2SortMergeJoinOperatorTest(boolean leftIsSmall) {
         this.leftIsSmall = leftIsSmall;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java
index 6f92a34..034fe71 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java
@@ -28,14 +28,14 @@ import org.apache.flink.table.types.logical.VarCharType;
 /** Base Test for all subclass of {@link TimeIntervalJoin}. */
 abstract class TimeIntervalStreamJoinTestBase {
     InternalTypeInfo<RowData> rowType =
-            InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
+            InternalTypeInfo.ofFields(new BigIntType(), VarCharType.STRING_TYPE);
 
     private InternalTypeInfo<RowData> outputRowType =
             InternalTypeInfo.ofFields(
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
+                    VarCharType.STRING_TYPE,
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    VarCharType.STRING_TYPE);
     RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputRowType.toRowFieldTypes());
 
     protected String funcCode =
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java
index cc839c6..ff4e801 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java
@@ -43,7 +43,7 @@ public class TemporalProcessTimeJoinOperatorTest extends TemporalTimeJoinOperato
 
     private int keyIdx = 0;
     private InternalTypeInfo<RowData> rowType =
-            InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
+            InternalTypeInfo.ofFields(new BigIntType(), VarCharType.STRING_TYPE);
     private RowDataKeySelector keySelector =
             HandwrittenSelectorUtil.getRowDataSelector(
                     new int[] {keyIdx}, rowType.toRowFieldTypes());
@@ -51,9 +51,9 @@ public class TemporalProcessTimeJoinOperatorTest extends TemporalTimeJoinOperato
     private InternalTypeInfo<RowData> outputRowType =
             InternalTypeInfo.ofFields(
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
+                    VarCharType.STRING_TYPE,
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    VarCharType.STRING_TYPE);
     private RowDataHarnessAssertor assertor =
             new RowDataHarnessAssertor(outputRowType.toRowFieldTypes());
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java
index 924e30e..8ba7a87 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java
@@ -46,17 +46,15 @@ abstract class TemporalTimeJoinOperatorTestBase {
             new GeneratedJoinCondition("TimeTemporalJoinCondition", funcCode, new Object[0]);
     protected InternalTypeInfo<RowData> rowType =
             InternalTypeInfo.ofFields(
-                    new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    new BigIntType(), VarCharType.STRING_TYPE, VarCharType.STRING_TYPE);
     protected InternalTypeInfo<RowData> outputRowType =
             InternalTypeInfo.ofFields(
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH),
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE,
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE);
     protected RowDataHarnessAssertor assertor =
             new RowDataHarnessAssertor(outputRowType.toRowFieldTypes());
     protected int keyIdx = 1;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
index 3b7d093..762d434 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
@@ -50,14 +50,14 @@ import static org.junit.Assert.assertEquals;
 public class WindowJoinOperatorTest {
 
     private static final InternalTypeInfo<RowData> INPUT_ROW_TYPE =
-            InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
+            InternalTypeInfo.ofFields(new BigIntType(), VarCharType.STRING_TYPE);
 
     private static final InternalTypeInfo<RowData> OUTPUT_ROW_TYPE =
             InternalTypeInfo.ofFields(
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
+                    VarCharType.STRING_TYPE,
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    VarCharType.STRING_TYPE);
 
     private static final RowDataHarnessAssertor ASSERTER =
             new RowDataHarnessAssertor(OUTPUT_ROW_TYPE.toRowFieldTypes());
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
index 94d337c..a9718fc 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
@@ -50,7 +50,7 @@ public class ProcTimeRangeBoundedPrecedingFunctionTest {
 
     private LogicalType[] inputFieldTypes =
             new LogicalType[] {
-                new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(),
+                VarCharType.STRING_TYPE, new BigIntType(),
             };
     private LogicalType[] accTypes = new LogicalType[] {new BigIntType()};
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java
index 689dec6..3f85eee 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java
@@ -42,9 +42,7 @@ public class RowTimeOverWindowTestBase {
             };
 
     protected LogicalType[] inputFieldTypes =
-            new LogicalType[] {
-                new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new BigIntType()
-            };
+            new LogicalType[] {VarCharType.STRING_TYPE, new BigIntType(), new BigIntType()};
     protected LogicalType[] accTypes = new LogicalType[] {new BigIntType()};
 
     protected RowDataKeySelector keySelector =
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java
index aad99bc..c38e12c 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java
@@ -58,8 +58,7 @@ abstract class TopNFunctionTestBase {
     long cacheSize = 10000L;
 
     InternalTypeInfo<RowData> inputRowType =
-            InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new IntType());
+            InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new BigIntType(), new IntType());
 
     static GeneratedRecordComparator generatedSortKeyComparator =
             new GeneratedRecordComparator("", "", new Object[0]) {
@@ -107,10 +106,7 @@ abstract class TopNFunctionTestBase {
 
     private InternalTypeInfo<RowData> outputTypeWithRowNumber =
             InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new BigIntType(),
-                    new IntType(),
-                    new BigIntType());
+                    VarCharType.STRING_TYPE, new BigIntType(), new IntType(), new BigIntType());
 
     RowDataHarnessAssertor assertorWithoutRowNumber =
             new RowDataHarnessAssertor(
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
index 087cc4c..60f686e 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
@@ -106,8 +106,7 @@ public class WindowRankOperatorTest {
 
     private static final RowDataHarnessAssertor ASSERTER =
             new RowDataHarnessAssertor(
-                    OUTPUT_TYPES,
-                    new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+                    OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
 
     private static final LogicalType[] OUTPUT_TYPES_WITHOUT_RANK_NUMBER =
             new LogicalType[] {new VarCharType(Integer.MAX_VALUE), new IntType(), new BigIntType()};
@@ -118,7 +117,7 @@ public class WindowRankOperatorTest {
     private static final RowDataHarnessAssertor ASSERTER_WITHOUT_RANK_NUMBER =
             new RowDataHarnessAssertor(
                     OUTPUT_TYPES_WITHOUT_RANK_NUMBER,
-                    new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+                    new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
 
     private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
     private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java
index e42a1ec..fa29703 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java
@@ -43,10 +43,7 @@ public class ProcTimeSortOperatorTest {
 
     private InternalTypeInfo<RowData> inputRowType =
             InternalTypeInfo.ofFields(
-                    new IntType(),
-                    new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new IntType());
+                    new IntType(), new BigIntType(), VarCharType.STRING_TYPE, new IntType());
 
     private GeneratedRecordComparator gComparator =
             new GeneratedRecordComparator("", "", new Object[0]) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java
index 7acaaa5..53cf10b 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java
@@ -46,10 +46,7 @@ public class RowTimeSortOperatorTest {
     public void testSortOnTwoFields() throws Exception {
         InternalTypeInfo<RowData> inputRowType =
                 InternalTypeInfo.ofFields(
-                        new IntType(),
-                        new BigIntType(),
-                        new VarCharType(VarCharType.MAX_LENGTH),
-                        new IntType());
+                        new IntType(), new BigIntType(), VarCharType.STRING_TYPE, new IntType());
 
         // Note: RowTimeIdx must be 0 in product environment, the value is 1 here just for simplify
         // the testing
@@ -134,10 +131,7 @@ public class RowTimeSortOperatorTest {
     public void testOnlySortOnRowTime() throws Exception {
         InternalTypeInfo<RowData> inputRowType =
                 InternalTypeInfo.ofFields(
-                        new BigIntType(),
-                        new BigIntType(),
-                        new VarCharType(VarCharType.MAX_LENGTH),
-                        new IntType());
+                        new BigIntType(), new BigIntType(), VarCharType.STRING_TYPE, new IntType());
         int rowTimeIdx = 0;
         RowDataHarnessAssertor assertor =
                 new RowDataHarnessAssertor(inputRowType.toRowFieldTypes());
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
index e535ede..5d4879c 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
@@ -40,7 +40,7 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord
 public class StreamSortOperatorTest {
 
     private InternalTypeInfo<RowData> inputRowType =
-            InternalTypeInfo.ofFields(new VarCharType(VarCharType.MAX_LENGTH), new IntType());
+            InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType());
 
     private GeneratedRecordComparator sortKeyComparator =
             new GeneratedRecordComparator("", "", new Object[0]) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java
index e8308f8..76db205 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java
@@ -195,8 +195,7 @@ public class WindowOperatorContractTest {
                     long allowedLateness)
                     throws Exception {
 
-        LogicalType[] inputTypes =
-                new LogicalType[] {new VarCharType(VarCharType.MAX_LENGTH), new IntType()};
+        LogicalType[] inputTypes = new LogicalType[] {VarCharType.STRING_TYPE, new IntType()};
         RowDataKeySelector keySelector =
                 HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, inputTypes);
         TypeInformation<RowData> keyType = keySelector.getProducedType();
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java
index ed0e75c..6235352 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java
@@ -123,13 +123,11 @@ public class WindowOperatorTest {
     private static AtomicInteger closeCalled = new AtomicInteger(0);
 
     private LogicalType[] inputFieldTypes =
-            new LogicalType[] {
-                new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType()
-            };
+            new LogicalType[] {VarCharType.STRING_TYPE, new IntType(), new BigIntType()};
 
     private InternalTypeInfo<RowData> outputType =
             InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH),
+                    VarCharType.STRING_TYPE,
                     new BigIntType(),
                     new BigIntType(),
                     new BigIntType(),
@@ -147,7 +145,7 @@ public class WindowOperatorTest {
     private RowDataHarnessAssertor assertor =
             new RowDataHarnessAssertor(
                     outputType.toRowFieldTypes(),
-                    new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+                    new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
 
     private ConcurrentLinkedQueue<Object> doubleRecord(
             boolean isDouble, StreamRecord<RowData> record) {
@@ -1543,8 +1541,7 @@ public class WindowOperatorTest {
         RowDataHarnessAssertor assertor =
                 new RowDataHarnessAssertor(
                         outputType.toRowFieldTypes(),
-                        new GenericRowRecordSortComparator(
-                                0, new VarCharType(VarCharType.MAX_LENGTH)));
+                        new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
 
         ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java
index d8eb34c..3bf9e4c 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java
@@ -82,7 +82,7 @@ public class DataTypePrecisionFixerTest {
                         .logicalType(new LocalZonedTimestampType(2))
                         .expect(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(2)),
                 TestSpecs.fix(Types.STRING)
-                        .logicalType(new VarCharType(VarCharType.MAX_LENGTH))
+                        .logicalType(VarCharType.STRING_TYPE)
                         .expect(DataTypes.STRING()),
 
                 // nested
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
index 1e0aa14..d87a5f0 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
@@ -94,7 +94,7 @@ public class RowDataSerializerTest extends SerializerTestInstance<RowData> {
 
     private static Object[] testRowDataSerializer() {
         InternalTypeInfo<RowData> typeInfo =
-                InternalTypeInfo.ofFields(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
+                InternalTypeInfo.ofFields(new IntType(), VarCharType.STRING_TYPE);
         GenericRowData row1 = new GenericRowData(2);
         row1.setField(0, 1);
         row1.setField(1, fromString("a"));
@@ -122,7 +122,7 @@ public class RowDataSerializerTest extends SerializerTestInstance<RowData> {
                         new IntType(),
                         new IntType(),
                         new IntType(),
-                        new VarCharType(VarCharType.MAX_LENGTH));
+                        VarCharType.STRING_TYPE);
 
         GenericRowData row = new GenericRowData(13);
         row.setField(0, 2);
@@ -147,7 +147,7 @@ public class RowDataSerializerTest extends SerializerTestInstance<RowData> {
                 InternalTypeInfo.ofFields(
                         new IntType(),
                         new DoubleType(),
-                        new VarCharType(VarCharType.MAX_LENGTH),
+                        VarCharType.STRING_TYPE,
                         new ArrayType(new IntType()),
                         new MapType(new IntType(), new IntType()));
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
index a599f29..9d50c32 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
@@ -53,7 +53,7 @@ public abstract class BytesHashMapTestBase<K> extends BytesMapTestBase {
     static final LogicalType[] KEY_TYPES =
             new LogicalType[] {
                 new IntType(),
-                new VarCharType(VarCharType.MAX_LENGTH),
+                VarCharType.STRING_TYPE,
                 new DoubleType(),
                 new BigIntType(),
                 new BooleanType(),
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
index 22dca2d..c4a5d63 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
@@ -50,7 +50,7 @@ public abstract class BytesMultiMapTestBase<K> extends BytesMapTestBase {
     static final LogicalType[] KEY_TYPES =
             new LogicalType[] {
                 new IntType(),
-                new VarCharType(VarCharType.MAX_LENGTH),
+                VarCharType.STRING_TYPE,
                 new DoubleType(),
                 new BigIntType(),
                 new BooleanType(),
@@ -60,7 +60,7 @@ public abstract class BytesMultiMapTestBase<K> extends BytesMapTestBase {
 
     static final LogicalType[] VALUE_TYPES =
             new LogicalType[] {
-                new VarCharType(VarCharType.MAX_LENGTH), new IntType(),
+                VarCharType.STRING_TYPE, new IntType(),
             };
 
     protected final PagedTypeSerializer<K> keySerializer;

[flink] 04/04: [hotfix][table] Rename precision to length for CHAR/VARCHAR sink enforcer

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b1df4945141907022a3c5ddae21723a4d5a42f4
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Mon Dec 13 14:58:13 2021 +0200

    [hotfix][table] Rename precision to length for CHAR/VARCHAR sink enforcer
    
    Rename all `precision` references in code and docs to `length`
    which were introduced with: https://github.com/apache/flink/commit/1151071b67b866bc18225fc7f522d29e819a6238
---
 .../generated/execution_config_configuration.html  |  4 +-
 .../table/api/config/ExecutionConfigOptions.java   | 31 ++++++-----
 .../plan/nodes/exec/common/CommonExecSink.java     | 10 ++--
 .../nodes/exec/common/CommonExecSinkITCase.java    | 18 +++----
 .../runtime/operators/sink/ConstraintEnforcer.java | 60 +++++++++++-----------
 5 files changed, 61 insertions(+), 62 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 2a35fc8..099a9f9 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -53,10 +53,10 @@ By default no operator is disabled.</td>
             <td>Sets default parallelism for all operators (such as aggregate, join, filter) to run with parallel instances. This config has a higher priority than parallelism of StreamExecutionEnvironment (actually, this config overrides the parallelism of StreamExecutionEnvironment). A value of -1 indicates that no default parallelism is set, then it will fallback to use the parallelism of StreamExecutionEnvironment.</td>
         </tr>
         <tr>
-            <td><h5>table.exec.sink.char-precision-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td><h5>table.exec.sink.char-length-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">IGNORE</td>
             <td><p>Enum</p></td>
-            <td>Determines whether string values for columns with CHAR(&lt;precision&gt;)/VARCHAR(&lt;precision&gt;) types will be trimmed or padded (only for CHAR(&lt;precision&gt;)), so that their length will match the one defined by the precision of their respective CHAR/VARCHAR column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR precision directive.</li><li>"TRIM_PAD": Trim and pad string values to match  [...]
+            <td>Determines whether string values for columns with CHAR(&lt;length&gt;)/VARCHAR(&lt;length&gt;) types will be trimmed or padded (only for CHAR(&lt;length&gt;)), so that their length will match the one defined by the length of their respective CHAR/VARCHAR column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR length directive.</li><li>"TRIM_PAD": Trim and pad string values to match the length defi [...]
         </tr>
         <tr>
             <td><h5>table.exec.sink.keyed-shuffle</h5><br> <span class="label label-primary">Streaming</span></td>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 6f655b2..ba9a6c3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -120,16 +120,15 @@ public class ExecutionConfigOptions {
                             "Determines how Flink enforces NOT NULL column constraints when inserting null values.");
 
     @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
-    public static final ConfigOption<CharPrecisionEnforcer>
-            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER =
-                    key("table.exec.sink.char-precision-enforcer")
-                            .enumType(CharPrecisionEnforcer.class)
-                            .defaultValue(CharPrecisionEnforcer.IGNORE)
-                            .withDescription(
-                                    "Determines whether string values for columns with CHAR(<precision>)/VARCHAR(<precision>) "
-                                            + "types will be trimmed or padded (only for CHAR(<precision>)), so that their "
-                                            + "length will match the one defined by the precision of their respective "
-                                            + "CHAR/VARCHAR column type.");
+    public static final ConfigOption<CharLengthEnforcer> TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER =
+            key("table.exec.sink.char-length-enforcer")
+                    .enumType(CharLengthEnforcer.class)
+                    .defaultValue(CharLengthEnforcer.IGNORE)
+                    .withDescription(
+                            "Determines whether string values for columns with CHAR(<length>)/VARCHAR(<length>) "
+                                    + "types will be trimmed or padded (only for CHAR(<length>)), so that their "
+                                    + "length will match the one defined by the length of their respective "
+                                    + "CHAR/VARCHAR column type.");
 
     @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
     public static final ConfigOption<UpsertMaterialize> TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
@@ -441,23 +440,23 @@ public class ExecutionConfigOptions {
     }
 
     /**
-     * The enforcer to guarantee that precision of CHAR/VARCHAR columns is respected when writing
-     * data into sink.
+     * The enforcer to guarantee that length of CHAR/VARCHAR columns is respected when writing data
+     * into sink.
      */
     @PublicEvolving
-    public enum CharPrecisionEnforcer implements DescribedEnum {
+    public enum CharLengthEnforcer implements DescribedEnum {
         IGNORE(
                 text(
                         "Don't apply any trimming and padding, and instead "
-                                + "ignore the CHAR/VARCHAR precision directive.")),
+                                + "ignore the CHAR/VARCHAR length directive.")),
         TRIM_PAD(
                 text(
                         "Trim and pad string values to match the length "
-                                + "defined by the CHAR/VARCHAR precision."));
+                                + "defined by the CHAR/VARCHAR length."));
 
         private final InlineElement description;
 
-        CharPrecisionEnforcer(InlineElement description) {
+        CharLengthEnforcer(InlineElement description) {
             this.description = description;
         }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 65500b9..091c09a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -203,20 +203,20 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
                     notNullEnforcer, notNullFieldIndices, notNullFieldNames, fieldNames);
         }
 
-        // Build CHAR/VARCHAR precision enforcer
+        // Build CHAR/VARCHAR length enforcer
         final List<ConstraintEnforcer.CharFieldInfo> charFieldInfo =
                 getCharFieldInfo(physicalRowType);
         if (!charFieldInfo.isEmpty()) {
-            final ExecutionConfigOptions.CharPrecisionEnforcer charPrecisionEnforcer =
+            final ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer =
                     config.getConfiguration()
-                            .get(ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER);
+                            .get(ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER);
             final List<String> charFieldNames =
                     charFieldInfo.stream()
                             .map(cfi -> fieldNames[cfi.fieldIdx()])
                             .collect(Collectors.toList());
 
-            validatorBuilder.addCharPrecisionConstraint(
-                    charPrecisionEnforcer, charFieldInfo, charFieldNames, fieldNames);
+            validatorBuilder.addCharLengthConstraint(
+                    charLengthEnforcer, charFieldInfo, charFieldNames, fieldNames);
         }
 
         ConstraintEnforcer constraintEnforcer = validatorBuilder.build();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
index dde54b7..8714722 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
@@ -58,7 +58,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.DataTypes.INT;
-import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -194,7 +194,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testCharPrecisionEnforcer() throws ExecutionException, InterruptedException {
+    public void testCharLengthEnforcer() throws ExecutionException, InterruptedException {
         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
         final List<Row> rows =
                 Arrays.asList(
@@ -205,7 +205,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
 
         final TableDescriptor sourceDescriptor =
                 TableFactoryHarness.newBuilder()
-                        .schema(schemaForCharPrecisionEnforcer())
+                        .schema(schemaForCharLengthEnforcer())
                         .source(new TestSource(rows))
                         .build();
         tableEnv.createTable("T1", sourceDescriptor);
@@ -218,13 +218,13 @@ public class CommonExecSinkITCase extends AbstractTestBase {
         result.collect().forEachRemaining(results::add);
         assertThat(results, containsInAnyOrder(rows.toArray()));
 
-        // Change config option to "trim", to trim the strings based on their type precision
+        // Change config option to "trim", to trim the strings based on their type length
         try {
             tableEnv.getConfig()
                     .getConfiguration()
                     .setString(
-                            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
-                            ExecutionConfigOptions.CharPrecisionEnforcer.TRIM_PAD.name());
+                            TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER.key(),
+                            ExecutionConfigOptions.CharLengthEnforcer.TRIM_PAD.name());
 
             result = tableEnv.executeSql("SELECT * FROM T1");
             result.await();
@@ -243,8 +243,8 @@ public class CommonExecSinkITCase extends AbstractTestBase {
             tableEnv.getConfig()
                     .getConfiguration()
                     .setString(
-                            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
-                            ExecutionConfigOptions.CharPrecisionEnforcer.IGNORE.name());
+                            TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER.key(),
+                            ExecutionConfigOptions.CharLengthEnforcer.IGNORE.name());
         }
     }
 
@@ -378,7 +378,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
         return builder.build();
     }
 
-    private static Schema schemaForCharPrecisionEnforcer() {
+    private static Schema schemaForCharLengthEnforcer() {
         return Schema.newBuilder()
                 .column("a", "INT")
                 .column("b", "CHAR(8)")
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
index 4755007..7822933 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.ExecutionConfigOptions.NotNullEnforcer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -37,7 +38,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 
-import static org.apache.flink.table.api.config.ExecutionConfigOptions.CharPrecisionEnforcer;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.CharLengthEnforcer;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -46,8 +47,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  *
  * <ul>
  *   <li>{@code NOT NULL} column constraint of a sink table
- *   <li>{@code CHAR(precision)}/@{code VARCHAR(precision)}: trim string values to comply with the
- *       {@code precision} defined in their corresponding types.
+ *   <li>{@code CHAR(length)}/@{code VARCHAR(length)}: trim string values to comply with the {@code
+ *       length} defined in their corresponding types.
  * </ul>
  */
 @Internal
@@ -60,9 +61,9 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
     private final int[] notNullFieldIndices;
     private final String[] allFieldNames;
 
-    private final CharPrecisionEnforcer charPrecisionEnforcer;
+    private final ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer;
     private final int[] charFieldIndices;
-    private final int[] charFieldPrecisions;
+    private final int[] charFieldLengths;
     private final BitSet charFieldShouldPad;
 
     private final String operatorName;
@@ -72,17 +73,17 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
     private ConstraintEnforcer(
             NotNullEnforcer notNullEnforcer,
             int[] notNullFieldIndices,
-            CharPrecisionEnforcer charPrecisionEnforcer,
+            ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer,
             int[] charFieldIndices,
-            int[] charFieldPrecisions,
+            int[] charFieldLengths,
             BitSet charFieldShouldPad,
             String[] allFieldNames,
             String operatorName) {
         this.notNullEnforcer = notNullEnforcer;
         this.notNullFieldIndices = notNullFieldIndices;
-        this.charPrecisionEnforcer = charPrecisionEnforcer;
+        this.charLengthEnforcer = charLengthEnforcer;
         this.charFieldIndices = charFieldIndices;
-        this.charFieldPrecisions = charFieldPrecisions;
+        this.charFieldLengths = charFieldLengths;
         this.charFieldShouldPad = charFieldShouldPad;
         this.allFieldNames = allFieldNames;
         this.operatorName = operatorName;
@@ -105,14 +106,14 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
 
     /**
      * Helper builder, so that the {@link ConstraintEnforcer} can be instantiated with only the NOT
-     * NULL constraint validation, only the CHAR/VARCHAR precision validation, or both.
+     * NULL constraint validation, only the CHAR/VARCHAR length validation, or both.
      */
     public static class Builder {
 
         private NotNullEnforcer notNullEnforcer;
         private int[] notNullFieldIndices;
 
-        private CharPrecisionEnforcer charPrecisionEnforcer;
+        private CharLengthEnforcer charLengthEnforcer;
         private List<CharFieldInfo> charFieldInfo;
         private String[] allFieldNames;
 
@@ -140,13 +141,13 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
             }
         }
 
-        public void addCharPrecisionConstraint(
-                CharPrecisionEnforcer charPrecisionEnforcer,
+        public void addCharLengthConstraint(
+                ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer,
                 List<CharFieldInfo> charFieldInfo,
                 List<String> charFieldNames,
                 String[] allFieldNames) {
-            this.charPrecisionEnforcer = charPrecisionEnforcer;
-            if (this.charPrecisionEnforcer == CharPrecisionEnforcer.TRIM_PAD) {
+            this.charLengthEnforcer = charLengthEnforcer;
+            if (this.charLengthEnforcer == CharLengthEnforcer.TRIM_PAD) {
                 checkArgument(
                         charFieldInfo.size() > 0,
                         "ConstraintValidator requires that there are CHAR/VARCHAR fields.");
@@ -155,15 +156,14 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
 
                 operatorNames.add(
                         String.format(
-                                "CharPrecisionEnforcer(fields=[%s])",
+                                "CharLengthEnforcer(fields=[%s])",
                                 String.join(", ", charFieldNames)));
                 this.isConfigured = true;
             }
         }
 
         /**
-         * If neither of NOT NULL or CHAR/VARCHAR precision enforcers are configured, null is
-         * returned.
+         * If neither the NOT NULL nor the CHAR/VARCHAR length enforcer is configured, null is
+         * returned.
          */
         public ConstraintEnforcer build() {
             if (isConfigured) {
@@ -172,12 +172,12 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
                 return new ConstraintEnforcer(
                         notNullEnforcer,
                         notNullFieldIndices,
-                        charPrecisionEnforcer,
+                        charLengthEnforcer,
                         charFieldInfo != null
                                 ? charFieldInfo.stream().mapToInt(cfi -> cfi.fieldIdx).toArray()
                                 : null,
                         charFieldInfo != null
-                                ? charFieldInfo.stream().mapToInt(cfi -> cfi.precision).toArray()
+                                ? charFieldInfo.stream().mapToInt(cfi -> cfi.length).toArray()
                                 : null,
                         charFieldInfo != null ? buildShouldPad(charFieldInfo) : null,
                         allFieldNames,
@@ -231,8 +231,8 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
     }
 
     private RowData processCharConstraint(RowData rowData) {
-        if (charPrecisionEnforcer == null
-                || charPrecisionEnforcer == CharPrecisionEnforcer.IGNORE) {
+        if (charLengthEnforcer == null
+                || charLengthEnforcer == ExecutionConfigOptions.CharLengthEnforcer.IGNORE) {
             return rowData;
         }
 
@@ -240,26 +240,26 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
 
         for (int i = 0; i < charFieldIndices.length; i++) {
             final int fieldIdx = charFieldIndices[i];
-            final int precision = charFieldPrecisions[i];
+            final int length = charFieldLengths[i];
             final BinaryStringData stringData = (BinaryStringData) rowData.getString(fieldIdx);
             final int sourceStrLength = stringData.numChars();
 
-            if (charFieldShouldPad.get(i) && sourceStrLength < precision) {
+            if (charFieldShouldPad.get(i) && sourceStrLength < length) {
                 if (updatedRowData == null) {
                     updatedRowData = new UpdatableRowData(rowData, allFieldNames.length);
                 }
                 final int srcSizeInBytes = stringData.getSizeInBytes();
-                final byte[] newString = new byte[srcSizeInBytes + precision - sourceStrLength];
+                final byte[] newString = new byte[srcSizeInBytes + length - sourceStrLength];
                 for (int j = srcSizeInBytes; j < newString.length; j++) {
                     newString[j] = (byte) 32; // space
                 }
                 SegmentsUtil.copyToBytes(stringData.getSegments(), 0, newString, 0, srcSizeInBytes);
                 updatedRowData.setField(fieldIdx, StringData.fromBytes(newString));
-            } else if (sourceStrLength > precision) {
+            } else if (sourceStrLength > length) {
                 if (updatedRowData == null) {
                     updatedRowData = new UpdatableRowData(rowData, allFieldNames.length);
                 }
-                updatedRowData.setField(fieldIdx, stringData.substring(0, precision));
+                updatedRowData.setField(fieldIdx, stringData.substring(0, length));
             }
         }
 
@@ -273,12 +273,12 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData>
     @Internal
     public static class CharFieldInfo {
         private final int fieldIdx;
-        private final Integer precision;
+        private final Integer length;
         private final boolean shouldPad;
 
-        public CharFieldInfo(int fieldIdx, @Nullable Integer precision, boolean shouldPad) {
+        public CharFieldInfo(int fieldIdx, @Nullable Integer length, boolean shouldPad) {
             this.fieldIdx = fieldIdx;
-            this.precision = precision;
+            this.length = length;
             this.shouldPad = shouldPad;
         }