You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/14 13:11:03 UTC

[flink] 02/04: [hotfix][table] Make use of VarCharType.STRING_TYPE

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b6ca017e095de4b055e58af195e1f0a00e312d6e
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Thu Dec 9 13:50:10 2021 +0100

    [hotfix][table] Make use of VarCharType.STRING_TYPE
    
    Replace occurrences of `new VarCharType(VarCharType.MAX_LENGTH)` with the new
    constant `VarCharType.STRING_TYPE`.
---
 .../flink/table/types/logical/utils/LogicalTypeParser.java |  2 +-
 .../java/org/apache/flink/table/types/DataTypesTest.java   |  2 +-
 .../apache/flink/table/types/LogicalCommonTypeTest.java    |  4 ++--
 .../apache/flink/table/types/LogicalTypeParserTest.java    |  2 +-
 .../table/types/extraction/DataTypeExtractorTest.java      |  9 +++------
 .../flink/table/planner/plan/type/FlinkReturnTypes.java    |  4 ++--
 .../flink/table/planner/codegen/calls/StringCallGen.scala  |  2 +-
 .../flink/table/planner/codegen/SortCodeGeneratorTest.java |  2 +-
 .../org/apache/flink/table/api/batch/ExplainTest.scala     |  2 +-
 .../org/apache/flink/table/api/stream/ExplainTest.scala    |  2 +-
 .../flink/table/planner/calcite/FlinkTypeFactoryTest.scala |  6 +++---
 .../flink/table/planner/codegen/agg/AggTestBase.scala      |  4 ++--
 .../table/planner/codegen/agg/batch/BatchAggTestBase.scala |  2 +-
 .../codegen/agg/batch/HashAggCodeGeneratorTest.scala       |  2 +-
 .../codegen/agg/batch/SortAggCodeGeneratorTest.scala       |  4 ++--
 .../planner/expressions/utils/ExpressionTestBase.scala     |  2 +-
 .../table/planner/plan/batch/sql/DagOptimizationTest.scala |  2 +-
 .../table/planner/plan/metadata/MetadataTestUtil.scala     |  6 +++---
 .../planner/plan/stream/sql/DagOptimizationTest.scala      |  2 +-
 .../table/planner/plan/stream/sql/LegacySinkTest.scala     |  2 +-
 .../plan/stream/sql/MiniBatchIntervalInferTest.scala       |  2 +-
 .../runtime/batch/sql/PartitionableSinkITCase.scala        |  2 +-
 .../table/planner/runtime/batch/sql/UnionITCase.scala      |  2 +-
 .../table/planner/runtime/stream/sql/CalcITCase.scala      |  4 ++--
 .../org/apache/flink/table/data/BinaryArrayDataTest.java   |  3 +--
 .../org/apache/flink/table/data/BinaryRowDataTest.java     |  3 +--
 .../apache/flink/table/data/DataFormatConvertersTest.java  |  4 ++--
 .../aggregate/window/SlicingWindowAggOperatorTest.java     |  3 +--
 .../deduplicate/ProcTimeDeduplicateFunctionTestBase.java   |  3 +--
 .../deduplicate/RowTimeDeduplicateFunctionTestBase.java    |  3 +--
 .../window/RowTimeWindowDeduplicateOperatorTest.java       |  3 +--
 .../operators/join/RandomSortMergeInnerJoinTest.java       |  6 +++---
 .../operators/join/String2HashJoinOperatorTest.java        | 14 ++++++--------
 .../operators/join/String2SortMergeJoinOperatorTest.java   | 12 +++++-------
 .../join/interval/TimeIntervalStreamJoinTestBase.java      |  6 +++---
 .../join/temporal/TemporalProcessTimeJoinOperatorTest.java |  6 +++---
 .../join/temporal/TemporalTimeJoinOperatorTestBase.java    | 12 +++++-------
 .../operators/join/window/WindowJoinOperatorTest.java      |  6 +++---
 .../over/ProcTimeRangeBoundedPrecedingFunctionTest.java    |  2 +-
 .../runtime/operators/over/RowTimeOverWindowTestBase.java  |  4 +---
 .../table/runtime/operators/rank/TopNFunctionTestBase.java |  8 ++------
 .../operators/rank/window/WindowRankOperatorTest.java      |  5 ++---
 .../runtime/operators/sort/ProcTimeSortOperatorTest.java   |  5 +----
 .../runtime/operators/sort/RowTimeSortOperatorTest.java    | 10 ++--------
 .../runtime/operators/sort/StreamSortOperatorTest.java     |  2 +-
 .../operators/window/WindowOperatorContractTest.java       |  3 +--
 .../table/runtime/operators/window/WindowOperatorTest.java | 11 ++++-------
 .../table/runtime/types/DataTypePrecisionFixerTest.java    |  2 +-
 .../table/runtime/typeutils/RowDataSerializerTest.java     |  6 +++---
 .../util/collections/binary/BytesHashMapTestBase.java      |  2 +-
 .../util/collections/binary/BytesMultiMapTestBase.java     |  4 ++--
 51 files changed, 93 insertions(+), 128 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
index 1c69d77..15b5daa 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
@@ -528,7 +528,7 @@ public final class LogicalTypeParser {
                 case VARCHAR:
                     return parseVarCharType();
                 case STRING:
-                    return new VarCharType(VarCharType.MAX_LENGTH);
+                    return VarCharType.STRING_TYPE;
                 case BOOLEAN:
                     return new BooleanType();
                 case BINARY:
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
index 9d5288e..a658285 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
@@ -118,7 +118,7 @@ public class DataTypesTest {
                         .expectLogicalType(new VarCharType(2))
                         .expectConversionClass(String.class),
                 TestSpec.forDataType(STRING())
-                        .expectLogicalType(new VarCharType(VarCharType.MAX_LENGTH))
+                        .expectLogicalType(VarCharType.STRING_TYPE)
                         .expectConversionClass(String.class),
                 TestSpec.forDataType(BOOLEAN())
                         .expectLogicalType(new BooleanType())
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java
index 29745ec..ad76ea6 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java
@@ -141,8 +141,8 @@ public class LogicalCommonTypeTest {
 
                     // VARCHAR types of different length
                     {
-                        Arrays.asList(new VarCharType(2), new VarCharType(VarCharType.MAX_LENGTH)),
-                        new VarCharType(VarCharType.MAX_LENGTH)
+                        Arrays.asList(new VarCharType(2), VarCharType.STRING_TYPE),
+                        VarCharType.STRING_TYPE
                     },
 
                     // mixed VARCHAR and CHAR types
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
index 1a58475..88758f2 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
@@ -91,7 +91,7 @@ public class LogicalTypeParserTest {
                 TestSpec.forString("CHAR(33)").expectType(new CharType(33)),
                 TestSpec.forString("VARCHAR").expectType(new VarCharType()),
                 TestSpec.forString("VARCHAR(33)").expectType(new VarCharType(33)),
-                TestSpec.forString("STRING").expectType(new VarCharType(VarCharType.MAX_LENGTH)),
+                TestSpec.forString("STRING").expectType(VarCharType.STRING_TYPE),
                 TestSpec.forString("BOOLEAN").expectType(new BooleanType()),
                 TestSpec.forString("BINARY").expectType(new BinaryType()),
                 TestSpec.forString("BINARY(33)").expectType(new BinaryType(33)),
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
index f023a50..dd58c82 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
@@ -619,8 +619,7 @@ public class DataTypeExtractorTest {
                         new StructuredAttribute("intField", new IntType(true)),
                         new StructuredAttribute("primitiveBooleanField", new BooleanType(false)),
                         new StructuredAttribute("primitiveIntField", new IntType(false)),
-                        new StructuredAttribute(
-                                "stringField", new VarCharType(VarCharType.MAX_LENGTH))));
+                        new StructuredAttribute("stringField", VarCharType.STRING_TYPE)));
         builder.setFinal(true);
         builder.setInstantiable(true);
         final StructuredType structuredType = builder.build();
@@ -641,9 +640,7 @@ public class DataTypeExtractorTest {
         builder.attributes(
                 Arrays.asList(
                         new StructuredAttribute(
-                                "mapField",
-                                new MapType(
-                                        new VarCharType(VarCharType.MAX_LENGTH), new IntType())),
+                                "mapField", new MapType(VarCharType.STRING_TYPE, new IntType())),
                         new StructuredAttribute(
                                 "simplePojoField",
                                 getSimplePojoDataType(simplePojoClass).getLogicalType()),
@@ -700,7 +697,7 @@ public class DataTypeExtractorTest {
         final StructuredType.Builder builder = StructuredType.newBuilder(Tuple2.class);
         builder.attributes(
                 Arrays.asList(
-                        new StructuredAttribute("f0", new VarCharType(VarCharType.MAX_LENGTH)),
+                        new StructuredAttribute("f0", VarCharType.STRING_TYPE),
                         new StructuredAttribute("f1", new BooleanType())));
         builder.setFinal(true);
         builder.setInstantiable(true);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java
index b495a98..1ff51b4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java
@@ -127,6 +127,6 @@ public class FlinkReturnTypes {
                             ((FlinkTypeFactory) factory)
                                     .createFieldTypeFromLogicalType(
                                             new MapType(
-                                                    new VarCharType(VarCharType.MAX_LENGTH),
-                                                    new VarCharType(VarCharType.MAX_LENGTH))));
+                                                    VarCharType.STRING_TYPE,
+                                                    VarCharType.STRING_TYPE)));
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index b605ddc..7619486 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -765,7 +765,7 @@ object StringCallGen {
       operands: Seq[GeneratedExpression]): GeneratedExpression = {
     val className = classOf[SqlFunctionUtils].getCanonicalName
     val t = new MapType(
-      new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH))
+      VarCharType.STRING_TYPE, VarCharType.STRING_TYPE)
     val converter = DataFormatConverters.getConverterForDataType(
       DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
     val converterTerm = ctx.addReusableObject(converter, "mapConverter")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
index 8a26c3e..931b0ec 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
@@ -105,7 +105,7 @@ public class SortCodeGeneratorTest {
                 new BigIntType(),
                 new FloatType(),
                 new DoubleType(),
-                new VarCharType(VarCharType.MAX_LENGTH),
+                VarCharType.STRING_TYPE,
                 new DecimalType(18, 2),
                 new DecimalType(38, 18),
                 new VarBinaryType(VarBinaryType.MAX_LENGTH),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index cd20033..8314f67 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -43,7 +43,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
   util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
   util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
 
-  val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+  val STRING = VarCharType.STRING_TYPE
   val LONG = new BigIntType()
   val INT = new IntType()
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index cadbaa5..a66db23 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -46,7 +46,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
   util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
   util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
 
-  val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+  val STRING = VarCharType.STRING_TYPE
   val LONG = new BigIntType()
   val INT = new IntType()
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala
index d8be0ea..d6c8c7e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala
@@ -69,7 +69,7 @@ class FlinkTypeFactoryTest {
     test(new NullType())
     test(new BooleanType())
     test(new TinyIntType())
-    test(new VarCharType(VarCharType.MAX_LENGTH))
+    test(VarCharType.STRING_TYPE)
     test(new DoubleType())
     test(new FloatType())
     test(new IntType())
@@ -82,8 +82,8 @@ class FlinkTypeFactoryTest {
     test(new LocalZonedTimestampType(3))
 
     test(new ArrayType(new DoubleType()))
-    test(new MapType(new DoubleType(), new VarCharType(VarCharType.MAX_LENGTH)))
-    test(RowType.of(new DoubleType(), new VarCharType(VarCharType.MAX_LENGTH)))
+    test(new MapType(new DoubleType(), VarCharType.STRING_TYPE))
+    test(RowType.of(new DoubleType(), VarCharType.STRING_TYPE))
     test(new RawType[DayOfWeek](
       classOf[DayOfWeek],
       new KryoSerializer[DayOfWeek](classOf[DayOfWeek], new ExecutionConfig)))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
index cd4d0d5..3fac689 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
@@ -57,8 +57,8 @@ abstract class AggTestBase(isBatchMode: Boolean) {
   private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
   val inputNames = Array("f0", "f1", "f2", "f3", "f4")
   val inputTypes: Array[LogicalType] = Array(
-    new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new DoubleType(), new BigIntType(),
-    new VarCharType(VarCharType.MAX_LENGTH))
+    VarCharType.STRING_TYPE, new BigIntType(), new DoubleType(), new BigIntType(),
+    VarCharType.STRING_TYPE)
   val inputType: RowType = RowType.of(inputTypes, inputNames)
 
   val relBuilder: RelBuilder = planner.getRelBuilder.values(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
index d71a2a3..c5bf13b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
@@ -43,7 +43,7 @@ abstract class BatchAggTestBase extends AggTestBase(isBatchMode = true) {
 
   val globalOutputType = RowType.of(
     Array[LogicalType](
-      new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE, VarCharType.STRING_TYPE,
       new BigIntType(),
       new DoubleType(),
       new BigIntType()),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
index 720263b..62ef130 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
@@ -35,7 +35,7 @@ class HashAggCodeGeneratorTest extends BatchAggTestBase {
 
   val localOutputType = RowType.of(
     Array[LogicalType](
-      new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE, VarCharType.STRING_TYPE,
       new BigIntType(), new BigIntType(),
       new DoubleType(), new BigIntType(),
       new BigIntType(), new BigIntType()),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala
index f69766f1..90f981c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala
@@ -32,7 +32,7 @@ class SortAggCodeGeneratorTest extends BatchAggTestBase {
 
   val localOutputType = RowType.of(
     Array[LogicalType](
-      new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE, VarCharType.STRING_TYPE,
       new BigIntType(), new BigIntType(),
       new DoubleType(), new BigIntType(),
       fromTypeInfoToLogicalType(imperativeAggFunc.getAccumulatorType)),
@@ -95,7 +95,7 @@ class SortAggCodeGeneratorTest extends BatchAggTestBase {
     : (CodeGenOperatorFactory[RowData], RowType, RowType) = {
     val localOutputType = RowType.of(
       Array[LogicalType](
-        new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH),
+        VarCharType.STRING_TYPE, VarCharType.STRING_TYPE,
         new BigIntType(), new BigIntType(),
         new DoubleType(), new BigIntType(),
         fromTypeInfoToLogicalType(imperativeAggFunc.getAccumulatorType)),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index b62f158..48898e6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -374,7 +374,7 @@ abstract class ExpressionTestBase {
 
     // generate code
     val resultType = RowType.of(Seq.fill(rexNodes.size)(
-      new VarCharType(VarCharType.MAX_LENGTH)): _*)
+      VarCharType.STRING_TYPE): _*)
 
     val exprs = stringTestExprs.map(exprGenerator.generateExpression)
     val genExpr = exprGenerator.generateResultExpression(exprs, resultType, classOf[BinaryRowData])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
index 50701b9..b6fc21e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
@@ -34,7 +34,7 @@ class DagOptimizationTest extends TableTestBase {
   util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
   util.addTableSource[(Int, Long, String)]("MyTable1", 'd, 'e, 'f)
 
-  val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+  val STRING = VarCharType.STRING_TYPE
   val LONG = new BigIntType()
   val INT = new IntType()
   val DOUBLE = new DoubleType()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
index f12993f..4759afa 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
@@ -198,7 +198,7 @@ object MetadataTestUtil {
     val fieldNames = Array("a", "b", "c", "proctime", "rowtime")
     val fieldTypes = Array[LogicalType](
       new BigIntType(),
-      new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE,
       new IntType(),
       new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
       new TimestampType(true, TimestampKind.ROWTIME, 3))
@@ -217,7 +217,7 @@ object MetadataTestUtil {
     val fieldNames = Array("a", "b", "c", "proctime", "rowtime")
     val fieldTypes = Array[LogicalType](
       new BigIntType(),
-      new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE,
       new IntType(),
       new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
       new TimestampType(true, TimestampKind.ROWTIME, 3))
@@ -238,7 +238,7 @@ object MetadataTestUtil {
     val fieldTypes = Array[LogicalType](
       new IntType(),
       new BigIntType(),
-      new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE,
       new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
       new TimestampType(true, TimestampKind.ROWTIME, 3))
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
index 4502232..414b044 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
@@ -32,7 +32,7 @@ class DagOptimizationTest extends TableTestBase {
   util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
   util.addTableSource[(Int, Long, String)]("MyTable1", 'd, 'e, 'f)
 
-  val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+  val STRING = VarCharType.STRING_TYPE
   val LONG = new BigIntType()
   val INT = new IntType()
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
index b952a9c..6244402 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
@@ -31,7 +31,7 @@ class LegacySinkTest extends TableTestBase {
   private val util = streamTestUtil()
   util.addDataStream[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-  val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+  val STRING = VarCharType.STRING_TYPE
   val LONG = new BigIntType()
   val INT = new IntType()
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
index eb2f7a3..0cf621c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -34,7 +34,7 @@ import org.junit.{Before, Test}
 class MiniBatchIntervalInferTest extends TableTestBase {
   private val util = streamTestUtil()
 
-  val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+  val STRING = VarCharType.STRING_TYPE
   val LONG = new BigIntType()
   val INT = new IntType()
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index bdba353..13bced6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -255,7 +255,7 @@ object PartitionableSinkITCase {
   }
 
   val fieldNames = Array("a", "b", "c")
-  val dataType = Array(new IntType(), new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH))
+  val dataType = Array(new IntType(), new BigIntType(), VarCharType.STRING_TYPE)
   val dataNullables = Array(true, true, true)
 
   val testData = Seq(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
index b7a4302..27451d4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
@@ -33,7 +33,7 @@ import scala.collection.Seq
 class UnionITCase extends BatchTestBase {
 
   val type6 = InternalTypeInfo.ofFields(
-    new IntType(), new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH))
+    new IntType(), new BigIntType(), VarCharType.STRING_TYPE)
 
   val data6 = Seq(
     binaryRow(type6.toRowFieldTypes, 1, 1L, fromString("Hi")),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 1d887a1..b5e55ae 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -152,8 +152,8 @@ class CalcITCase extends StreamingTestBase {
     tEnv.registerTable("MyTableRow", t)
 
     val outputType = InternalTypeInfo.ofFields(
-      new VarCharType(VarCharType.MAX_LENGTH),
-      new VarCharType(VarCharType.MAX_LENGTH),
+      VarCharType.STRING_TYPE,
+      VarCharType.STRING_TYPE,
       new IntType())
 
     val result = tEnv.sqlQuery(sqlQuery).toAppendStream[RowData]
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java
index e7d5773..fb8a5fc 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java
@@ -501,8 +501,7 @@ public class BinaryArrayDataTest {
         writer.writeRow(
                 0,
                 GenericRowData.of(fromString("1"), 1),
-                new RowDataSerializer(
-                        RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType())));
+                new RowDataSerializer(RowType.of(VarCharType.STRING_TYPE, new IntType())));
         writer.setNullAt(1);
         writer.complete();
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
index c0c91a4..1a200e8 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
@@ -484,8 +484,7 @@ public class BinaryRowDataTest {
         writer.writeRow(
                 0,
                 GenericRowData.of(fromString("1"), 1),
-                new RowDataSerializer(
-                        RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType())));
+                new RowDataSerializer(RowType.of(VarCharType.STRING_TYPE, new IntType())));
         writer.setNullAt(1);
         writer.complete();
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
index de0c65b..852633f 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
@@ -189,10 +189,10 @@ public class DataFormatConvertersTest {
         test(new RowTypeInfo(simpleTypes), new Row(simpleTypes.length));
         test(new RowTypeInfo(simpleTypes), Row.ofKind(RowKind.DELETE, simpleValues));
         test(
-                InternalTypeInfo.ofFields(new VarCharType(VarCharType.MAX_LENGTH), new IntType()),
+                InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType()),
                 GenericRowData.of(StringData.fromString("hehe"), 111));
         test(
-                InternalTypeInfo.ofFields(new VarCharType(VarCharType.MAX_LENGTH), new IntType()),
+                InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType()),
                 GenericRowData.of(null, null));
 
         test(new DecimalDataTypeInfo(10, 5), null);
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
index 8995185..1989701 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
@@ -110,8 +110,7 @@ public class SlicingWindowAggOperatorTest {
 
     private static final RowDataHarnessAssertor ASSERTER =
             new RowDataHarnessAssertor(
-                    OUTPUT_TYPES,
-                    new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+                    OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
 
     @Test
     public void testEventTimeHoppingWindows() throws Exception {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
index 99771bc..cdac040 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
@@ -37,8 +37,7 @@ abstract class ProcTimeDeduplicateFunctionTestBase {
 
     Time minTime = Time.milliseconds(10);
     InternalTypeInfo<RowData> inputRowType =
-            InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new IntType());
+            InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new BigIntType(), new IntType());
 
     int rowKeyIdx = 1;
     RowDataKeySelector rowKeySelector =
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
index 2a6cc8f..bec4ba1 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
@@ -40,8 +40,7 @@ abstract class RowTimeDeduplicateFunctionTestBase {
     protected final long miniBatchSize = 4L;
     protected Time minTtlTime = Time.milliseconds(10);
     protected InternalTypeInfo inputRowType =
-            InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType());
+            InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType(), new BigIntType());
     protected TypeSerializer<RowData> serializer = inputRowType.toSerializer();
     protected int rowTimeIndex = 2;
     protected int rowKeyIndex = 0;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java
index 9c91345..386fd28 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java
@@ -84,8 +84,7 @@ public class RowTimeWindowDeduplicateOperatorTest {
 
     private static final RowDataHarnessAssertor ASSERTER =
             new RowDataHarnessAssertor(
-                    OUTPUT_TYPES,
-                    new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+                    OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
 
     private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
     private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
index 99f93fd..f73d224 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
@@ -261,13 +261,13 @@ public class RandomSortMergeInnerJoinTest {
             boolean input1First)
             throws Exception {
         InternalTypeInfo<RowData> typeInfo =
-                InternalTypeInfo.ofFields(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
+                InternalTypeInfo.ofFields(new IntType(), VarCharType.STRING_TYPE);
         InternalTypeInfo<RowData> joinedInfo =
                 InternalTypeInfo.ofFields(
                         new IntType(),
-                        new VarCharType(VarCharType.MAX_LENGTH),
+                        VarCharType.STRING_TYPE,
                         new IntType(),
-                        new VarCharType(VarCharType.MAX_LENGTH));
+                        VarCharType.STRING_TYPE);
         final TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData>
                 testHarness =
                         new TwoInputStreamTaskTestHarness<>(
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
index 88c06bc..31eb491 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
@@ -48,15 +48,13 @@ import java.util.concurrent.LinkedBlockingQueue;
 public class String2HashJoinOperatorTest implements Serializable {
 
     private InternalTypeInfo<RowData> typeInfo =
-            InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+            InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE);
     private InternalTypeInfo<RowData> joinedInfo =
             InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE);
     private transient TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData>
             testHarness;
     private ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -354,6 +352,6 @@ public class String2HashJoinOperatorTest implements Serializable {
                 20,
                 10000,
                 10000,
-                RowType.of(new VarCharType(VarCharType.MAX_LENGTH)));
+                RowType.of(VarCharType.STRING_TYPE));
     }
 }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
index 6a6c5060..719cecf 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
@@ -60,15 +60,13 @@ public class String2SortMergeJoinOperatorTest {
 
     private boolean leftIsSmall;
     InternalTypeInfo<RowData> typeInfo =
-            InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+            InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE);
     private InternalTypeInfo<RowData> joinedInfo =
             InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE);
 
     public String2SortMergeJoinOperatorTest(boolean leftIsSmall) {
         this.leftIsSmall = leftIsSmall;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java
index 6f92a34..034fe71 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java
@@ -28,14 +28,14 @@ import org.apache.flink.table.types.logical.VarCharType;
 /** Base Test for all subclass of {@link TimeIntervalJoin}. */
 abstract class TimeIntervalStreamJoinTestBase {
     InternalTypeInfo<RowData> rowType =
-            InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
+            InternalTypeInfo.ofFields(new BigIntType(), VarCharType.STRING_TYPE);
 
     private InternalTypeInfo<RowData> outputRowType =
             InternalTypeInfo.ofFields(
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
+                    VarCharType.STRING_TYPE,
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    VarCharType.STRING_TYPE);
     RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputRowType.toRowFieldTypes());
 
     protected String funcCode =
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java
index cc839c6..ff4e801 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java
@@ -43,7 +43,7 @@ public class TemporalProcessTimeJoinOperatorTest extends TemporalTimeJoinOperato
 
     private int keyIdx = 0;
     private InternalTypeInfo<RowData> rowType =
-            InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
+            InternalTypeInfo.ofFields(new BigIntType(), VarCharType.STRING_TYPE);
     private RowDataKeySelector keySelector =
             HandwrittenSelectorUtil.getRowDataSelector(
                     new int[] {keyIdx}, rowType.toRowFieldTypes());
@@ -51,9 +51,9 @@ public class TemporalProcessTimeJoinOperatorTest extends TemporalTimeJoinOperato
     private InternalTypeInfo<RowData> outputRowType =
             InternalTypeInfo.ofFields(
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
+                    VarCharType.STRING_TYPE,
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    VarCharType.STRING_TYPE);
     private RowDataHarnessAssertor assertor =
             new RowDataHarnessAssertor(outputRowType.toRowFieldTypes());
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java
index 924e30e..8ba7a87 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java
@@ -46,17 +46,15 @@ abstract class TemporalTimeJoinOperatorTestBase {
             new GeneratedJoinCondition("TimeTemporalJoinCondition", funcCode, new Object[0]);
     protected InternalTypeInfo<RowData> rowType =
             InternalTypeInfo.ofFields(
-                    new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    new BigIntType(), VarCharType.STRING_TYPE, VarCharType.STRING_TYPE);
     protected InternalTypeInfo<RowData> outputRowType =
             InternalTypeInfo.ofFields(
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH),
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE,
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    VarCharType.STRING_TYPE,
+                    VarCharType.STRING_TYPE);
     protected RowDataHarnessAssertor assertor =
             new RowDataHarnessAssertor(outputRowType.toRowFieldTypes());
     protected int keyIdx = 1;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
index 3b7d093..762d434 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
@@ -50,14 +50,14 @@ import static org.junit.Assert.assertEquals;
 public class WindowJoinOperatorTest {
 
     private static final InternalTypeInfo<RowData> INPUT_ROW_TYPE =
-            InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
+            InternalTypeInfo.ofFields(new BigIntType(), VarCharType.STRING_TYPE);
 
     private static final InternalTypeInfo<RowData> OUTPUT_ROW_TYPE =
             InternalTypeInfo.ofFields(
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
+                    VarCharType.STRING_TYPE,
                     new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH));
+                    VarCharType.STRING_TYPE);
 
     private static final RowDataHarnessAssertor ASSERTER =
             new RowDataHarnessAssertor(OUTPUT_ROW_TYPE.toRowFieldTypes());
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
index 94d337c..a9718fc 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
@@ -50,7 +50,7 @@ public class ProcTimeRangeBoundedPrecedingFunctionTest {
 
     private LogicalType[] inputFieldTypes =
             new LogicalType[] {
-                new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(),
+                VarCharType.STRING_TYPE, new BigIntType(),
             };
     private LogicalType[] accTypes = new LogicalType[] {new BigIntType()};
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java
index 689dec6..3f85eee 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java
@@ -42,9 +42,7 @@ public class RowTimeOverWindowTestBase {
             };
 
     protected LogicalType[] inputFieldTypes =
-            new LogicalType[] {
-                new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new BigIntType()
-            };
+            new LogicalType[] {VarCharType.STRING_TYPE, new BigIntType(), new BigIntType()};
     protected LogicalType[] accTypes = new LogicalType[] {new BigIntType()};
 
     protected RowDataKeySelector keySelector =
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java
index aad99bc..c38e12c 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java
@@ -58,8 +58,7 @@ abstract class TopNFunctionTestBase {
     long cacheSize = 10000L;
 
     InternalTypeInfo<RowData> inputRowType =
-            InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new IntType());
+            InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new BigIntType(), new IntType());
 
     static GeneratedRecordComparator generatedSortKeyComparator =
             new GeneratedRecordComparator("", "", new Object[0]) {
@@ -107,10 +106,7 @@ abstract class TopNFunctionTestBase {
 
     private InternalTypeInfo<RowData> outputTypeWithRowNumber =
             InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new BigIntType(),
-                    new IntType(),
-                    new BigIntType());
+                    VarCharType.STRING_TYPE, new BigIntType(), new IntType(), new BigIntType());
 
     RowDataHarnessAssertor assertorWithoutRowNumber =
             new RowDataHarnessAssertor(
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
index 087cc4c..60f686e 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
@@ -106,8 +106,7 @@ public class WindowRankOperatorTest {
 
     private static final RowDataHarnessAssertor ASSERTER =
             new RowDataHarnessAssertor(
-                    OUTPUT_TYPES,
-                    new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+                    OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
 
     private static final LogicalType[] OUTPUT_TYPES_WITHOUT_RANK_NUMBER =
             new LogicalType[] {new VarCharType(Integer.MAX_VALUE), new IntType(), new BigIntType()};
@@ -118,7 +117,7 @@ public class WindowRankOperatorTest {
     private static final RowDataHarnessAssertor ASSERTER_WITHOUT_RANK_NUMBER =
             new RowDataHarnessAssertor(
                     OUTPUT_TYPES_WITHOUT_RANK_NUMBER,
-                    new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+                    new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
 
     private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
     private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java
index e42a1ec..fa29703 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java
@@ -43,10 +43,7 @@ public class ProcTimeSortOperatorTest {
 
     private InternalTypeInfo<RowData> inputRowType =
             InternalTypeInfo.ofFields(
-                    new IntType(),
-                    new BigIntType(),
-                    new VarCharType(VarCharType.MAX_LENGTH),
-                    new IntType());
+                    new IntType(), new BigIntType(), VarCharType.STRING_TYPE, new IntType());
 
     private GeneratedRecordComparator gComparator =
             new GeneratedRecordComparator("", "", new Object[0]) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java
index 7acaaa5..53cf10b 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java
@@ -46,10 +46,7 @@ public class RowTimeSortOperatorTest {
     public void testSortOnTwoFields() throws Exception {
         InternalTypeInfo<RowData> inputRowType =
                 InternalTypeInfo.ofFields(
-                        new IntType(),
-                        new BigIntType(),
-                        new VarCharType(VarCharType.MAX_LENGTH),
-                        new IntType());
+                        new IntType(), new BigIntType(), VarCharType.STRING_TYPE, new IntType());
 
         // Note: RowTimeIdx must be 0 in product environment, the value is 1 here just for simplify
         // the testing
@@ -134,10 +131,7 @@ public class RowTimeSortOperatorTest {
     public void testOnlySortOnRowTime() throws Exception {
         InternalTypeInfo<RowData> inputRowType =
                 InternalTypeInfo.ofFields(
-                        new BigIntType(),
-                        new BigIntType(),
-                        new VarCharType(VarCharType.MAX_LENGTH),
-                        new IntType());
+                        new BigIntType(), new BigIntType(), VarCharType.STRING_TYPE, new IntType());
         int rowTimeIdx = 0;
         RowDataHarnessAssertor assertor =
                 new RowDataHarnessAssertor(inputRowType.toRowFieldTypes());
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
index e535ede..5d4879c 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
@@ -40,7 +40,7 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord
 public class StreamSortOperatorTest {
 
     private InternalTypeInfo<RowData> inputRowType =
-            InternalTypeInfo.ofFields(new VarCharType(VarCharType.MAX_LENGTH), new IntType());
+            InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType());
 
     private GeneratedRecordComparator sortKeyComparator =
             new GeneratedRecordComparator("", "", new Object[0]) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java
index e8308f8..76db205 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java
@@ -195,8 +195,7 @@ public class WindowOperatorContractTest {
                     long allowedLateness)
                     throws Exception {
 
-        LogicalType[] inputTypes =
-                new LogicalType[] {new VarCharType(VarCharType.MAX_LENGTH), new IntType()};
+        LogicalType[] inputTypes = new LogicalType[] {VarCharType.STRING_TYPE, new IntType()};
         RowDataKeySelector keySelector =
                 HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, inputTypes);
         TypeInformation<RowData> keyType = keySelector.getProducedType();
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java
index ed0e75c..6235352 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java
@@ -123,13 +123,11 @@ public class WindowOperatorTest {
     private static AtomicInteger closeCalled = new AtomicInteger(0);
 
     private LogicalType[] inputFieldTypes =
-            new LogicalType[] {
-                new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType()
-            };
+            new LogicalType[] {VarCharType.STRING_TYPE, new IntType(), new BigIntType()};
 
     private InternalTypeInfo<RowData> outputType =
             InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH),
+                    VarCharType.STRING_TYPE,
                     new BigIntType(),
                     new BigIntType(),
                     new BigIntType(),
@@ -147,7 +145,7 @@ public class WindowOperatorTest {
     private RowDataHarnessAssertor assertor =
             new RowDataHarnessAssertor(
                     outputType.toRowFieldTypes(),
-                    new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+                    new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
 
     private ConcurrentLinkedQueue<Object> doubleRecord(
             boolean isDouble, StreamRecord<RowData> record) {
@@ -1543,8 +1541,7 @@ public class WindowOperatorTest {
         RowDataHarnessAssertor assertor =
                 new RowDataHarnessAssertor(
                         outputType.toRowFieldTypes(),
-                        new GenericRowRecordSortComparator(
-                                0, new VarCharType(VarCharType.MAX_LENGTH)));
+                        new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
 
         ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java
index d8eb34c..3bf9e4c 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java
@@ -82,7 +82,7 @@ public class DataTypePrecisionFixerTest {
                         .logicalType(new LocalZonedTimestampType(2))
                         .expect(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(2)),
                 TestSpecs.fix(Types.STRING)
-                        .logicalType(new VarCharType(VarCharType.MAX_LENGTH))
+                        .logicalType(VarCharType.STRING_TYPE)
                         .expect(DataTypes.STRING()),
 
                 // nested
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
index 1e0aa14..d87a5f0 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
@@ -94,7 +94,7 @@ public class RowDataSerializerTest extends SerializerTestInstance<RowData> {
 
     private static Object[] testRowDataSerializer() {
         InternalTypeInfo<RowData> typeInfo =
-                InternalTypeInfo.ofFields(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
+                InternalTypeInfo.ofFields(new IntType(), VarCharType.STRING_TYPE);
         GenericRowData row1 = new GenericRowData(2);
         row1.setField(0, 1);
         row1.setField(1, fromString("a"));
@@ -122,7 +122,7 @@ public class RowDataSerializerTest extends SerializerTestInstance<RowData> {
                         new IntType(),
                         new IntType(),
                         new IntType(),
-                        new VarCharType(VarCharType.MAX_LENGTH));
+                        VarCharType.STRING_TYPE);
 
         GenericRowData row = new GenericRowData(13);
         row.setField(0, 2);
@@ -147,7 +147,7 @@ public class RowDataSerializerTest extends SerializerTestInstance<RowData> {
                 InternalTypeInfo.ofFields(
                         new IntType(),
                         new DoubleType(),
-                        new VarCharType(VarCharType.MAX_LENGTH),
+                        VarCharType.STRING_TYPE,
                         new ArrayType(new IntType()),
                         new MapType(new IntType(), new IntType()));
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
index a599f29..9d50c32 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
@@ -53,7 +53,7 @@ public abstract class BytesHashMapTestBase<K> extends BytesMapTestBase {
     static final LogicalType[] KEY_TYPES =
             new LogicalType[] {
                 new IntType(),
-                new VarCharType(VarCharType.MAX_LENGTH),
+                VarCharType.STRING_TYPE,
                 new DoubleType(),
                 new BigIntType(),
                 new BooleanType(),
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
index 22dca2d..c4a5d63 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
@@ -50,7 +50,7 @@ public abstract class BytesMultiMapTestBase<K> extends BytesMapTestBase {
     static final LogicalType[] KEY_TYPES =
             new LogicalType[] {
                 new IntType(),
-                new VarCharType(VarCharType.MAX_LENGTH),
+                VarCharType.STRING_TYPE,
                 new DoubleType(),
                 new BigIntType(),
                 new BooleanType(),
@@ -60,7 +60,7 @@ public abstract class BytesMultiMapTestBase<K> extends BytesMapTestBase {
 
     static final LogicalType[] VALUE_TYPES =
             new LogicalType[] {
-                new VarCharType(VarCharType.MAX_LENGTH), new IntType(),
+                VarCharType.STRING_TYPE, new IntType(),
             };
 
     protected final PagedTypeSerializer<K> keySerializer;