You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/01/14 08:14:57 UTC
[flink] 01/02: [FLINK-25230][table-planner] Replace RelDataType with LogicalType serialization
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 75992f495192b84cfe406d7e24a1188c7cb284b0
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jan 12 14:34:43 2022 +0100
[FLINK-25230][table-planner] Replace RelDataType with LogicalType serialization
---
.../exec/serde/AggregateCallJsonSerializer.java | 2 +-
.../exec/serde/RelDataTypeJsonDeserializer.java | 173 +-----
.../exec/serde/RelDataTypeJsonSerializer.java | 146 +----
.../nodes/exec/serde/RexNodeJsonSerializer.java | 74 ++-
.../exec/serde/RexWindowBoundJsonSerializer.java | 3 +-
.../planner/plan/schema/StructuredRelDataType.java | 2 +-
.../typeutils/LogicalRelDataTypeConverter.java | 649 +++++++++++++++++++++
.../nodes/exec/serde/DataTypeJsonSerdeTest.java | 51 +-
.../serde/DynamicTableSourceSpecSerdeTest.java | 3 +
.../plan/nodes/exec/serde/JsonSerdeMocks.java | 76 +++
.../nodes/exec/serde/LogicalTypeJsonSerdeTest.java | 45 +-
.../nodes/exec/serde/RelDataTypeJsonSerdeTest.java | 219 +++----
.../plan/nodes/exec/serde/RexNodeSerdeTest.java | 103 +---
.../nodes/exec/serde/RexWindowBoundSerdeTest.java | 3 +-
.../serde/TemporalTableSourceSpecSerdeTest.java | 8 +-
.../typeutils/LogicalRelDataTypeConverterTest.java | 215 +++++++
16 files changed, 1157 insertions(+), 615 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
index 92c85b4..27f2549 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
@@ -83,7 +83,7 @@ public class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
jsonGenerator.writeBooleanField(FIELD_NAME_DISTINCT, aggCall.isDistinct());
jsonGenerator.writeBooleanField(FIELD_NAME_APPROXIMATE, aggCall.isApproximate());
jsonGenerator.writeBooleanField(FIELD_NAME_IGNORE_NULLS, aggCall.ignoreNulls());
- jsonGenerator.writeObjectField(FIELD_NAME_TYPE, aggCall.getType());
+ serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, aggCall.getType(), jsonGenerator);
jsonGenerator.writeEndObject();
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
index 1476e41..6b35780 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
@@ -18,57 +18,26 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableException;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter;
import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RawType;
-import org.apache.flink.table.types.logical.StructuredType;
-import org.apache.flink.table.types.logical.TimestampKind;
-import org.apache.flink.table.types.logical.TypeInformationRawType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
-import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.ObjectCodec;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
-import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.StructKind;
-import org.apache.calcite.sql.SqlIntervalQualifier;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.Util;
import java.io.IOException;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_ELEMENT;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_FIELDS;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_FILED_NAME;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_KEY;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_NULLABLE;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_PRECISION;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_RAW_TYPE;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_SCALE;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_STRUCTURED_TYPE;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_STRUCT_KIND;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_TIMESTAMP_KIND;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_TYPE_INFO;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_TYPE_NAME;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_VALUE;
-import static org.apache.flink.util.Preconditions.checkArgument;
-
/**
- * JSON deserializer for {@link RelDataType}. refer to {@link RelDataTypeJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link RelDataType}.
+ *
+ * @see RelDataTypeJsonSerializer for the reverse operation
*/
+@Internal
public class RelDataTypeJsonDeserializer extends StdDeserializer<RelDataType> {
private static final long serialVersionUID = 1L;
@@ -79,129 +48,11 @@ public class RelDataTypeJsonDeserializer extends StdDeserializer<RelDataType> {
@Override
public RelDataType deserialize(JsonParser jsonParser, DeserializationContext ctx)
throws IOException {
- JsonNode jsonNode = jsonParser.readValueAsTree();
- return deserialize(jsonNode, jsonParser.getCodec(), ctx);
- }
-
- private RelDataType deserialize(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) throws IOException {
- SerdeContext serdeContext = SerdeContext.get(ctx);
- FlinkTypeFactory typeFactory = serdeContext.getTypeFactory();
- if (jsonNode instanceof ObjectNode) {
- ObjectNode objectNode = (ObjectNode) jsonNode;
- if (objectNode.has(FIELD_NAME_TIMESTAMP_KIND)) {
- boolean nullable = objectNode.get(FIELD_NAME_NULLABLE).booleanValue();
- String typeName = objectNode.get(FIELD_NAME_TYPE_NAME).textValue();
- boolean isTimestampLtz =
- SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE.name().equals(typeName);
- TimestampKind timestampKind =
- TimestampKind.valueOf(
- objectNode.get(FIELD_NAME_TIMESTAMP_KIND).asText().toUpperCase());
- switch (timestampKind) {
- case ROWTIME:
- return typeFactory.createRowtimeIndicatorType(nullable, isTimestampLtz);
- case PROCTIME:
- return typeFactory.createProctimeIndicatorType(nullable);
- default:
- throw new TableException(timestampKind + " is not supported.");
- }
- } else if (objectNode.has(FIELD_NAME_STRUCTURED_TYPE)) {
- JsonNode structuredTypeNode = objectNode.get(FIELD_NAME_STRUCTURED_TYPE);
- LogicalType structuredType =
- ctx.readValue(structuredTypeNode.traverse(codec), LogicalType.class);
- checkArgument(structuredType instanceof StructuredType);
- return serdeContext.getTypeFactory().createFieldTypeFromLogicalType(structuredType);
- } else if (objectNode.has(FIELD_NAME_STRUCT_KIND)) {
- ArrayNode arrayNode = (ArrayNode) objectNode.get(FIELD_NAME_FIELDS);
- RelDataTypeFactory.Builder builder = typeFactory.builder();
- for (JsonNode node : arrayNode) {
- builder.add(
- node.get(FIELD_NAME_FILED_NAME).asText(),
- deserialize(node, codec, ctx));
- }
- StructKind structKind =
- StructKind.valueOf(
- objectNode.get(FIELD_NAME_STRUCT_KIND).asText().toUpperCase());
- boolean nullable = objectNode.get(FIELD_NAME_NULLABLE).booleanValue();
- return builder.kind(structKind).nullableRecord(nullable).build();
- } else if (objectNode.has(FIELD_NAME_FIELDS)) {
- JsonNode fields = objectNode.get(FIELD_NAME_FIELDS);
- // Nested struct
- return deserialize(fields, codec, ctx);
- } else {
- SqlTypeName sqlTypeName =
- Util.enumVal(
- SqlTypeName.class, objectNode.get(FIELD_NAME_TYPE_NAME).asText());
- boolean nullable = objectNode.get(FIELD_NAME_NULLABLE).booleanValue();
- if (SqlTypeName.INTERVAL_TYPES.contains(sqlTypeName)) {
- TimeUnit startUnit = sqlTypeName.getStartUnit();
- TimeUnit endUnit = sqlTypeName.getEndUnit();
- return typeFactory.createTypeWithNullability(
- typeFactory.createSqlIntervalType(
- new SqlIntervalQualifier(
- startUnit, endUnit, SqlParserPos.ZERO)),
- nullable);
- }
- if (sqlTypeName == SqlTypeName.OTHER && objectNode.has(FIELD_NAME_RAW_TYPE)) {
- RawType<?> rawType =
- (RawType<?>)
- LogicalTypeParser.parse(
- objectNode.get(FIELD_NAME_RAW_TYPE).asText(),
- serdeContext.getClassLoader());
- return typeFactory.createTypeWithNullability(
- typeFactory.createFieldTypeFromLogicalType(rawType), nullable);
- }
- if (sqlTypeName == SqlTypeName.ANY && objectNode.has(FIELD_NAME_RAW_TYPE)) {
- JsonNode rawTypeNode = objectNode.get(FIELD_NAME_RAW_TYPE);
- boolean nullableOfTypeInfo =
- rawTypeNode.get(FIELD_NAME_NULLABLE).booleanValue();
- TypeInformation<?> typeInfo =
- EncodingUtils.decodeStringToObject(
- rawTypeNode.get(FIELD_NAME_TYPE_INFO).asText(),
- TypeInformation.class,
- serdeContext.getClassLoader());
- TypeInformationRawType<?> rawType =
- new TypeInformationRawType<>(nullableOfTypeInfo, typeInfo);
- return typeFactory.createTypeWithNullability(
- typeFactory.createFieldTypeFromLogicalType(rawType), nullable);
- }
-
- Integer precision =
- objectNode.has(FIELD_NAME_PRECISION)
- ? objectNode.get(FIELD_NAME_PRECISION).intValue()
- : null;
- Integer scale =
- objectNode.has(FIELD_NAME_SCALE)
- ? objectNode.get(FIELD_NAME_SCALE).intValue()
- : null;
- final RelDataType type;
- if (sqlTypeName == SqlTypeName.ARRAY) {
- RelDataType elementType =
- deserialize(objectNode.get(FIELD_NAME_ELEMENT), codec, ctx);
- type = typeFactory.createArrayType(elementType, -1);
- } else if (sqlTypeName == SqlTypeName.MULTISET) {
- RelDataType elementType =
- deserialize(objectNode.get(FIELD_NAME_ELEMENT), codec, ctx);
- type = typeFactory.createMultisetType(elementType, -1);
- } else if (sqlTypeName == SqlTypeName.MAP) {
- RelDataType keyType = deserialize(objectNode.get(FIELD_NAME_KEY), codec, ctx);
- RelDataType valueType =
- deserialize(objectNode.get(FIELD_NAME_VALUE), codec, ctx);
- type = typeFactory.createMapType(keyType, valueType);
- } else if (precision == null) {
- type = typeFactory.createSqlType(sqlTypeName);
- } else if (scale == null) {
- type = typeFactory.createSqlType(sqlTypeName, precision);
- } else {
- type = typeFactory.createSqlType(sqlTypeName, precision, scale);
- }
- return typeFactory.createTypeWithNullability(type, nullable);
- }
- } else if (jsonNode instanceof TextNode) {
- SqlTypeName sqlTypeName = Util.enumVal(SqlTypeName.class, jsonNode.asText());
- return typeFactory.createSqlType(sqlTypeName);
- } else {
- throw new TableException("Unknown type: " + jsonNode.toPrettyString());
- }
+ final JsonNode logicalTypeNode = jsonParser.readValueAsTree();
+ final SerdeContext serdeContext = SerdeContext.get(ctx);
+ final FlinkTypeFactory typeFactory = serdeContext.getTypeFactory();
+ final LogicalType logicalType =
+ LogicalTypeJsonDeserializer.deserialize(logicalTypeNode, serdeContext);
+ return LogicalRelDataTypeConverter.toRelDataType(logicalType, typeFactory);
}
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
index 01f500c..5d76abb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
@@ -18,49 +18,28 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
-import org.apache.flink.table.planner.plan.schema.GenericRelDataType;
-import org.apache.flink.table.planner.plan.schema.RawRelDataType;
-import org.apache.flink.table.planner.plan.schema.StructuredRelDataType;
-import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
-import org.apache.flink.table.types.logical.TimestampKind;
-import org.apache.flink.table.types.logical.TypeInformationRawType;
-import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.sql.type.ArraySqlType;
-import org.apache.calcite.sql.type.MapSqlType;
-import org.apache.calcite.sql.type.MultisetSqlType;
-import org.apache.calcite.sql.type.SqlTypeName;
import java.io.IOException;
/**
- * JSON serializer for {@link RelDataType}. refer to {@link RelDataTypeJsonDeserializer} for
- * deserializer.
+ * JSON serializer for {@link RelDataType}.
+ *
+ * @see RelDataTypeJsonDeserializer for the reverse operation.
*/
-public class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> {
+@Internal
+public final class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> {
private static final long serialVersionUID = 1L;
- public static final String FIELD_NAME_TYPE_NAME = "typeName";
- public static final String FIELD_NAME_FILED_NAME = "fieldName";
- public static final String FIELD_NAME_NULLABLE = "nullable";
- public static final String FIELD_NAME_PRECISION = "precision";
- public static final String FIELD_NAME_SCALE = "scale";
- public static final String FIELD_NAME_FIELDS = "fields";
- public static final String FIELD_NAME_STRUCT_KIND = "structKind";
- public static final String FIELD_NAME_TIMESTAMP_KIND = "timestampKind";
- public static final String FIELD_NAME_ELEMENT = "element";
- public static final String FIELD_NAME_KEY = "key";
- public static final String FIELD_NAME_VALUE = "value";
- public static final String FIELD_NAME_TYPE_INFO = "typeInfo";
- public static final String FIELD_NAME_RAW_TYPE = "rawType";
- public static final String FIELD_NAME_STRUCTURED_TYPE = "structuredType";
-
public RelDataTypeJsonSerializer() {
super(RelDataType.class);
}
@@ -71,103 +50,14 @@ public class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> {
JsonGenerator jsonGenerator,
SerializerProvider serializerProvider)
throws IOException {
- jsonGenerator.writeStartObject();
- serializeInternal(relDataType, jsonGenerator, serializerProvider);
- jsonGenerator.writeEndObject();
- }
-
- private void serializeInternal(
- RelDataType relDataType, JsonGenerator gen, SerializerProvider serializerProvider)
- throws IOException {
- if (relDataType instanceof TimeIndicatorRelDataType) {
- TimeIndicatorRelDataType timeIndicatorType = (TimeIndicatorRelDataType) relDataType;
- gen.writeStringField(
- FIELD_NAME_TIMESTAMP_KIND,
- timeIndicatorType.isEventTime()
- ? TimestampKind.ROWTIME.name()
- : TimestampKind.PROCTIME.name());
- gen.writeStringField(
- FIELD_NAME_TYPE_NAME, timeIndicatorType.originalType().getSqlTypeName().name());
- gen.writeBooleanField(FIELD_NAME_NULLABLE, relDataType.isNullable());
- } else if (relDataType instanceof StructuredRelDataType) {
- StructuredRelDataType structuredType = (StructuredRelDataType) relDataType;
- serializerProvider.defaultSerializeField(
- FIELD_NAME_STRUCTURED_TYPE, structuredType.getStructuredType(), gen);
- } else if (relDataType.isStruct()) {
- gen.writeStringField(FIELD_NAME_STRUCT_KIND, relDataType.getStructKind().name());
- gen.writeBooleanField(FIELD_NAME_NULLABLE, relDataType.isNullable());
-
- gen.writeFieldName(FIELD_NAME_FIELDS);
- gen.writeStartArray();
- for (RelDataTypeField field : relDataType.getFieldList()) {
- gen.writeStartObject();
- serializeInternal(field.getType(), gen, serializerProvider);
- gen.writeStringField(FIELD_NAME_FILED_NAME, field.getName());
- gen.writeEndObject();
- }
- gen.writeEndArray();
- } else if (relDataType.getSqlTypeName() == SqlTypeName.ARRAY) {
- serializeCommon(relDataType, gen);
- ArraySqlType arraySqlType = (ArraySqlType) relDataType;
-
- gen.writeFieldName(FIELD_NAME_ELEMENT);
- gen.writeStartObject();
- serializeInternal(arraySqlType.getComponentType(), gen, serializerProvider);
- gen.writeEndObject();
- } else if (relDataType.getSqlTypeName() == SqlTypeName.MULTISET) {
- assert relDataType instanceof MultisetSqlType;
- serializeCommon(relDataType, gen);
- MultisetSqlType multisetSqlType = (MultisetSqlType) relDataType;
-
- gen.writeFieldName(FIELD_NAME_ELEMENT);
- gen.writeStartObject();
- serializeInternal(multisetSqlType.getComponentType(), gen, serializerProvider);
- gen.writeEndObject();
- } else if (relDataType.getSqlTypeName() == SqlTypeName.MAP) {
- assert relDataType instanceof MapSqlType;
- serializeCommon(relDataType, gen);
- MapSqlType mapSqlType = (MapSqlType) relDataType;
-
- gen.writeFieldName(FIELD_NAME_KEY);
- gen.writeStartObject();
- serializeInternal(mapSqlType.getKeyType(), gen, serializerProvider);
- gen.writeEndObject();
-
- gen.writeFieldName(FIELD_NAME_VALUE);
- gen.writeStartObject();
- serializeInternal(mapSqlType.getValueType(), gen, serializerProvider);
- gen.writeEndObject();
- } else if (relDataType instanceof GenericRelDataType) {
- assert relDataType.getSqlTypeName() == SqlTypeName.ANY;
- serializeCommon(relDataType, gen);
- TypeInformationRawType<?> rawType = ((GenericRelDataType) relDataType).genericType();
-
- gen.writeFieldName(FIELD_NAME_RAW_TYPE);
- gen.writeStartObject();
- gen.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable());
- gen.writeStringField(
- FIELD_NAME_TYPE_INFO,
- EncodingUtils.encodeObjectToString(rawType.getTypeInformation()));
- gen.writeEndObject();
- } else if (relDataType instanceof RawRelDataType) {
- assert relDataType.getSqlTypeName() == SqlTypeName.OTHER;
- serializeCommon(relDataType, gen);
- RawRelDataType rawType = (RawRelDataType) relDataType;
- gen.writeStringField(FIELD_NAME_RAW_TYPE, rawType.getRawType().asSerializableString());
- } else {
- serializeCommon(relDataType, gen);
- }
- }
-
- private void serializeCommon(RelDataType relDataType, JsonGenerator gen) throws IOException {
- final SqlTypeName typeName = relDataType.getSqlTypeName();
- gen.writeStringField(FIELD_NAME_TYPE_NAME, typeName.name());
- gen.writeBooleanField(FIELD_NAME_NULLABLE, relDataType.isNullable());
- if (relDataType.getSqlTypeName().allowsPrec()) {
- gen.writeNumberField(FIELD_NAME_PRECISION, relDataType.getPrecision());
- }
- if (relDataType.getSqlTypeName().allowsScale()) {
- gen.writeNumberField(FIELD_NAME_SCALE, relDataType.getScale());
- }
+ final SerdeContext serdeContext = SerdeContext.get(serializerProvider);
+ final DataTypeFactory dataTypeFactory =
+ serdeContext.getFlinkContext().getCatalogManager().getDataTypeFactory();
+ // Conversion to LogicalType also ensures that Calcite's type system is materialized
+ // so data types like DECIMAL will receive a concrete precision and scale (not unspecified
+ // anymore).
+ final LogicalType logicalType =
+ LogicalRelDataTypeConverter.toLogicalType(relDataType, dataTypeFactory);
+ serializerProvider.defaultSerializeValue(logicalType, jsonGenerator);
}
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index f16e805..427e488 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -106,52 +106,63 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
switch (rexNode.getKind()) {
case INPUT_REF:
case TABLE_INPUT_REF:
- serialize((RexInputRef) rexNode, jsonGenerator);
+ serialize((RexInputRef) rexNode, jsonGenerator, serializerProvider);
break;
case LITERAL:
- serialize((RexLiteral) rexNode, jsonGenerator);
+ serialize((RexLiteral) rexNode, jsonGenerator, serializerProvider);
break;
case FIELD_ACCESS:
- serialize((RexFieldAccess) rexNode, jsonGenerator);
+ serialize((RexFieldAccess) rexNode, jsonGenerator, serializerProvider);
break;
case CORREL_VARIABLE:
- serialize((RexCorrelVariable) rexNode, jsonGenerator);
+ serialize((RexCorrelVariable) rexNode, jsonGenerator, serializerProvider);
break;
case PATTERN_INPUT_REF:
- serialize((RexPatternFieldRef) rexNode, jsonGenerator);
+ serialize((RexPatternFieldRef) rexNode, jsonGenerator, serializerProvider);
break;
default:
if (rexNode instanceof RexCall) {
- serialize((RexCall) rexNode, jsonGenerator);
+ serialize((RexCall) rexNode, jsonGenerator, serializerProvider);
} else {
throw new TableException("Unknown RexNode: " + rexNode);
}
}
}
- private void serialize(RexPatternFieldRef inputRef, JsonGenerator gen) throws IOException {
+ private void serialize(
+ RexPatternFieldRef inputRef, JsonGenerator gen, SerializerProvider serializerProvider)
+ throws IOException {
gen.writeStartObject();
gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_PATTERN_INPUT_REF);
gen.writeStringField(FIELD_NAME_ALPHA, inputRef.getAlpha());
gen.writeNumberField(FIELD_NAME_INPUT_INDEX, inputRef.getIndex());
- gen.writeObjectField(FIELD_NAME_TYPE, inputRef.getType());
+ serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, inputRef.getType(), gen);
gen.writeEndObject();
}
- private void serialize(RexInputRef inputRef, JsonGenerator gen) throws IOException {
+ private void serialize(
+ RexInputRef inputRef, JsonGenerator gen, SerializerProvider serializerProvider)
+ throws IOException {
gen.writeStartObject();
gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_INPUT_REF);
gen.writeNumberField(FIELD_NAME_INPUT_INDEX, inputRef.getIndex());
- gen.writeObjectField(FIELD_NAME_TYPE, inputRef.getType());
+ serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, inputRef.getType(), gen);
gen.writeEndObject();
}
- private void serialize(RexLiteral literal, JsonGenerator gen) throws IOException {
+ private void serialize(
+ RexLiteral literal, JsonGenerator gen, SerializerProvider serializerProvider)
+ throws IOException {
gen.writeStartObject();
gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_LITERAL);
Comparable<?> value = literal.getValueAs(Comparable.class);
- serialize(value, literal.getTypeName(), literal.getType().getSqlTypeName(), gen);
- gen.writeObjectField(FIELD_NAME_TYPE, literal.getType());
+ serialize(
+ value,
+ literal.getTypeName(),
+ literal.getType().getSqlTypeName(),
+ gen,
+ serializerProvider);
+ serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, literal.getType(), gen);
gen.writeEndObject();
}
@@ -160,7 +171,8 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
Comparable<?> value,
SqlTypeName literalTypeName,
SqlTypeName elementTypeName,
- JsonGenerator gen)
+ JsonGenerator gen,
+ SerializerProvider serializerProvider)
throws IOException {
if (value == null) {
gen.writeNullField(FIELD_NAME_VALUE);
@@ -229,14 +241,14 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
gen.writeStringField(FIELD_NAME_CLASS, value.getClass().getName());
break;
case SARG:
- serialize((Sarg<?>) value, elementTypeName, gen);
+ serialize((Sarg<?>) value, elementTypeName, gen, serializerProvider);
break;
case ROW:
case MULTISET:
gen.writeFieldName(FIELD_NAME_VALUE);
gen.writeStartArray();
for (RexLiteral v : (FlatLists.ComparableList<RexLiteral>) value) {
- serialize(v, gen);
+ serialize(v, gen, serializerProvider);
}
gen.writeEndArray();
break;
@@ -247,7 +259,11 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
}
@SuppressWarnings("UnstableApiUsage")
- private void serialize(Sarg<?> value, SqlTypeName sqlTypeName, JsonGenerator gen)
+ private void serialize(
+ Sarg<?> value,
+ SqlTypeName sqlTypeName,
+ JsonGenerator gen,
+ SerializerProvider serializerProvider)
throws IOException {
gen.writeFieldName(FIELD_NAME_SARG);
gen.writeStartObject();
@@ -258,14 +274,14 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
if (range.hasLowerBound()) {
gen.writeFieldName(FIELD_NAME_BOUND_LOWER);
gen.writeStartObject();
- serialize(range.lowerEndpoint(), sqlTypeName, sqlTypeName, gen);
+ serialize(range.lowerEndpoint(), sqlTypeName, sqlTypeName, gen, serializerProvider);
gen.writeStringField(FIELD_NAME_BOUND_TYPE, range.lowerBoundType().name());
gen.writeEndObject();
}
if (range.hasUpperBound()) {
gen.writeFieldName(FIELD_NAME_BOUND_UPPER);
gen.writeStartObject();
- serialize(range.upperEndpoint(), sqlTypeName, sqlTypeName, gen);
+ serialize(range.upperEndpoint(), sqlTypeName, sqlTypeName, gen, serializerProvider);
gen.writeStringField(FIELD_NAME_BOUND_TYPE, range.upperBoundType().name());
gen.writeEndObject();
}
@@ -276,23 +292,29 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
gen.writeEndObject();
}
- private void serialize(RexFieldAccess fieldAccess, JsonGenerator gen) throws IOException {
+ private void serialize(
+ RexFieldAccess fieldAccess, JsonGenerator gen, SerializerProvider serializerProvider)
+ throws IOException {
gen.writeStartObject();
gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_FIELD_ACCESS);
gen.writeStringField(FIELD_NAME_NAME, fieldAccess.getField().getName());
- gen.writeObjectField(FIELD_NAME_EXPR, fieldAccess.getReferenceExpr());
+ serializerProvider.defaultSerializeField(
+ FIELD_NAME_EXPR, fieldAccess.getReferenceExpr(), gen);
gen.writeEndObject();
}
- private void serialize(RexCorrelVariable variable, JsonGenerator gen) throws IOException {
+ private void serialize(
+ RexCorrelVariable variable, JsonGenerator gen, SerializerProvider serializerProvider)
+ throws IOException {
gen.writeStartObject();
gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_CORREL_VARIABLE);
gen.writeStringField(FIELD_NAME_CORREL, variable.getName());
- gen.writeObjectField(FIELD_NAME_TYPE, variable.getType());
+ serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, variable.getType(), gen);
gen.writeEndObject();
}
- private void serialize(RexCall call, JsonGenerator gen) throws IOException {
+ private void serialize(RexCall call, JsonGenerator gen, SerializerProvider serializerProvider)
+ throws IOException {
if (!call.getClass().isAssignableFrom(RexCall.class)) {
throw new TableException("Unknown RexCall: " + call);
}
@@ -302,10 +324,10 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
gen.writeFieldName(FIELD_NAME_OPERANDS);
gen.writeStartArray();
for (RexNode operand : call.getOperands()) {
- gen.writeObject(operand);
+ serializerProvider.defaultSerializeValue(operand, gen);
}
gen.writeEndArray();
- gen.writeObjectField(FIELD_NAME_TYPE, call.getType());
+ serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, call.getType(), gen);
gen.writeEndObject();
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
index ac6dcc2..021731c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
@@ -75,7 +75,8 @@ public class RexWindowBoundJsonSerializer extends StdSerializer<RexWindowBound>
} else {
throw new TableException("Unknown RexWindowBound: " + rexWindowBound);
}
- gen.writeObjectField(FIELD_NAME_OFFSET, rexWindowBound.getOffset());
+ serializerProvider.defaultSerializeField(
+ FIELD_NAME_OFFSET, rexWindowBound.getOffset(), gen);
}
gen.writeEndObject();
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java
index 78b5ec3..3021040 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java
@@ -54,7 +54,7 @@ public final class StructuredRelDataType extends ObjectSqlType {
private final StructuredType structuredType;
- private StructuredRelDataType(StructuredType structuredType, List<RelDataTypeField> fields) {
+ public StructuredRelDataType(StructuredType structuredType, List<RelDataTypeField> fields) {
super(
SqlTypeName.STRUCTURED,
createSqlIdentifier(structuredType),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverter.java
new file mode 100644
index 0000000..5645031
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverter.java
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.RawRelDataType;
+import org.apache.flink.table.planner.plan.schema.StructuredRelDataType;
+import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Symmetric converter between {@link LogicalType} and {@link RelDataType}.
+ *
+ * <p>This converter has many similarities with {@link FlinkTypeFactory} and might also replace it
+ * at some point. However, for now it is more consistent and does not lose information (e.g. for
+ * TIME(9) or interval types). It still delegates to {@link RelDataTypeFactory} but only for
+ * predefined/basic types.
+ *
+ * <p>Note: The conversion to {@link LogicalType} is not 100% symmetric and is currently optimized
+ * for expressions. Information about the {@link StructKind} of a {@link RelRecordType} is always
+ * set to {@link StructKind#PEEK_FIELDS_NO_EXPAND}. Missing precision and scale will be filled with
+ * Flink's default values such that all resulting {@link LogicalType}s will be fully resolved.
+ */
+@Internal
+public final class LogicalRelDataTypeConverter {
+
+ public static RelDataType toRelDataType(
+ LogicalType logicalType, RelDataTypeFactory relDataTypeFactory) {
+ final LogicalToRelDataTypeConverter converter =
+ new LogicalToRelDataTypeConverter(relDataTypeFactory);
+ final RelDataType relDataType = logicalType.accept(converter);
+ // this also canonizes in the factory (see SqlTypeFactoryImpl.canonize)
+ return relDataTypeFactory.createTypeWithNullability(relDataType, logicalType.isNullable());
+ }
+
+ public static LogicalType toLogicalType(
+ RelDataType relDataType, DataTypeFactory dataTypeFactory) {
+ final LogicalType logicalType = toLogicalTypeNotNull(relDataType, dataTypeFactory);
+ return logicalType.copy(relDataType.isNullable());
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // LogicalType to RelDataType
+ // --------------------------------------------------------------------------------------------
+
+ private static class LogicalToRelDataTypeConverter implements LogicalTypeVisitor<RelDataType> {
+
+ private final RelDataTypeFactory relDataTypeFactory;
+
+ LogicalToRelDataTypeConverter(RelDataTypeFactory relDataTypeFactory) {
+ this.relDataTypeFactory = relDataTypeFactory;
+ }
+
+ @Override
+ public RelDataType visit(CharType charType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.CHAR, charType.getLength());
+ }
+
+ @Override
+ public RelDataType visit(VarCharType varCharType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR, varCharType.getLength());
+ }
+
+ @Override
+ public RelDataType visit(BooleanType booleanType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.BOOLEAN);
+ }
+
+ @Override
+ public RelDataType visit(BinaryType binaryType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.BINARY, binaryType.getLength());
+ }
+
+ @Override
+ public RelDataType visit(VarBinaryType varBinaryType) {
+ return relDataTypeFactory.createSqlType(
+ SqlTypeName.VARBINARY, varBinaryType.getLength());
+ }
+
+ @Override
+ public RelDataType visit(DecimalType decimalType) {
+ return relDataTypeFactory.createSqlType(
+ SqlTypeName.DECIMAL, decimalType.getPrecision(), decimalType.getScale());
+ }
+
+ @Override
+ public RelDataType visit(TinyIntType tinyIntType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.TINYINT);
+ }
+
+ @Override
+ public RelDataType visit(SmallIntType smallIntType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.SMALLINT);
+ }
+
+ @Override
+ public RelDataType visit(IntType intType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.INTEGER);
+ }
+
+ @Override
+ public RelDataType visit(BigIntType bigIntType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.BIGINT);
+ }
+
+ @Override
+ public RelDataType visit(FloatType floatType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.FLOAT);
+ }
+
+ @Override
+ public RelDataType visit(DoubleType doubleType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.DOUBLE);
+ }
+
+ @Override
+ public RelDataType visit(DateType dateType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.DATE);
+ }
+
+ @Override
+ public RelDataType visit(TimeType timeType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.TIME, timeType.getPrecision());
+ }
+
+ @Override
+ public RelDataType visit(TimestampType timestampType) {
+ final RelDataType timestampRelDataType =
+ relDataTypeFactory.createSqlType(
+ SqlTypeName.TIMESTAMP, timestampType.getPrecision());
+ switch (timestampType.getKind()) {
+ case REGULAR:
+ return timestampRelDataType;
+ case ROWTIME:
+ assert timestampType.getPrecision() == 3;
+ return new TimeIndicatorRelDataType(
+ relDataTypeFactory.getTypeSystem(),
+ (BasicSqlType) timestampRelDataType,
+ timestampType.isNullable(),
+ true);
+ default:
+ throw new TableException("Unknown timestamp kind.");
+ }
+ }
+
+ @Override
+ public RelDataType visit(ZonedTimestampType zonedTimestampType) {
+ throw new TableException("TIMESTAMP WITH TIME ZONE is currently not supported.");
+ }
+
+ @Override
+ public RelDataType visit(LocalZonedTimestampType localZonedTimestampType) {
+ final RelDataType timestampRelDataType =
+ relDataTypeFactory.createSqlType(
+ SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+ localZonedTimestampType.getPrecision());
+ switch (localZonedTimestampType.getKind()) {
+ case REGULAR:
+ return timestampRelDataType;
+ case ROWTIME:
+ assert localZonedTimestampType.getPrecision() == 3;
+ return new TimeIndicatorRelDataType(
+ relDataTypeFactory.getTypeSystem(),
+ (BasicSqlType) timestampRelDataType,
+ localZonedTimestampType.isNullable(),
+ true);
+ case PROCTIME:
+ assert localZonedTimestampType.getPrecision() == 3;
+ return new TimeIndicatorRelDataType(
+ relDataTypeFactory.getTypeSystem(),
+ (BasicSqlType) timestampRelDataType,
+ localZonedTimestampType.isNullable(),
+ false);
+ default:
+ throw new TableException("Unknown timestamp kind.");
+ }
+ }
+
+ @Override
+ public RelDataType visit(YearMonthIntervalType yearMonthIntervalType) {
+ final int yearPrecision = yearMonthIntervalType.getYearPrecision();
+ final SqlIntervalQualifier intervalQualifier;
+ switch (yearMonthIntervalType.getResolution()) {
+ case YEAR:
+ intervalQualifier =
+ new SqlIntervalQualifier(
+ TimeUnit.YEAR,
+ yearPrecision,
+ TimeUnit.YEAR,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ SqlParserPos.ZERO);
+ break;
+ case YEAR_TO_MONTH:
+ intervalQualifier =
+ new SqlIntervalQualifier(
+ TimeUnit.YEAR,
+ yearPrecision,
+ TimeUnit.MONTH,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ SqlParserPos.ZERO);
+ break;
+ case MONTH:
+ intervalQualifier =
+ new SqlIntervalQualifier(
+ TimeUnit.MONTH,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ TimeUnit.MONTH,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ SqlParserPos.ZERO);
+ break;
+ default:
+ throw new TableException("Unknown interval resolution.");
+ }
+ return relDataTypeFactory.createSqlIntervalType(intervalQualifier);
+ }
+
+ @Override
+ public RelDataType visit(DayTimeIntervalType dayTimeIntervalType) {
+ final int dayPrecision = dayTimeIntervalType.getDayPrecision();
+ final int fractionalPrecision = dayTimeIntervalType.getFractionalPrecision();
+ final SqlIntervalQualifier intervalQualifier;
+ switch (dayTimeIntervalType.getResolution()) {
+ case DAY:
+ intervalQualifier =
+ new SqlIntervalQualifier(
+ TimeUnit.DAY,
+ dayPrecision,
+ TimeUnit.DAY,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ SqlParserPos.ZERO);
+ break;
+ case DAY_TO_HOUR:
+ intervalQualifier =
+ new SqlIntervalQualifier(
+ TimeUnit.DAY,
+ dayPrecision,
+ TimeUnit.HOUR,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ SqlParserPos.ZERO);
+ break;
+ case DAY_TO_MINUTE:
+ intervalQualifier =
+ new SqlIntervalQualifier(
+ TimeUnit.DAY,
+ dayPrecision,
+ TimeUnit.MINUTE,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ SqlParserPos.ZERO);
+ break;
+ case DAY_TO_SECOND:
+ intervalQualifier =
+ new SqlIntervalQualifier(
+ TimeUnit.DAY,
+ dayPrecision,
+ TimeUnit.SECOND,
+ fractionalPrecision,
+ SqlParserPos.ZERO);
+ break;
+ case HOUR:
+ intervalQualifier =
+ new SqlIntervalQualifier(
+ TimeUnit.HOUR,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ TimeUnit.HOUR,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ SqlParserPos.ZERO);
+ break;
+ case HOUR_TO_MINUTE:
+ intervalQualifier =
+ new SqlIntervalQualifier(
+ TimeUnit.HOUR,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ TimeUnit.MINUTE,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ SqlParserPos.ZERO);
+ break;
+ case HOUR_TO_SECOND:
+ intervalQualifier =
+ new SqlIntervalQualifier(
+ TimeUnit.HOUR,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ TimeUnit.SECOND,
+ fractionalPrecision,
+ SqlParserPos.ZERO);
+ break;
+ case MINUTE:
+ intervalQualifier =
+ new SqlIntervalQualifier(
+ TimeUnit.MINUTE,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ TimeUnit.MINUTE,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ SqlParserPos.ZERO);
+ break;
+ case MINUTE_TO_SECOND:
+ intervalQualifier =
+ new SqlIntervalQualifier(
+ TimeUnit.MINUTE,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ TimeUnit.SECOND,
+ fractionalPrecision,
+ SqlParserPos.ZERO);
+ break;
+ case SECOND:
+ intervalQualifier =
+ new SqlIntervalQualifier(
+ TimeUnit.SECOND,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ TimeUnit.SECOND,
+ fractionalPrecision,
+ SqlParserPos.ZERO);
+ break;
+ default:
+ throw new TableException("Unknown interval resolution.");
+ }
+ return relDataTypeFactory.createSqlIntervalType(intervalQualifier);
+ }
+
+ @Override
+ public RelDataType visit(ArrayType arrayType) {
+ final RelDataType elementRelDataType =
+ toRelDataType(arrayType.getElementType(), relDataTypeFactory);
+ return relDataTypeFactory.createArrayType(elementRelDataType, -1);
+ }
+
+ @Override
+ public RelDataType visit(MultisetType multisetType) {
+ final RelDataType elementRelDataType =
+ toRelDataType(multisetType.getElementType(), relDataTypeFactory);
+ return relDataTypeFactory.createMultisetType(elementRelDataType, -1);
+ }
+
+ @Override
+ public RelDataType visit(MapType mapType) {
+ final RelDataType keyRelDataType =
+ toRelDataType(mapType.getKeyType(), relDataTypeFactory);
+ final RelDataType valueRelDataType =
+ toRelDataType(mapType.getValueType(), relDataTypeFactory);
+ return relDataTypeFactory.createMapType(keyRelDataType, valueRelDataType);
+ }
+
+ @Override
+ public RelDataType visit(RowType rowType) {
+ return relDataTypeFactory.createStructType(
+ StructKind.PEEK_FIELDS_NO_EXPAND,
+ rowType.getFields().stream()
+ .map(f -> toRelDataType(f.getType(), relDataTypeFactory))
+ .collect(Collectors.toList()),
+ rowType.getFieldNames());
+ }
+
+ @Override
+ public RelDataType visit(DistinctType distinctType) {
+ throw new TableException("DISTINCT type is currently not supported.");
+ }
+
+ @Override
+ public RelDataType visit(StructuredType structuredType) {
+ final List<RelDataTypeField> fields = new ArrayList<>();
+ for (int i = 0; i < structuredType.getAttributes().size(); i++) {
+ final StructuredType.StructuredAttribute attribute =
+ structuredType.getAttributes().get(i);
+ final RelDataTypeField field =
+ new RelDataTypeFieldImpl(
+ attribute.getName(),
+ i,
+ toRelDataType(attribute.getType(), relDataTypeFactory));
+ fields.add(field);
+ }
+ return new StructuredRelDataType(structuredType, fields);
+ }
+
+ @Override
+ public RelDataType visit(NullType nullType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.NULL);
+ }
+
+ @Override
+ public RelDataType visit(RawType<?> rawType) {
+ return new RawRelDataType(rawType);
+ }
+
+ @Override
+ public RelDataType visit(SymbolType<?> symbolType) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.SYMBOL);
+ }
+
+ @Override
+ public RelDataType visit(LogicalType other) {
+ throw new TableException(
+ String.format(
+ "Logical type '%s' cannot be converted to a RelDataType.", other));
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // RelDataType to LogicalType
+ // --------------------------------------------------------------------------------------------
+
+ private static LogicalType toLogicalTypeNotNull(
+ RelDataType relDataType, DataTypeFactory dataTypeFactory) {
+ // dataTypeFactory is a preparation for catalog user-defined types
+ switch (relDataType.getSqlTypeName()) {
+ case BOOLEAN:
+ return new BooleanType(false);
+ case TINYINT:
+ return new TinyIntType(false);
+ case SMALLINT:
+ return new SmallIntType(false);
+ case INTEGER:
+ return new IntType(false);
+ case BIGINT:
+ return new BigIntType(false);
+ case DECIMAL:
+ if (relDataType.getScale() < 0) {
+ // negative scale is not supported, normalize it
+ return new DecimalType(
+ false, relDataType.getPrecision() - relDataType.getScale(), 0);
+ }
+ return new DecimalType(false, relDataType.getPrecision(), relDataType.getScale());
+ case FLOAT:
+ return new FloatType(false);
+ case DOUBLE:
+ return new DoubleType(false);
+ case DATE:
+ return new DateType(false);
+ case TIME:
+ return new TimeType(false, relDataType.getPrecision());
+ case TIMESTAMP:
+ return new TimestampType(
+ false, getTimestampKind(relDataType), relDataType.getPrecision());
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return new LocalZonedTimestampType(
+ false, getTimestampKind(relDataType), relDataType.getPrecision());
+ case INTERVAL_YEAR:
+ case INTERVAL_YEAR_MONTH:
+ case INTERVAL_MONTH:
+ return new YearMonthIntervalType(
+ false, getYearMonthResolution(relDataType), relDataType.getPrecision());
+ case INTERVAL_DAY:
+ case INTERVAL_DAY_HOUR:
+ case INTERVAL_DAY_MINUTE:
+ case INTERVAL_DAY_SECOND:
+ case INTERVAL_HOUR:
+ case INTERVAL_HOUR_MINUTE:
+ case INTERVAL_HOUR_SECOND:
+ case INTERVAL_MINUTE:
+ case INTERVAL_MINUTE_SECOND:
+ return new DayTimeIntervalType(
+ false,
+ getDayTimeResolution(relDataType),
+ relDataType.getPrecision(),
+ relDataType.getScale());
+ case INTERVAL_SECOND:
+ return new DayTimeIntervalType(
+ false,
+ getDayTimeResolution(relDataType),
+ DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+ relDataType.getScale());
+ case CHAR:
+ if (relDataType.getPrecision() == 0) {
+ return CharType.ofEmptyLiteral();
+ }
+ return new CharType(false, relDataType.getPrecision());
+ case VARCHAR:
+ if (relDataType.getPrecision() == 0) {
+ return VarCharType.ofEmptyLiteral();
+ }
+ return new VarCharType(false, relDataType.getPrecision());
+ case BINARY:
+ if (relDataType.getPrecision() == 0) {
+ return BinaryType.ofEmptyLiteral();
+ }
+ return new BinaryType(false, relDataType.getPrecision());
+ case VARBINARY:
+ if (relDataType.getPrecision() == 0) {
+ return VarBinaryType.ofEmptyLiteral();
+ }
+ return new VarBinaryType(false, relDataType.getPrecision());
+ case NULL:
+ return new NullType();
+ case SYMBOL:
+ return new SymbolType<>(false);
+ case MULTISET:
+ return new MultisetType(
+ false, toLogicalType(relDataType.getComponentType(), dataTypeFactory));
+ case ARRAY:
+ return new ArrayType(
+ false, toLogicalType(relDataType.getComponentType(), dataTypeFactory));
+ case MAP:
+ return new MapType(
+ false,
+ toLogicalType(relDataType.getKeyType(), dataTypeFactory),
+ toLogicalType(relDataType.getValueType(), dataTypeFactory));
+ case DISTINCT:
+ throw new TableException("DISTINCT type is currently not supported.");
+ case ROW:
+ return new RowType(
+ false,
+ relDataType.getFieldList().stream()
+ .map(
+ f ->
+ new RowField(
+ f.getName(),
+ toLogicalType(
+ f.getType(), dataTypeFactory)))
+ .collect(Collectors.toList()));
+ case STRUCTURED:
+ case OTHER:
+ if (relDataType instanceof StructuredRelDataType) {
+ return ((StructuredRelDataType) relDataType).getStructuredType();
+ } else if (relDataType instanceof RawRelDataType) {
+ return ((RawRelDataType) relDataType).getRawType();
+ }
+ // fall through
+ case REAL:
+ case TIME_WITH_LOCAL_TIME_ZONE:
+ case ANY:
+ case CURSOR:
+ case COLUMN_LIST:
+ case DYNAMIC_STAR:
+ case GEOMETRY:
+ case SARG:
+ default:
+ throw new TableException("Unsupported RelDataType: " + relDataType);
+ }
+ }
+
+ private static TimestampKind getTimestampKind(RelDataType relDataType) {
+ if (relDataType instanceof TimeIndicatorRelDataType) {
+ final TimeIndicatorRelDataType timeIndicator = (TimeIndicatorRelDataType) relDataType;
+ if (timeIndicator.isEventTime()) {
+ return TimestampKind.ROWTIME;
+ } else {
+ return TimestampKind.PROCTIME;
+ }
+ } else {
+ return TimestampKind.REGULAR;
+ }
+ }
+
+ private static YearMonthResolution getYearMonthResolution(RelDataType relDataType) {
+ switch (relDataType.getSqlTypeName()) {
+ case INTERVAL_YEAR:
+ return YearMonthResolution.YEAR;
+ case INTERVAL_YEAR_MONTH:
+ return YearMonthResolution.YEAR_TO_MONTH;
+ case INTERVAL_MONTH:
+ return YearMonthResolution.MONTH;
+ default:
+ throw new TableException("Unsupported YearMonthResolution.");
+ }
+ }
+
+ private static DayTimeResolution getDayTimeResolution(RelDataType relDataType) {
+ switch (relDataType.getSqlTypeName()) {
+ case INTERVAL_DAY:
+ return DayTimeResolution.DAY;
+ case INTERVAL_DAY_HOUR:
+ return DayTimeResolution.DAY_TO_HOUR;
+ case INTERVAL_DAY_MINUTE:
+ return DayTimeResolution.DAY_TO_MINUTE;
+ case INTERVAL_DAY_SECOND:
+ return DayTimeResolution.DAY_TO_SECOND;
+ case INTERVAL_HOUR:
+ return DayTimeResolution.HOUR;
+ case INTERVAL_HOUR_MINUTE:
+ return DayTimeResolution.HOUR_TO_MINUTE;
+ case INTERVAL_HOUR_SECOND:
+ return DayTimeResolution.HOUR_TO_SECOND;
+ case INTERVAL_MINUTE:
+ return DayTimeResolution.MINUTE;
+ case INTERVAL_MINUTE_SECOND:
+ return DayTimeResolution.MINUTE_TO_SECOND;
+ case INTERVAL_SECOND:
+ return DayTimeResolution.SECOND;
+ default:
+ throw new TableException("Unsupported DayTimeResolution.");
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java
index e2a335e..a7b176a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java
@@ -19,18 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.module.ModuleManager;
-import org.apache.flink.table.planner.calcite.FlinkContextImpl;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.utils.CatalogManagerMocks;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@@ -38,6 +27,9 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.stream.Stream;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.configuredSerdeContext;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toJson;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toObject;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link DataType} serialization and deserialization. */
@@ -48,7 +40,7 @@ public class DataTypeJsonSerdeTest {
public void testDataTypeSerde(DataType dataType) throws IOException {
final SerdeContext serdeContext = configuredSerdeContext();
final String json = toJson(serdeContext, dataType);
- final DataType actual = toDataType(serdeContext, json);
+ final DataType actual = toObject(serdeContext, json, DataType.class);
assertThat(actual).isEqualTo(dataType);
}
@@ -83,41 +75,6 @@ public class DataTypeJsonSerdeTest {
// Shared utilities
// --------------------------------------------------------------------------------------------
- static SerdeContext configuredSerdeContext() {
- return configuredSerdeContext(
- CatalogManagerMocks.createEmptyCatalogManager(), TableConfig.getDefault());
- }
-
- static SerdeContext configuredSerdeContext(
- CatalogManager catalogManager, TableConfig tableConfig) {
- return new SerdeContext(
- new FlinkContextImpl(
- false, tableConfig, new ModuleManager(), null, catalogManager, null),
- Thread.currentThread().getContextClassLoader(),
- FlinkTypeFactory.INSTANCE(),
- FlinkSqlOperatorTable.instance());
- }
-
- static String toJson(SerdeContext serdeContext, DataType dataType) {
- final ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeContext);
- final String json;
- try {
- json = objectWriter.writeValueAsString(dataType);
- } catch (JsonProcessingException e) {
- throw new AssertionError(e);
- }
- return json;
- }
-
- static DataType toDataType(SerdeContext serdeContext, String json) {
- final ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeContext);
- try {
- return objectReader.readValue(json, DataType.class);
- } catch (IOException e) {
- throw new AssertionError(e);
- }
- }
-
/** Testing class. */
public static class PojoClass {
public int f0;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
index 7d095f0..1fb5b43 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
@@ -53,6 +53,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectRea
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
@@ -221,7 +222,9 @@ public class DynamicTableSourceSpecSerdeTest {
BigDecimal.valueOf(1000),
new SqlIntervalQualifier(
TimeUnit.SECOND,
+ RelDataType.PRECISION_NOT_SPECIFIED,
TimeUnit.SECOND,
+ 3,
SqlParserPos.ZERO))),
5000,
RowType.of(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeMocks.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeMocks.java
new file mode 100644
index 0000000..f50dd9a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeMocks.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.planner.calcite.FlinkContextImpl;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+
+import java.io.IOException;
+
+/** Mocks and utilities for serde tests. */
+final class JsonSerdeMocks {
+
+ private JsonSerdeMocks() {
+ // no instantiation
+ }
+
+ static SerdeContext configuredSerdeContext() {
+ return configuredSerdeContext(
+ CatalogManagerMocks.createEmptyCatalogManager(), TableConfig.getDefault());
+ }
+
+ static SerdeContext configuredSerdeContext(
+ CatalogManager catalogManager, TableConfig tableConfig) {
+ return new SerdeContext(
+ new FlinkContextImpl(
+ false, tableConfig, new ModuleManager(), null, catalogManager, null),
+ Thread.currentThread().getContextClassLoader(),
+ FlinkTypeFactory.INSTANCE(),
+ FlinkSqlOperatorTable.instance());
+ }
+
+ static String toJson(SerdeContext serdeContext, Object object) {
+ final ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeContext);
+ final String json;
+ try {
+ json = objectWriter.writeValueAsString(object);
+ } catch (JsonProcessingException e) {
+ throw new AssertionError(e);
+ }
+ return json;
+ }
+
+ static <T> T toObject(SerdeContext serdeContext, String json, Class<T> clazz) {
+ final ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeContext);
+ try {
+ return objectReader.readValue(json, clazz);
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
index d96dcb5..b8ff8a5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
@@ -69,8 +69,6 @@ import org.apache.flink.types.Row;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
import org.junit.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -88,7 +86,9 @@ import java.util.Optional;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation.ALL;
import static org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation.IDENTIFIER;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerdeTest.configuredSerdeContext;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.configuredSerdeContext;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toJson;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toObject;
import static org.apache.flink.table.utils.CatalogManagerMocks.preparedCatalogManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -102,7 +102,7 @@ public class LogicalTypeJsonSerdeTest {
final SerdeContext serdeContext = configuredSerdeContext();
final String json = toJson(serdeContext, logicalType);
- final LogicalType actual = toLogicalType(serdeContext, json);
+ final LogicalType actual = toObject(serdeContext, json, LogicalType.class);
assertThat(actual).isEqualTo(logicalType);
}
@@ -126,7 +126,7 @@ public class LogicalTypeJsonSerdeTest {
TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
dataTypeFactoryMock.logicalType = Optional.empty();
- assertThatThrownBy(() -> toLogicalType(serdeContext, minimalJson))
+ assertThatThrownBy(() -> toObject(serdeContext, minimalJson, LogicalType.class))
.satisfies(anyCauseMatches(ValidationException.class, "No type found."));
// catalog lookup
@@ -134,7 +134,8 @@ public class LogicalTypeJsonSerdeTest {
TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
dataTypeFactoryMock.logicalType = Optional.of(STRUCTURED_TYPE);
- assertThat(toLogicalType(serdeContext, minimalJson)).isEqualTo(STRUCTURED_TYPE);
+ assertThat(toObject(serdeContext, minimalJson, LogicalType.class))
+ .isEqualTo(STRUCTURED_TYPE);
// maximum plan content
config.set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, ALL);
@@ -151,7 +152,7 @@ public class LogicalTypeJsonSerdeTest {
TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
dataTypeFactoryMock.logicalType = Optional.empty();
- assertThatThrownBy(() -> toLogicalType(serdeContext, maximumJson))
+ assertThatThrownBy(() -> toObject(serdeContext, maximumJson, LogicalType.class))
.satisfies(anyCauseMatches(ValidationException.class, "No type found."));
// catalog lookup
@@ -159,14 +160,16 @@ public class LogicalTypeJsonSerdeTest {
TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
dataTypeFactoryMock.logicalType = Optional.of(UPDATED_STRUCTURED_TYPE);
- assertThat(toLogicalType(serdeContext, maximumJson)).isEqualTo(UPDATED_STRUCTURED_TYPE);
+ assertThat(toObject(serdeContext, maximumJson, LogicalType.class))
+ .isEqualTo(UPDATED_STRUCTURED_TYPE);
// no lookup
config.set(
TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
TableConfigOptions.CatalogPlanRestore.ALL);
dataTypeFactoryMock.logicalType = Optional.of(UPDATED_STRUCTURED_TYPE);
- assertThat(toLogicalType(serdeContext, maximumJson)).isEqualTo(STRUCTURED_TYPE);
+ assertThat(toObject(serdeContext, maximumJson, LogicalType.class))
+ .isEqualTo(STRUCTURED_TYPE);
}
// --------------------------------------------------------------------------------------------
@@ -418,28 +421,4 @@ public class LogicalTypeJsonSerdeTest {
DataType dataType, boolean isInternalType) {
return isInternalType ? dataType.toInternal() : dataType;
}
-
- // --------------------------------------------------------------------------------------------
- // Shared utilities
- // --------------------------------------------------------------------------------------------
-
- static String toJson(SerdeContext serdeContext, LogicalType logicalType) {
- final ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeContext);
- final String json;
- try {
- json = objectWriter.writeValueAsString(logicalType);
- } catch (JsonProcessingException e) {
- throw new AssertionError(e);
- }
- return json;
- }
-
- static LogicalType toLogicalType(SerdeContext serdeContext, String json) {
- final ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeContext);
- try {
- return objectReader.readValue(json, LogicalType.class);
- } catch (IOException e) {
- throw new AssertionError(e);
- }
- }
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerdeTest.java
index 69aa587..5426fe2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerdeTest.java
@@ -18,26 +18,15 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
-import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
-import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.module.ModuleManager;
-import org.apache.flink.table.planner.calcite.FlinkContextImpl;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LegacyTypeInformationType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RawType;
import org.apache.flink.table.types.logical.StructuredType;
-import org.apache.flink.table.types.logical.TypeInformationRawType;
import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.utils.CatalogManagerMocks;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.rel.type.RelDataType;
@@ -46,83 +35,123 @@ import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.configuredSerdeContext;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toJson;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toObject;
+import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for serialization/deserialization of {@link RelDataType}. */
-@RunWith(Parameterized.class)
+/** Tests for {@link RelDataType} serialization and deserialization. */
public class RelDataTypeJsonSerdeTest {
+
private static final FlinkTypeFactory FACTORY = FlinkTypeFactory.INSTANCE();
- @Parameterized.Parameters(name = "type = {0}")
- public static Collection<RelDataType> parameters() {
+ @ParameterizedTest
+ @MethodSource("testRelDataTypeSerde")
+ public void testRelDataTypeSerde(RelDataType relDataType) throws IOException {
+ final SerdeContext serdeContext = configuredSerdeContext();
+
+ final String json = toJson(serdeContext, relDataType);
+ final RelDataType actual = toObject(serdeContext, json, RelDataType.class);
+
+ assertThat(actual).isSameAs(relDataType);
+ }
+
+ @Test
+ public void testMissingPrecisionAndScale() {
+ final SerdeContext serdeContext = configuredSerdeContext();
+
+ final String json =
+ toJson(
+ serdeContext,
+ FACTORY.createSqlIntervalType(
+ new SqlIntervalQualifier(
+ TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)));
+ final RelDataType actual = toObject(serdeContext, json, RelDataType.class);
+
+ assertThat(actual)
+ .isSameAs(
+ FACTORY.createSqlIntervalType(
+ new SqlIntervalQualifier(
+ TimeUnit.DAY,
+ DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+ TimeUnit.SECOND,
+ DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION,
+ SqlParserPos.ZERO)));
+ }
+
+ @Test
+ public void testNegativeScale() {
+ final SerdeContext serdeContext = configuredSerdeContext();
+
+ final String json = toJson(serdeContext, FACTORY.createSqlType(SqlTypeName.DECIMAL, 5, -1));
+ final RelDataType actual = toObject(serdeContext, json, RelDataType.class);
+
+ assertThat(actual).isSameAs(FACTORY.createSqlType(SqlTypeName.DECIMAL, 6, 0));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Test data
+ // --------------------------------------------------------------------------------------------
+
+ @Parameters(name = "{0}")
+ public static List<RelDataType> testRelDataTypeSerde() {
// the values in the list do not care about nullable.
- List<RelDataType> types =
+ final List<RelDataType> types =
Arrays.asList(
FACTORY.createSqlType(SqlTypeName.BOOLEAN),
FACTORY.createSqlType(SqlTypeName.TINYINT),
FACTORY.createSqlType(SqlTypeName.SMALLINT),
FACTORY.createSqlType(SqlTypeName.INTEGER),
FACTORY.createSqlType(SqlTypeName.BIGINT),
- FACTORY.createSqlType(SqlTypeName.DECIMAL, 3, 10),
- FACTORY.createSqlType(SqlTypeName.DECIMAL, 0, 19),
- FACTORY.createSqlType(SqlTypeName.DECIMAL, -1, 19),
+ FACTORY.createSqlType(SqlTypeName.DECIMAL, 10, 3),
+ FACTORY.createSqlType(SqlTypeName.DECIMAL, 19, 0),
+ FACTORY.createSqlType(SqlTypeName.DECIMAL, 38, 19),
FACTORY.createSqlType(SqlTypeName.FLOAT),
- FACTORY.createSqlType(SqlTypeName.REAL),
FACTORY.createSqlType(SqlTypeName.DOUBLE),
FACTORY.createSqlType(SqlTypeName.DATE),
FACTORY.createSqlType(SqlTypeName.TIME),
- FACTORY.createSqlType(SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE),
FACTORY.createSqlType(SqlTypeName.TIMESTAMP),
FACTORY.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE),
FACTORY.createSqlIntervalType(
new SqlIntervalQualifier(
- TimeUnit.YEAR, TimeUnit.YEAR, SqlParserPos.ZERO)),
- FACTORY.createSqlIntervalType(
- new SqlIntervalQualifier(
- TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO)),
- FACTORY.createSqlIntervalType(
- new SqlIntervalQualifier(
- TimeUnit.MONTH, TimeUnit.MONTH, SqlParserPos.ZERO)),
- FACTORY.createSqlIntervalType(
- new SqlIntervalQualifier(
- TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO)),
+ TimeUnit.DAY,
+ 2,
+ TimeUnit.MINUTE,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ SqlParserPos.ZERO)),
FACTORY.createSqlIntervalType(
new SqlIntervalQualifier(
- TimeUnit.DAY, TimeUnit.HOUR, SqlParserPos.ZERO)),
+ TimeUnit.DAY, 6, TimeUnit.SECOND, 9, SqlParserPos.ZERO)),
FACTORY.createSqlIntervalType(
new SqlIntervalQualifier(
- TimeUnit.DAY, TimeUnit.MINUTE, SqlParserPos.ZERO)),
+ TimeUnit.HOUR,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ TimeUnit.SECOND,
+ 9,
+ SqlParserPos.ZERO)),
FACTORY.createSqlIntervalType(
new SqlIntervalQualifier(
- TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)),
+ TimeUnit.MINUTE,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ TimeUnit.SECOND,
+ 0,
+ SqlParserPos.ZERO)),
FACTORY.createSqlIntervalType(
new SqlIntervalQualifier(
- TimeUnit.HOUR, TimeUnit.HOUR, SqlParserPos.ZERO)),
- FACTORY.createSqlIntervalType(
- new SqlIntervalQualifier(
- TimeUnit.HOUR, TimeUnit.MINUTE, SqlParserPos.ZERO)),
- FACTORY.createSqlIntervalType(
- new SqlIntervalQualifier(
- TimeUnit.HOUR, TimeUnit.SECOND, SqlParserPos.ZERO)),
- FACTORY.createSqlIntervalType(
- new SqlIntervalQualifier(
- TimeUnit.MINUTE, TimeUnit.MINUTE, SqlParserPos.ZERO)),
- FACTORY.createSqlIntervalType(
- new SqlIntervalQualifier(
- TimeUnit.MINUTE, TimeUnit.SECOND, SqlParserPos.ZERO)),
- FACTORY.createSqlIntervalType(
- new SqlIntervalQualifier(
- TimeUnit.SECOND, TimeUnit.SECOND, SqlParserPos.ZERO)),
+ TimeUnit.SECOND,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ TimeUnit.SECOND,
+ 6,
+ SqlParserPos.ZERO)),
FACTORY.createSqlType(SqlTypeName.CHAR),
FACTORY.createSqlType(SqlTypeName.CHAR, 0),
FACTORY.createSqlType(SqlTypeName.CHAR, 32),
@@ -136,7 +165,6 @@ public class RelDataTypeJsonSerdeTest {
FACTORY.createSqlType(SqlTypeName.VARBINARY, 0),
FACTORY.createSqlType(SqlTypeName.VARBINARY, 1000),
FACTORY.createSqlType(SqlTypeName.NULL),
- FACTORY.createSqlType(SqlTypeName.ANY),
FACTORY.createSqlType(SqlTypeName.SYMBOL),
FACTORY.createMultisetType(FACTORY.createSqlType(SqlTypeName.VARCHAR), -1),
FACTORY.createArrayType(FACTORY.createSqlType(SqlTypeName.VARCHAR, 16), -1),
@@ -156,17 +184,16 @@ public class RelDataTypeJsonSerdeTest {
FACTORY.createSqlType(SqlTypeName.INTEGER),
FACTORY.createSqlType(SqlTypeName.VARCHAR, 10)),
-1)),
- FACTORY.createSqlType(SqlTypeName.DISTINCT),
- FACTORY.createSqlType(SqlTypeName.STRUCTURED),
// simple struct type
FACTORY.createStructType(
- StructKind.PEEK_FIELDS,
+ StructKind.PEEK_FIELDS_NO_EXPAND,
Arrays.asList(
FACTORY.createSqlType(SqlTypeName.INTEGER),
- FACTORY.createSqlType(SqlTypeName.DECIMAL, 3, 10)),
+ FACTORY.createSqlType(SqlTypeName.DECIMAL, 10, 3)),
Arrays.asList("f1", "f2")),
// struct type with array type
FACTORY.createStructType(
+ StructKind.PEEK_FIELDS_NO_EXPAND,
Arrays.asList(
FACTORY.createSqlType(SqlTypeName.VARCHAR),
FACTORY.createArrayType(
@@ -175,8 +202,10 @@ public class RelDataTypeJsonSerdeTest {
Arrays.asList("f1", "f2")),
// nested struct type
FACTORY.createStructType(
+ StructKind.PEEK_FIELDS_NO_EXPAND,
Arrays.asList(
FACTORY.createStructType(
+ StructKind.PEEK_FIELDS_NO_EXPAND,
Arrays.asList(
FACTORY.createSqlType(
SqlTypeName.VARCHAR, 5),
@@ -187,13 +216,10 @@ public class RelDataTypeJsonSerdeTest {
FACTORY.createSqlType(SqlTypeName.VARCHAR, 16),
-1)),
Arrays.asList("f3", "f4")),
- FACTORY.createSqlType(SqlTypeName.SARG),
FACTORY.createRowtimeIndicatorType(true, false),
FACTORY.createRowtimeIndicatorType(true, true),
FACTORY.createProctimeIndicatorType(true),
FACTORY.createFieldTypeFromLogicalType(
- new LegacyTypeInformationType<>(LogicalTypeRoot.RAW, Types.STRING)),
- FACTORY.createFieldTypeFromLogicalType(
StructuredType.newBuilder(
ObjectIdentifier.of("cat", "db", "structuredType"),
DataTypeJsonSerdeTest.PojoClass.class)
@@ -213,79 +239,28 @@ public class RelDataTypeJsonSerdeTest {
.description("description for StructuredType")
.build()));
- List<RelDataType> ret = new ArrayList<>(types.size() * 2);
+ final List<RelDataType> mutableTypes = new ArrayList<>(types.size() * 2);
for (RelDataType type : types) {
- ret.add(FACTORY.createTypeWithNullability(type, true));
- ret.add(FACTORY.createTypeWithNullability(type, false));
+ mutableTypes.add(FACTORY.createTypeWithNullability(type, true));
+ mutableTypes.add(FACTORY.createTypeWithNullability(type, false));
}
- ret.add(
+ mutableTypes.add(
FACTORY.createTypeWithNullability(
FACTORY.createFieldTypeFromLogicalType(
new RawType<>(true, Void.class, VoidSerializer.INSTANCE)),
true));
- ret.add(
+ mutableTypes.add(
FACTORY.createTypeWithNullability(
FACTORY.createFieldTypeFromLogicalType(
new RawType<>(false, Void.class, VoidSerializer.INSTANCE)),
false));
- ret.add(
+ mutableTypes.add(
FACTORY.createTypeWithNullability(
FACTORY.createFieldTypeFromLogicalType(
new RawType<>(true, Void.class, VoidSerializer.INSTANCE)),
false));
- ret.add(
- FACTORY.createTypeWithNullability(
- FACTORY.createFieldTypeFromLogicalType(
- new TypeInformationRawType<>(true, Types.STRING)),
- true));
- ret.add(
- FACTORY.createTypeWithNullability(
- FACTORY.createFieldTypeFromLogicalType(
- new TypeInformationRawType<>(false, Types.STRING)),
- false));
- ret.add(
- FACTORY.createTypeWithNullability(
- FACTORY.createFieldTypeFromLogicalType(
- new TypeInformationRawType<>(true, Types.STRING)),
- false));
- return ret;
- }
-
- @Parameterized.Parameter public RelDataType relDataType;
-
- @Test
- public void testTypeSerde() throws IOException {
- SerdeContext serdeCtx =
- new SerdeContext(
- new FlinkContextImpl(
- false,
- TableConfig.getDefault(),
- new ModuleManager(),
- null,
- CatalogManagerMocks.createEmptyCatalogManager(),
- null),
- Thread.currentThread().getContextClassLoader(),
- FlinkTypeFactory.INSTANCE(),
- FlinkSqlOperatorTable.instance());
- ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeCtx);
- ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeCtx);
-
- final String json = objectWriter.writeValueAsString(relDataType);
-
- RelDataType actual = objectReader.readValue(json, RelDataType.class);
- // type system will fill the default precision if the precision is not defined
- if (relDataType.toString().equals("DECIMAL")) {
- assertEquals(SqlTypeName.DECIMAL, actual.getSqlTypeName());
- assertEquals(relDataType.getScale(), actual.getScale());
- assertEquals(
- serdeCtx.getTypeFactory()
- .getTypeSystem()
- .getDefaultPrecision(SqlTypeName.DECIMAL),
- actual.getPrecision());
- } else {
- assertSame(relDataType, actual);
- }
+ return mutableTypes;
}
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
index becc4030..07a6ad2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
@@ -35,7 +35,6 @@ import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.utils.EncodingUtils;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
@@ -60,7 +59,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.StringWriter;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
@@ -96,12 +94,12 @@ public class RexNodeSerdeTest {
RexBuilder rexBuilder = new RexBuilder(FACTORY);
RelDataType inputType =
FACTORY.createStructType(
- StructKind.PEEK_FIELDS,
+ StructKind.PEEK_FIELDS_NO_EXPAND,
Arrays.asList(
FACTORY.createSqlType(SqlTypeName.INTEGER),
FACTORY.createSqlType(SqlTypeName.BIGINT),
FACTORY.createStructType(
- StructKind.PEEK_FIELDS,
+ StructKind.PEEK_FIELDS_NO_EXPAND,
Arrays.asList(
FACTORY.createSqlType(SqlTypeName.VARCHAR),
FACTORY.createSqlType(SqlTypeName.VARCHAR)),
@@ -133,100 +131,34 @@ public class RexNodeSerdeTest {
FACTORY.createSqlType(SqlTypeName.FLOAT)),
rexBuilder.makeExactLiteral(BigDecimal.valueOf(random.nextDouble())),
rexBuilder.makeIntervalLiteral(
- new SqlIntervalQualifier(
- TimeUnit.YEAR, TimeUnit.YEAR, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
BigDecimal.valueOf(100),
new SqlIntervalQualifier(
- TimeUnit.YEAR, TimeUnit.YEAR, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- BigDecimal.valueOf(3),
- new SqlIntervalQualifier(
- TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- new SqlIntervalQualifier(
- TimeUnit.MONTH, TimeUnit.MONTH, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- BigDecimal.valueOf(3),
- new SqlIntervalQualifier(
- TimeUnit.MONTH, TimeUnit.MONTH, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- new SqlIntervalQualifier(
- TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- BigDecimal.valueOf(3),
- new SqlIntervalQualifier(
- TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- new SqlIntervalQualifier(
- TimeUnit.DAY, TimeUnit.HOUR, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- BigDecimal.valueOf(3),
- new SqlIntervalQualifier(
- TimeUnit.DAY, TimeUnit.HOUR, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- new SqlIntervalQualifier(
- TimeUnit.DAY, TimeUnit.MINUTE, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- BigDecimal.valueOf(3),
- new SqlIntervalQualifier(
- TimeUnit.DAY, TimeUnit.MINUTE, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- new SqlIntervalQualifier(
- TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- BigDecimal.valueOf(3),
- new SqlIntervalQualifier(
- TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- new SqlIntervalQualifier(
- TimeUnit.HOUR, TimeUnit.HOUR, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- BigDecimal.valueOf(3),
- new SqlIntervalQualifier(
- TimeUnit.HOUR, TimeUnit.HOUR, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- new SqlIntervalQualifier(
- TimeUnit.HOUR, TimeUnit.MINUTE, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- BigDecimal.valueOf(3),
- new SqlIntervalQualifier(
- TimeUnit.HOUR, TimeUnit.MINUTE, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- new SqlIntervalQualifier(
- TimeUnit.HOUR, TimeUnit.SECOND, SqlParserPos.ZERO)),
+ TimeUnit.YEAR,
+ 4,
+ TimeUnit.YEAR,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ SqlParserPos.ZERO)),
rexBuilder.makeIntervalLiteral(
BigDecimal.valueOf(3),
new SqlIntervalQualifier(
- TimeUnit.HOUR, TimeUnit.SECOND, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- new SqlIntervalQualifier(
- TimeUnit.MINUTE, TimeUnit.MINUTE, SqlParserPos.ZERO)),
+ TimeUnit.YEAR,
+ 2,
+ TimeUnit.MONTH,
+ RelDataType.PRECISION_NOT_SPECIFIED,
+ SqlParserPos.ZERO)),
rexBuilder.makeIntervalLiteral(
BigDecimal.valueOf(3),
new SqlIntervalQualifier(
- TimeUnit.MINUTE, TimeUnit.MINUTE, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- new SqlIntervalQualifier(
- TimeUnit.MINUTE, TimeUnit.SECOND, SqlParserPos.ZERO)),
+ TimeUnit.DAY, 2, TimeUnit.SECOND, 6, SqlParserPos.ZERO)),
rexBuilder.makeIntervalLiteral(
BigDecimal.valueOf(3),
new SqlIntervalQualifier(
- TimeUnit.MINUTE, TimeUnit.SECOND, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- new SqlIntervalQualifier(
- TimeUnit.SECOND, TimeUnit.SECOND, SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- BigDecimal.valueOf(3),
- new SqlIntervalQualifier(
- TimeUnit.SECOND, TimeUnit.SECOND, SqlParserPos.ZERO)),
+ TimeUnit.SECOND, 2, TimeUnit.SECOND, 6, SqlParserPos.ZERO)),
rexBuilder.makeDateLiteral(DateString.fromDaysSinceEpoch(10)),
rexBuilder.makeDateLiteral(new DateString("2000-12-12")),
rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(1234), 3),
rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(123456), 6),
rexBuilder.makeTimeLiteral(new TimeString("01:01:01.000000001"), 9),
- rexBuilder.makeTimeWithLocalTimeZoneLiteral(
- TimeString.fromMillisOfDay(1234), 3),
rexBuilder.makeTimestampLiteral(
TimestampString.fromMillisSinceEpoch(1234), 3),
rexBuilder.makeTimestampLiteral(
@@ -245,6 +177,7 @@ public class RexNodeSerdeTest {
rexBuilder.makeLiteral(
Arrays.<Object>asList(1, 2L),
FACTORY.createStructType(
+ StructKind.PEEK_FIELDS_NO_EXPAND,
Arrays.asList(
FACTORY.createSqlType(SqlTypeName.INTEGER),
FACTORY.createSqlType(SqlTypeName.BIGINT)),
@@ -373,11 +306,7 @@ public class RexNodeSerdeTest {
ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeCtx);
ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeCtx);
- StringWriter writer = new StringWriter(100);
- try (JsonGenerator gen = objectWriter.getFactory().createGenerator(writer)) {
- gen.writeObject(rexNode);
- }
- String json = writer.toString();
+ String json = objectWriter.writeValueAsString(rexNode);
RexNode actual = objectReader.readValue(json, RexNode.class);
assertEquals(rexNode, actual);
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java
index 2a04c58..1f69163 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.planner.calcite.FlinkContextImpl;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
@@ -48,7 +49,7 @@ public class RexWindowBoundSerdeTest {
TableConfig.getDefault(),
new ModuleManager(),
null,
- null,
+ CatalogManagerMocks.createEmptyCatalogManager(),
null),
Thread.currentThread().getContextClassLoader(),
FlinkTypeFactory.INSTANCE(),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java
index 9ad7670..ff56411 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
import org.apache.flink.table.utils.CatalogManagerMocks;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
@@ -47,7 +46,6 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.Test;
import java.io.IOException;
-import java.io.StringWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -81,13 +79,9 @@ public class TemporalTableSourceSpecSerdeTest {
ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeCtx);
ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeCtx);
- StringWriter writer = new StringWriter(100);
List<TemporalTableSourceSpec> specs = testData();
for (TemporalTableSourceSpec spec : specs) {
- try (JsonGenerator gen = objectWriter.getFactory().createGenerator(writer)) {
- gen.writeObject(spec);
- }
- String json = writer.toString();
+ String json = objectWriter.writeValueAsString(spec);
TemporalTableSourceSpec actual =
objectReader.readValue(json, TemporalTableSourceSpec.class);
assertEquals(spec.getTableSourceSpec(), actual.getTableSourceSpec());
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverterTest.java
new file mode 100644
index 0000000..7ef17fe
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverterTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.typeutils;
+
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerdeTest;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.utils.DataTypeFactoryMock;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.test.TableAssertions.assertThat;
+
+/** Tests for {@link LogicalRelDataTypeConverter}. */
+public class LogicalRelDataTypeConverterTest {
+
+ @ParameterizedTest
+ @MethodSource("testConversion")
+ public void testConversion(LogicalType logicalType) throws IOException {
+ final RelDataTypeFactory typeFactory = FlinkTypeFactory.INSTANCE();
+ final DataTypeFactoryMock dataTypeFactory = new DataTypeFactoryMock();
+ final RelDataType relDataType =
+ LogicalRelDataTypeConverter.toRelDataType(logicalType, typeFactory);
+ assertThat(LogicalRelDataTypeConverter.toLogicalType(relDataType, dataTypeFactory))
+ .isEqualTo(logicalType);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Test data
+ // --------------------------------------------------------------------------------------------
+
+ @Parameters(name = "{0}")
+ private static Stream<LogicalType> testConversion() {
+ return Stream.of(
+ new BooleanType(),
+ new TinyIntType(),
+ new SmallIntType(),
+ new IntType(),
+ new BigIntType(),
+ new FloatType(),
+ new DoubleType(),
+ new DecimalType(10),
+ new DecimalType(15, 5),
+ CharType.ofEmptyLiteral(),
+ new CharType(),
+ new CharType(5),
+ VarCharType.ofEmptyLiteral(),
+ new VarCharType(),
+ new VarCharType(5),
+ BinaryType.ofEmptyLiteral(),
+ new BinaryType(),
+ new BinaryType(100),
+ VarBinaryType.ofEmptyLiteral(),
+ new VarBinaryType(),
+ new VarBinaryType(100),
+ new DateType(),
+ new TimeType(),
+ new TimeType(3),
+ new TimestampType(),
+ new TimestampType(3),
+ new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+ new TimestampType(false, TimestampKind.ROWTIME, 3),
+ new LocalZonedTimestampType(),
+ new LocalZonedTimestampType(3),
+ new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+ new LocalZonedTimestampType(false, TimestampKind.ROWTIME, 3),
+ new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR),
+ new DayTimeIntervalType(
+ false, DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR, 3, 6),
+ new YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH),
+ new YearMonthIntervalType(
+ false, YearMonthIntervalType.YearMonthResolution.MONTH, 2),
+ new LocalZonedTimestampType(),
+ new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+ new SymbolType<>(),
+ new ArrayType(new IntType(false)),
+ new ArrayType(new LocalZonedTimestampType(false, TimestampKind.ROWTIME, 3)),
+ new ArrayType(new TimestampType()),
+ new ArrayType(CharType.ofEmptyLiteral()),
+ new ArrayType(VarCharType.ofEmptyLiteral()),
+ new ArrayType(BinaryType.ofEmptyLiteral()),
+ new ArrayType(VarBinaryType.ofEmptyLiteral()),
+ new MapType(new BigIntType(), new IntType(false)),
+ new MapType(
+ new TimestampType(false, TimestampKind.ROWTIME, 3),
+ new LocalZonedTimestampType()),
+ new MapType(CharType.ofEmptyLiteral(), CharType.ofEmptyLiteral()),
+ new MapType(VarCharType.ofEmptyLiteral(), VarCharType.ofEmptyLiteral()),
+ new MapType(BinaryType.ofEmptyLiteral(), BinaryType.ofEmptyLiteral()),
+ new MapType(VarBinaryType.ofEmptyLiteral(), VarBinaryType.ofEmptyLiteral()),
+ new MultisetType(new IntType(false)),
+ new MultisetType(new TimestampType()),
+ new MultisetType(new TimestampType(true, TimestampKind.ROWTIME, 3)),
+ new MultisetType(CharType.ofEmptyLiteral()),
+ new MultisetType(VarCharType.ofEmptyLiteral()),
+ new MultisetType(BinaryType.ofEmptyLiteral()),
+ new MultisetType(VarBinaryType.ofEmptyLiteral()),
+ RowType.of(new BigIntType(), new IntType(false), new VarCharType(200)),
+ RowType.of(
+ new LogicalType[] {
+ new BigIntType(), new IntType(false), new VarCharType(200)
+ },
+ new String[] {"f1", "f2", "f3"}),
+ RowType.of(
+ new TimestampType(false, TimestampKind.ROWTIME, 3),
+ new TimestampType(false, TimestampKind.REGULAR, 3),
+ new LocalZonedTimestampType(false, TimestampKind.ROWTIME, 3),
+ new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+ new LocalZonedTimestampType(false, TimestampKind.REGULAR, 3)),
+ RowType.of(
+ CharType.ofEmptyLiteral(),
+ VarCharType.ofEmptyLiteral(),
+ BinaryType.ofEmptyLiteral(),
+ VarBinaryType.ofEmptyLiteral()),
+ // registered structured type
+ StructuredType.newBuilder(
+ ObjectIdentifier.of("cat", "db", "structuredType"),
+ DataTypeJsonSerdeTest.PojoClass.class)
+ .attributes(
+ Arrays.asList(
+ new StructuredType.StructuredAttribute(
+ "f0", new IntType(true)),
+ new StructuredType.StructuredAttribute(
+ "f1", new BigIntType(true)),
+ new StructuredType.StructuredAttribute(
+ "f2", new VarCharType(200), "desc")))
+ .comparison(StructuredType.StructuredComparison.FULL)
+ .setFinal(false)
+ .setInstantiable(false)
+ .superType(
+ StructuredType.newBuilder(
+ ObjectIdentifier.of("cat", "db", "structuredType2"))
+ .attributes(
+ Collections.singletonList(
+ new StructuredType.StructuredAttribute(
+ "f0", new BigIntType(false))))
+ .build())
+ .description("description for StructuredType")
+ .build(),
+ // unregistered structured type
+ StructuredType.newBuilder(PojoClass.class)
+ .attributes(
+ Arrays.asList(
+ new StructuredType.StructuredAttribute(
+ "f0", new IntType(true)),
+ new StructuredType.StructuredAttribute(
+ "f1", new BigIntType(true)),
+ new StructuredType.StructuredAttribute(
+ "f2", new VarCharType(200), "desc")))
+ .build(),
+ // custom RawType
+ new RawType<>(LocalDateTime.class, LocalDateTimeSerializer.INSTANCE));
+ }
+
+ /** Testing class. */
+ public static class PojoClass {
+ public int f0;
+ public long f1;
+ public String f2;
+ }
+}