You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/14 13:11:03 UTC
[flink] 02/04: [hotfix][table] Make use of VarCharType.STRING_TYPE
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b6ca017e095de4b055e58af195e1f0a00e312d6e
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Thu Dec 9 13:50:10 2021 +0100
[hotfix][table] Make use of VarCharType.STRING_TYPE
Replace occurrences of `new VarCharType(VarCharType.MAX_LENGTH)` with the new constant
`VarCharType.STRING_TYPE`.
---
.../flink/table/types/logical/utils/LogicalTypeParser.java | 2 +-
.../java/org/apache/flink/table/types/DataTypesTest.java | 2 +-
.../apache/flink/table/types/LogicalCommonTypeTest.java | 4 ++--
.../apache/flink/table/types/LogicalTypeParserTest.java | 2 +-
.../table/types/extraction/DataTypeExtractorTest.java | 9 +++------
.../flink/table/planner/plan/type/FlinkReturnTypes.java | 4 ++--
.../flink/table/planner/codegen/calls/StringCallGen.scala | 2 +-
.../flink/table/planner/codegen/SortCodeGeneratorTest.java | 2 +-
.../org/apache/flink/table/api/batch/ExplainTest.scala | 2 +-
.../org/apache/flink/table/api/stream/ExplainTest.scala | 2 +-
.../flink/table/planner/calcite/FlinkTypeFactoryTest.scala | 6 +++---
.../flink/table/planner/codegen/agg/AggTestBase.scala | 4 ++--
.../table/planner/codegen/agg/batch/BatchAggTestBase.scala | 2 +-
.../codegen/agg/batch/HashAggCodeGeneratorTest.scala | 2 +-
.../codegen/agg/batch/SortAggCodeGeneratorTest.scala | 4 ++--
.../planner/expressions/utils/ExpressionTestBase.scala | 2 +-
.../table/planner/plan/batch/sql/DagOptimizationTest.scala | 2 +-
.../table/planner/plan/metadata/MetadataTestUtil.scala | 6 +++---
.../planner/plan/stream/sql/DagOptimizationTest.scala | 2 +-
.../table/planner/plan/stream/sql/LegacySinkTest.scala | 2 +-
.../plan/stream/sql/MiniBatchIntervalInferTest.scala | 2 +-
.../runtime/batch/sql/PartitionableSinkITCase.scala | 2 +-
.../table/planner/runtime/batch/sql/UnionITCase.scala | 2 +-
.../table/planner/runtime/stream/sql/CalcITCase.scala | 4 ++--
.../org/apache/flink/table/data/BinaryArrayDataTest.java | 3 +--
.../org/apache/flink/table/data/BinaryRowDataTest.java | 3 +--
.../apache/flink/table/data/DataFormatConvertersTest.java | 4 ++--
.../aggregate/window/SlicingWindowAggOperatorTest.java | 3 +--
.../deduplicate/ProcTimeDeduplicateFunctionTestBase.java | 3 +--
.../deduplicate/RowTimeDeduplicateFunctionTestBase.java | 3 +--
.../window/RowTimeWindowDeduplicateOperatorTest.java | 3 +--
.../operators/join/RandomSortMergeInnerJoinTest.java | 6 +++---
.../operators/join/String2HashJoinOperatorTest.java | 14 ++++++--------
.../operators/join/String2SortMergeJoinOperatorTest.java | 12 +++++-------
.../join/interval/TimeIntervalStreamJoinTestBase.java | 6 +++---
.../join/temporal/TemporalProcessTimeJoinOperatorTest.java | 6 +++---
.../join/temporal/TemporalTimeJoinOperatorTestBase.java | 12 +++++-------
.../operators/join/window/WindowJoinOperatorTest.java | 6 +++---
.../over/ProcTimeRangeBoundedPrecedingFunctionTest.java | 2 +-
.../runtime/operators/over/RowTimeOverWindowTestBase.java | 4 +---
.../table/runtime/operators/rank/TopNFunctionTestBase.java | 8 ++------
.../operators/rank/window/WindowRankOperatorTest.java | 5 ++---
.../runtime/operators/sort/ProcTimeSortOperatorTest.java | 5 +----
.../runtime/operators/sort/RowTimeSortOperatorTest.java | 10 ++--------
.../runtime/operators/sort/StreamSortOperatorTest.java | 2 +-
.../operators/window/WindowOperatorContractTest.java | 3 +--
.../table/runtime/operators/window/WindowOperatorTest.java | 11 ++++-------
.../table/runtime/types/DataTypePrecisionFixerTest.java | 2 +-
.../table/runtime/typeutils/RowDataSerializerTest.java | 6 +++---
.../util/collections/binary/BytesHashMapTestBase.java | 2 +-
.../util/collections/binary/BytesMultiMapTestBase.java | 4 ++--
51 files changed, 93 insertions(+), 128 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
index 1c69d77..15b5daa 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
@@ -528,7 +528,7 @@ public final class LogicalTypeParser {
case VARCHAR:
return parseVarCharType();
case STRING:
- return new VarCharType(VarCharType.MAX_LENGTH);
+ return VarCharType.STRING_TYPE;
case BOOLEAN:
return new BooleanType();
case BINARY:
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
index 9d5288e..a658285 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
@@ -118,7 +118,7 @@ public class DataTypesTest {
.expectLogicalType(new VarCharType(2))
.expectConversionClass(String.class),
TestSpec.forDataType(STRING())
- .expectLogicalType(new VarCharType(VarCharType.MAX_LENGTH))
+ .expectLogicalType(VarCharType.STRING_TYPE)
.expectConversionClass(String.class),
TestSpec.forDataType(BOOLEAN())
.expectLogicalType(new BooleanType())
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java
index 29745ec..ad76ea6 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java
@@ -141,8 +141,8 @@ public class LogicalCommonTypeTest {
// VARCHAR types of different length
{
- Arrays.asList(new VarCharType(2), new VarCharType(VarCharType.MAX_LENGTH)),
- new VarCharType(VarCharType.MAX_LENGTH)
+ Arrays.asList(new VarCharType(2), VarCharType.STRING_TYPE),
+ VarCharType.STRING_TYPE
},
// mixed VARCHAR and CHAR types
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
index 1a58475..88758f2 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
@@ -91,7 +91,7 @@ public class LogicalTypeParserTest {
TestSpec.forString("CHAR(33)").expectType(new CharType(33)),
TestSpec.forString("VARCHAR").expectType(new VarCharType()),
TestSpec.forString("VARCHAR(33)").expectType(new VarCharType(33)),
- TestSpec.forString("STRING").expectType(new VarCharType(VarCharType.MAX_LENGTH)),
+ TestSpec.forString("STRING").expectType(VarCharType.STRING_TYPE),
TestSpec.forString("BOOLEAN").expectType(new BooleanType()),
TestSpec.forString("BINARY").expectType(new BinaryType()),
TestSpec.forString("BINARY(33)").expectType(new BinaryType(33)),
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
index f023a50..dd58c82 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
@@ -619,8 +619,7 @@ public class DataTypeExtractorTest {
new StructuredAttribute("intField", new IntType(true)),
new StructuredAttribute("primitiveBooleanField", new BooleanType(false)),
new StructuredAttribute("primitiveIntField", new IntType(false)),
- new StructuredAttribute(
- "stringField", new VarCharType(VarCharType.MAX_LENGTH))));
+ new StructuredAttribute("stringField", VarCharType.STRING_TYPE)));
builder.setFinal(true);
builder.setInstantiable(true);
final StructuredType structuredType = builder.build();
@@ -641,9 +640,7 @@ public class DataTypeExtractorTest {
builder.attributes(
Arrays.asList(
new StructuredAttribute(
- "mapField",
- new MapType(
- new VarCharType(VarCharType.MAX_LENGTH), new IntType())),
+ "mapField", new MapType(VarCharType.STRING_TYPE, new IntType())),
new StructuredAttribute(
"simplePojoField",
getSimplePojoDataType(simplePojoClass).getLogicalType()),
@@ -700,7 +697,7 @@ public class DataTypeExtractorTest {
final StructuredType.Builder builder = StructuredType.newBuilder(Tuple2.class);
builder.attributes(
Arrays.asList(
- new StructuredAttribute("f0", new VarCharType(VarCharType.MAX_LENGTH)),
+ new StructuredAttribute("f0", VarCharType.STRING_TYPE),
new StructuredAttribute("f1", new BooleanType())));
builder.setFinal(true);
builder.setInstantiable(true);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java
index b495a98..1ff51b4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java
@@ -127,6 +127,6 @@ public class FlinkReturnTypes {
((FlinkTypeFactory) factory)
.createFieldTypeFromLogicalType(
new MapType(
- new VarCharType(VarCharType.MAX_LENGTH),
- new VarCharType(VarCharType.MAX_LENGTH))));
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE)));
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index b605ddc..7619486 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -765,7 +765,7 @@ object StringCallGen {
operands: Seq[GeneratedExpression]): GeneratedExpression = {
val className = classOf[SqlFunctionUtils].getCanonicalName
val t = new MapType(
- new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH))
+ VarCharType.STRING_TYPE, VarCharType.STRING_TYPE)
val converter = DataFormatConverters.getConverterForDataType(
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
val converterTerm = ctx.addReusableObject(converter, "mapConverter")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
index 8a26c3e..931b0ec 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
@@ -105,7 +105,7 @@ public class SortCodeGeneratorTest {
new BigIntType(),
new FloatType(),
new DoubleType(),
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
new DecimalType(18, 2),
new DecimalType(38, 18),
new VarBinaryType(VarBinaryType.MAX_LENGTH),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index cd20033..8314f67 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -43,7 +43,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
- val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+ val STRING = VarCharType.STRING_TYPE
val LONG = new BigIntType()
val INT = new IntType()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index cadbaa5..a66db23 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -46,7 +46,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
- val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+ val STRING = VarCharType.STRING_TYPE
val LONG = new BigIntType()
val INT = new IntType()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala
index d8be0ea..d6c8c7e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala
@@ -69,7 +69,7 @@ class FlinkTypeFactoryTest {
test(new NullType())
test(new BooleanType())
test(new TinyIntType())
- test(new VarCharType(VarCharType.MAX_LENGTH))
+ test(VarCharType.STRING_TYPE)
test(new DoubleType())
test(new FloatType())
test(new IntType())
@@ -82,8 +82,8 @@ class FlinkTypeFactoryTest {
test(new LocalZonedTimestampType(3))
test(new ArrayType(new DoubleType()))
- test(new MapType(new DoubleType(), new VarCharType(VarCharType.MAX_LENGTH)))
- test(RowType.of(new DoubleType(), new VarCharType(VarCharType.MAX_LENGTH)))
+ test(new MapType(new DoubleType(), VarCharType.STRING_TYPE))
+ test(RowType.of(new DoubleType(), VarCharType.STRING_TYPE))
test(new RawType[DayOfWeek](
classOf[DayOfWeek],
new KryoSerializer[DayOfWeek](classOf[DayOfWeek], new ExecutionConfig)))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
index cd4d0d5..3fac689 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
@@ -57,8 +57,8 @@ abstract class AggTestBase(isBatchMode: Boolean) {
private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
val inputNames = Array("f0", "f1", "f2", "f3", "f4")
val inputTypes: Array[LogicalType] = Array(
- new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new DoubleType(), new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH))
+ VarCharType.STRING_TYPE, new BigIntType(), new DoubleType(), new BigIntType(),
+ VarCharType.STRING_TYPE)
val inputType: RowType = RowType.of(inputTypes, inputNames)
val relBuilder: RelBuilder = planner.getRelBuilder.values(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
index d71a2a3..c5bf13b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
@@ -43,7 +43,7 @@ abstract class BatchAggTestBase extends AggTestBase(isBatchMode = true) {
val globalOutputType = RowType.of(
Array[LogicalType](
- new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE, VarCharType.STRING_TYPE,
new BigIntType(),
new DoubleType(),
new BigIntType()),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
index 720263b..62ef130 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
@@ -35,7 +35,7 @@ class HashAggCodeGeneratorTest extends BatchAggTestBase {
val localOutputType = RowType.of(
Array[LogicalType](
- new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE, VarCharType.STRING_TYPE,
new BigIntType(), new BigIntType(),
new DoubleType(), new BigIntType(),
new BigIntType(), new BigIntType()),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala
index f69766f1..90f981c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala
@@ -32,7 +32,7 @@ class SortAggCodeGeneratorTest extends BatchAggTestBase {
val localOutputType = RowType.of(
Array[LogicalType](
- new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE, VarCharType.STRING_TYPE,
new BigIntType(), new BigIntType(),
new DoubleType(), new BigIntType(),
fromTypeInfoToLogicalType(imperativeAggFunc.getAccumulatorType)),
@@ -95,7 +95,7 @@ class SortAggCodeGeneratorTest extends BatchAggTestBase {
: (CodeGenOperatorFactory[RowData], RowType, RowType) = {
val localOutputType = RowType.of(
Array[LogicalType](
- new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE, VarCharType.STRING_TYPE,
new BigIntType(), new BigIntType(),
new DoubleType(), new BigIntType(),
fromTypeInfoToLogicalType(imperativeAggFunc.getAccumulatorType)),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index b62f158..48898e6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -374,7 +374,7 @@ abstract class ExpressionTestBase {
// generate code
val resultType = RowType.of(Seq.fill(rexNodes.size)(
- new VarCharType(VarCharType.MAX_LENGTH)): _*)
+ VarCharType.STRING_TYPE): _*)
val exprs = stringTestExprs.map(exprGenerator.generateExpression)
val genExpr = exprGenerator.generateResultExpression(exprs, resultType, classOf[BinaryRowData])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
index 50701b9..b6fc21e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala
@@ -34,7 +34,7 @@ class DagOptimizationTest extends TableTestBase {
util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
util.addTableSource[(Int, Long, String)]("MyTable1", 'd, 'e, 'f)
- val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+ val STRING = VarCharType.STRING_TYPE
val LONG = new BigIntType()
val INT = new IntType()
val DOUBLE = new DoubleType()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
index f12993f..4759afa 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
@@ -198,7 +198,7 @@ object MetadataTestUtil {
val fieldNames = Array("a", "b", "c", "proctime", "rowtime")
val fieldTypes = Array[LogicalType](
new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
new IntType(),
new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
new TimestampType(true, TimestampKind.ROWTIME, 3))
@@ -217,7 +217,7 @@ object MetadataTestUtil {
val fieldNames = Array("a", "b", "c", "proctime", "rowtime")
val fieldTypes = Array[LogicalType](
new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
new IntType(),
new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
new TimestampType(true, TimestampKind.ROWTIME, 3))
@@ -238,7 +238,7 @@ object MetadataTestUtil {
val fieldTypes = Array[LogicalType](
new IntType(),
new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3),
new TimestampType(true, TimestampKind.ROWTIME, 3))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
index 4502232..414b044 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala
@@ -32,7 +32,7 @@ class DagOptimizationTest extends TableTestBase {
util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
util.addTableSource[(Int, Long, String)]("MyTable1", 'd, 'e, 'f)
- val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+ val STRING = VarCharType.STRING_TYPE
val LONG = new BigIntType()
val INT = new IntType()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
index b952a9c..6244402 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
@@ -31,7 +31,7 @@ class LegacySinkTest extends TableTestBase {
private val util = streamTestUtil()
util.addDataStream[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+ val STRING = VarCharType.STRING_TYPE
val LONG = new BigIntType()
val INT = new IntType()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
index eb2f7a3..0cf621c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -34,7 +34,7 @@ import org.junit.{Before, Test}
class MiniBatchIntervalInferTest extends TableTestBase {
private val util = streamTestUtil()
- val STRING = new VarCharType(VarCharType.MAX_LENGTH)
+ val STRING = VarCharType.STRING_TYPE
val LONG = new BigIntType()
val INT = new IntType()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index bdba353..13bced6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -255,7 +255,7 @@ object PartitionableSinkITCase {
}
val fieldNames = Array("a", "b", "c")
- val dataType = Array(new IntType(), new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH))
+ val dataType = Array(new IntType(), new BigIntType(), VarCharType.STRING_TYPE)
val dataNullables = Array(true, true, true)
val testData = Seq(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
index b7a4302..27451d4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
@@ -33,7 +33,7 @@ import scala.collection.Seq
class UnionITCase extends BatchTestBase {
val type6 = InternalTypeInfo.ofFields(
- new IntType(), new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH))
+ new IntType(), new BigIntType(), VarCharType.STRING_TYPE)
val data6 = Seq(
binaryRow(type6.toRowFieldTypes, 1, 1L, fromString("Hi")),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 1d887a1..b5e55ae 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -152,8 +152,8 @@ class CalcITCase extends StreamingTestBase {
tEnv.registerTable("MyTableRow", t)
val outputType = InternalTypeInfo.ofFields(
- new VarCharType(VarCharType.MAX_LENGTH),
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
new IntType())
val result = tEnv.sqlQuery(sqlQuery).toAppendStream[RowData]
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java
index e7d5773..fb8a5fc 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java
@@ -501,8 +501,7 @@ public class BinaryArrayDataTest {
writer.writeRow(
0,
GenericRowData.of(fromString("1"), 1),
- new RowDataSerializer(
- RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType())));
+ new RowDataSerializer(RowType.of(VarCharType.STRING_TYPE, new IntType())));
writer.setNullAt(1);
writer.complete();
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
index c0c91a4..1a200e8 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
@@ -484,8 +484,7 @@ public class BinaryRowDataTest {
writer.writeRow(
0,
GenericRowData.of(fromString("1"), 1),
- new RowDataSerializer(
- RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType())));
+ new RowDataSerializer(RowType.of(VarCharType.STRING_TYPE, new IntType())));
writer.setNullAt(1);
writer.complete();
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
index de0c65b..852633f 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
@@ -189,10 +189,10 @@ public class DataFormatConvertersTest {
test(new RowTypeInfo(simpleTypes), new Row(simpleTypes.length));
test(new RowTypeInfo(simpleTypes), Row.ofKind(RowKind.DELETE, simpleValues));
test(
- InternalTypeInfo.ofFields(new VarCharType(VarCharType.MAX_LENGTH), new IntType()),
+ InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType()),
GenericRowData.of(StringData.fromString("hehe"), 111));
test(
- InternalTypeInfo.ofFields(new VarCharType(VarCharType.MAX_LENGTH), new IntType()),
+ InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType()),
GenericRowData.of(null, null));
test(new DecimalDataTypeInfo(10, 5), null);
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
index 8995185..1989701 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
@@ -110,8 +110,7 @@ public class SlicingWindowAggOperatorTest {
private static final RowDataHarnessAssertor ASSERTER =
new RowDataHarnessAssertor(
- OUTPUT_TYPES,
- new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+ OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
@Test
public void testEventTimeHoppingWindows() throws Exception {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
index 99771bc..cdac040 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
@@ -37,8 +37,7 @@ abstract class ProcTimeDeduplicateFunctionTestBase {
Time minTime = Time.milliseconds(10);
InternalTypeInfo<RowData> inputRowType =
- InternalTypeInfo.ofFields(
- new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new IntType());
+ InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new BigIntType(), new IntType());
int rowKeyIdx = 1;
RowDataKeySelector rowKeySelector =
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
index 2a6cc8f..bec4ba1 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
@@ -40,8 +40,7 @@ abstract class RowTimeDeduplicateFunctionTestBase {
protected final long miniBatchSize = 4L;
protected Time minTtlTime = Time.milliseconds(10);
protected InternalTypeInfo inputRowType =
- InternalTypeInfo.ofFields(
- new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType());
+ InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType(), new BigIntType());
protected TypeSerializer<RowData> serializer = inputRowType.toSerializer();
protected int rowTimeIndex = 2;
protected int rowKeyIndex = 0;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java
index 9c91345..386fd28 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java
@@ -84,8 +84,7 @@ public class RowTimeWindowDeduplicateOperatorTest {
private static final RowDataHarnessAssertor ASSERTER =
new RowDataHarnessAssertor(
- OUTPUT_TYPES,
- new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+ OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
index 99f93fd..f73d224 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
@@ -261,13 +261,13 @@ public class RandomSortMergeInnerJoinTest {
boolean input1First)
throws Exception {
InternalTypeInfo<RowData> typeInfo =
- InternalTypeInfo.ofFields(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
+ InternalTypeInfo.ofFields(new IntType(), VarCharType.STRING_TYPE);
InternalTypeInfo<RowData> joinedInfo =
InternalTypeInfo.ofFields(
new IntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
new IntType(),
- new VarCharType(VarCharType.MAX_LENGTH));
+ VarCharType.STRING_TYPE);
final TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData>
testHarness =
new TwoInputStreamTaskTestHarness<>(
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
index 88c06bc..31eb491 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
@@ -48,15 +48,13 @@ import java.util.concurrent.LinkedBlockingQueue;
public class String2HashJoinOperatorTest implements Serializable {
private InternalTypeInfo<RowData> typeInfo =
- InternalTypeInfo.ofFields(
- new VarCharType(VarCharType.MAX_LENGTH),
- new VarCharType(VarCharType.MAX_LENGTH));
+ InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE);
private InternalTypeInfo<RowData> joinedInfo =
InternalTypeInfo.ofFields(
- new VarCharType(VarCharType.MAX_LENGTH),
- new VarCharType(VarCharType.MAX_LENGTH),
- new VarCharType(VarCharType.MAX_LENGTH),
- new VarCharType(VarCharType.MAX_LENGTH));
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE);
private transient TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData>
testHarness;
private ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -354,6 +352,6 @@ public class String2HashJoinOperatorTest implements Serializable {
20,
10000,
10000,
- RowType.of(new VarCharType(VarCharType.MAX_LENGTH)));
+ RowType.of(VarCharType.STRING_TYPE));
}
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
index 6a6c5060..719cecf 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
@@ -60,15 +60,13 @@ public class String2SortMergeJoinOperatorTest {
private boolean leftIsSmall;
InternalTypeInfo<RowData> typeInfo =
- InternalTypeInfo.ofFields(
- new VarCharType(VarCharType.MAX_LENGTH),
- new VarCharType(VarCharType.MAX_LENGTH));
+ InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE);
private InternalTypeInfo<RowData> joinedInfo =
InternalTypeInfo.ofFields(
- new VarCharType(VarCharType.MAX_LENGTH),
- new VarCharType(VarCharType.MAX_LENGTH),
- new VarCharType(VarCharType.MAX_LENGTH),
- new VarCharType(VarCharType.MAX_LENGTH));
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE);
public String2SortMergeJoinOperatorTest(boolean leftIsSmall) {
this.leftIsSmall = leftIsSmall;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java
index 6f92a34..034fe71 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java
@@ -28,14 +28,14 @@ import org.apache.flink.table.types.logical.VarCharType;
/** Base Test for all subclass of {@link TimeIntervalJoin}. */
abstract class TimeIntervalStreamJoinTestBase {
InternalTypeInfo<RowData> rowType =
- InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
+ InternalTypeInfo.ofFields(new BigIntType(), VarCharType.STRING_TYPE);
private InternalTypeInfo<RowData> outputRowType =
InternalTypeInfo.ofFields(
new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH));
+ VarCharType.STRING_TYPE);
RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputRowType.toRowFieldTypes());
protected String funcCode =
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java
index cc839c6..ff4e801 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java
@@ -43,7 +43,7 @@ public class TemporalProcessTimeJoinOperatorTest extends TemporalTimeJoinOperato
private int keyIdx = 0;
private InternalTypeInfo<RowData> rowType =
- InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
+ InternalTypeInfo.ofFields(new BigIntType(), VarCharType.STRING_TYPE);
private RowDataKeySelector keySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {keyIdx}, rowType.toRowFieldTypes());
@@ -51,9 +51,9 @@ public class TemporalProcessTimeJoinOperatorTest extends TemporalTimeJoinOperato
private InternalTypeInfo<RowData> outputRowType =
InternalTypeInfo.ofFields(
new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH));
+ VarCharType.STRING_TYPE);
private RowDataHarnessAssertor assertor =
new RowDataHarnessAssertor(outputRowType.toRowFieldTypes());
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java
index 924e30e..8ba7a87 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java
@@ -46,17 +46,15 @@ abstract class TemporalTimeJoinOperatorTestBase {
new GeneratedJoinCondition("TimeTemporalJoinCondition", funcCode, new Object[0]);
protected InternalTypeInfo<RowData> rowType =
InternalTypeInfo.ofFields(
- new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
- new VarCharType(VarCharType.MAX_LENGTH));
+ new BigIntType(), VarCharType.STRING_TYPE, VarCharType.STRING_TYPE);
protected InternalTypeInfo<RowData> outputRowType =
InternalTypeInfo.ofFields(
new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
- new VarCharType(VarCharType.MAX_LENGTH));
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE);
protected RowDataHarnessAssertor assertor =
new RowDataHarnessAssertor(outputRowType.toRowFieldTypes());
protected int keyIdx = 1;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
index 3b7d093..762d434 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
@@ -50,14 +50,14 @@ import static org.junit.Assert.assertEquals;
public class WindowJoinOperatorTest {
private static final InternalTypeInfo<RowData> INPUT_ROW_TYPE =
- InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
+ InternalTypeInfo.ofFields(new BigIntType(), VarCharType.STRING_TYPE);
private static final InternalTypeInfo<RowData> OUTPUT_ROW_TYPE =
InternalTypeInfo.ofFields(
new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH));
+ VarCharType.STRING_TYPE);
private static final RowDataHarnessAssertor ASSERTER =
new RowDataHarnessAssertor(OUTPUT_ROW_TYPE.toRowFieldTypes());
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
index 94d337c..a9718fc 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
@@ -50,7 +50,7 @@ public class ProcTimeRangeBoundedPrecedingFunctionTest {
private LogicalType[] inputFieldTypes =
new LogicalType[] {
- new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(),
+ VarCharType.STRING_TYPE, new BigIntType(),
};
private LogicalType[] accTypes = new LogicalType[] {new BigIntType()};
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java
index 689dec6..3f85eee 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java
@@ -42,9 +42,7 @@ public class RowTimeOverWindowTestBase {
};
protected LogicalType[] inputFieldTypes =
- new LogicalType[] {
- new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new BigIntType()
- };
+ new LogicalType[] {VarCharType.STRING_TYPE, new BigIntType(), new BigIntType()};
protected LogicalType[] accTypes = new LogicalType[] {new BigIntType()};
protected RowDataKeySelector keySelector =
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java
index aad99bc..c38e12c 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java
@@ -58,8 +58,7 @@ abstract class TopNFunctionTestBase {
long cacheSize = 10000L;
InternalTypeInfo<RowData> inputRowType =
- InternalTypeInfo.ofFields(
- new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new IntType());
+ InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new BigIntType(), new IntType());
static GeneratedRecordComparator generatedSortKeyComparator =
new GeneratedRecordComparator("", "", new Object[0]) {
@@ -107,10 +106,7 @@ abstract class TopNFunctionTestBase {
private InternalTypeInfo<RowData> outputTypeWithRowNumber =
InternalTypeInfo.ofFields(
- new VarCharType(VarCharType.MAX_LENGTH),
- new BigIntType(),
- new IntType(),
- new BigIntType());
+ VarCharType.STRING_TYPE, new BigIntType(), new IntType(), new BigIntType());
RowDataHarnessAssertor assertorWithoutRowNumber =
new RowDataHarnessAssertor(
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
index 087cc4c..60f686e 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
@@ -106,8 +106,7 @@ public class WindowRankOperatorTest {
private static final RowDataHarnessAssertor ASSERTER =
new RowDataHarnessAssertor(
- OUTPUT_TYPES,
- new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+ OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
private static final LogicalType[] OUTPUT_TYPES_WITHOUT_RANK_NUMBER =
new LogicalType[] {new VarCharType(Integer.MAX_VALUE), new IntType(), new BigIntType()};
@@ -118,7 +117,7 @@ public class WindowRankOperatorTest {
private static final RowDataHarnessAssertor ASSERTER_WITHOUT_RANK_NUMBER =
new RowDataHarnessAssertor(
OUTPUT_TYPES_WITHOUT_RANK_NUMBER,
- new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+ new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java
index e42a1ec..fa29703 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java
@@ -43,10 +43,7 @@ public class ProcTimeSortOperatorTest {
private InternalTypeInfo<RowData> inputRowType =
InternalTypeInfo.ofFields(
- new IntType(),
- new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
- new IntType());
+ new IntType(), new BigIntType(), VarCharType.STRING_TYPE, new IntType());
private GeneratedRecordComparator gComparator =
new GeneratedRecordComparator("", "", new Object[0]) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java
index 7acaaa5..53cf10b 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java
@@ -46,10 +46,7 @@ public class RowTimeSortOperatorTest {
public void testSortOnTwoFields() throws Exception {
InternalTypeInfo<RowData> inputRowType =
InternalTypeInfo.ofFields(
- new IntType(),
- new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
- new IntType());
+ new IntType(), new BigIntType(), VarCharType.STRING_TYPE, new IntType());
// Note: RowTimeIdx must be 0 in product environment, the value is 1 here just for simplify
// the testing
@@ -134,10 +131,7 @@ public class RowTimeSortOperatorTest {
public void testOnlySortOnRowTime() throws Exception {
InternalTypeInfo<RowData> inputRowType =
InternalTypeInfo.ofFields(
- new BigIntType(),
- new BigIntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
- new IntType());
+ new BigIntType(), new BigIntType(), VarCharType.STRING_TYPE, new IntType());
int rowTimeIdx = 0;
RowDataHarnessAssertor assertor =
new RowDataHarnessAssertor(inputRowType.toRowFieldTypes());
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
index e535ede..5d4879c 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
@@ -40,7 +40,7 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord
public class StreamSortOperatorTest {
private InternalTypeInfo<RowData> inputRowType =
- InternalTypeInfo.ofFields(new VarCharType(VarCharType.MAX_LENGTH), new IntType());
+ InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType());
private GeneratedRecordComparator sortKeyComparator =
new GeneratedRecordComparator("", "", new Object[0]) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java
index e8308f8..76db205 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java
@@ -195,8 +195,7 @@ public class WindowOperatorContractTest {
long allowedLateness)
throws Exception {
- LogicalType[] inputTypes =
- new LogicalType[] {new VarCharType(VarCharType.MAX_LENGTH), new IntType()};
+ LogicalType[] inputTypes = new LogicalType[] {VarCharType.STRING_TYPE, new IntType()};
RowDataKeySelector keySelector =
HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, inputTypes);
TypeInformation<RowData> keyType = keySelector.getProducedType();
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java
index ed0e75c..6235352 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java
@@ -123,13 +123,11 @@ public class WindowOperatorTest {
private static AtomicInteger closeCalled = new AtomicInteger(0);
private LogicalType[] inputFieldTypes =
- new LogicalType[] {
- new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType()
- };
+ new LogicalType[] {VarCharType.STRING_TYPE, new IntType(), new BigIntType()};
private InternalTypeInfo<RowData> outputType =
InternalTypeInfo.ofFields(
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
new BigIntType(),
new BigIntType(),
new BigIntType(),
@@ -147,7 +145,7 @@ public class WindowOperatorTest {
private RowDataHarnessAssertor assertor =
new RowDataHarnessAssertor(
outputType.toRowFieldTypes(),
- new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+ new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
private ConcurrentLinkedQueue<Object> doubleRecord(
boolean isDouble, StreamRecord<RowData> record) {
@@ -1543,8 +1541,7 @@ public class WindowOperatorTest {
RowDataHarnessAssertor assertor =
new RowDataHarnessAssertor(
outputType.toRowFieldTypes(),
- new GenericRowRecordSortComparator(
- 0, new VarCharType(VarCharType.MAX_LENGTH)));
+ new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java
index d8eb34c..3bf9e4c 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java
@@ -82,7 +82,7 @@ public class DataTypePrecisionFixerTest {
.logicalType(new LocalZonedTimestampType(2))
.expect(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(2)),
TestSpecs.fix(Types.STRING)
- .logicalType(new VarCharType(VarCharType.MAX_LENGTH))
+ .logicalType(VarCharType.STRING_TYPE)
.expect(DataTypes.STRING()),
// nested
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
index 1e0aa14..d87a5f0 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java
@@ -94,7 +94,7 @@ public class RowDataSerializerTest extends SerializerTestInstance<RowData> {
private static Object[] testRowDataSerializer() {
InternalTypeInfo<RowData> typeInfo =
- InternalTypeInfo.ofFields(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
+ InternalTypeInfo.ofFields(new IntType(), VarCharType.STRING_TYPE);
GenericRowData row1 = new GenericRowData(2);
row1.setField(0, 1);
row1.setField(1, fromString("a"));
@@ -122,7 +122,7 @@ public class RowDataSerializerTest extends SerializerTestInstance<RowData> {
new IntType(),
new IntType(),
new IntType(),
- new VarCharType(VarCharType.MAX_LENGTH));
+ VarCharType.STRING_TYPE);
GenericRowData row = new GenericRowData(13);
row.setField(0, 2);
@@ -147,7 +147,7 @@ public class RowDataSerializerTest extends SerializerTestInstance<RowData> {
InternalTypeInfo.ofFields(
new IntType(),
new DoubleType(),
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
new ArrayType(new IntType()),
new MapType(new IntType(), new IntType()));
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
index a599f29..9d50c32 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
@@ -53,7 +53,7 @@ public abstract class BytesHashMapTestBase<K> extends BytesMapTestBase {
static final LogicalType[] KEY_TYPES =
new LogicalType[] {
new IntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
new DoubleType(),
new BigIntType(),
new BooleanType(),
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
index 22dca2d..c4a5d63 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
@@ -50,7 +50,7 @@ public abstract class BytesMultiMapTestBase<K> extends BytesMapTestBase {
static final LogicalType[] KEY_TYPES =
new LogicalType[] {
new IntType(),
- new VarCharType(VarCharType.MAX_LENGTH),
+ VarCharType.STRING_TYPE,
new DoubleType(),
new BigIntType(),
new BooleanType(),
@@ -60,7 +60,7 @@ public abstract class BytesMultiMapTestBase<K> extends BytesMapTestBase {
static final LogicalType[] VALUE_TYPES =
new LogicalType[] {
- new VarCharType(VarCharType.MAX_LENGTH), new IntType(),
+ VarCharType.STRING_TYPE, new IntType(),
};
protected final PagedTypeSerializer<K> keySerializer;