You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/14 19:48:27 UTC
[flink] 03/07: [hotfix] Fix schema to DataType/Type conversion
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b025db621a56c6f61482c0fbb2fdb91381422904
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Sep 11 16:30:16 2020 +0200
[hotfix] Fix schema to DataType/Type conversion
---
.../avro/typeutils/AvroSchemaConverter.java | 22 ++++++++++------------
.../avro/typeutils/AvroSchemaConverterTest.java | 10 +++++-----
2 files changed, 15 insertions(+), 17 deletions(-)
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index 44c5d84..332d18e 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -157,6 +157,8 @@ public class AvroSchemaConverter {
// logical timestamp type
if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
return Types.SQL_TIMESTAMP;
+ } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+ return Types.SQL_TIME;
}
return Types.LONG;
case FLOAT:
@@ -255,28 +257,24 @@ public class AvroSchemaConverter {
decimalType.getScale())
.notNull();
}
- return DataTypes.ARRAY(DataTypes.TINYINT().bridgedTo(Byte.class))
- .notNull();
+ return DataTypes.BYTES().notNull();
case INT:
// logical date and time type
final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
if (logicalType == LogicalTypes.date()) {
- return DataTypes.DATE().bridgedTo(java.sql.Date.class).notNull();
+ return DataTypes.DATE().notNull();
} else if (logicalType == LogicalTypes.timeMillis()) {
- return DataTypes.TIME().bridgedTo(java.sql.Time.class).notNull();
+ return DataTypes.TIME().notNull();
}
return DataTypes.INT().notNull();
case LONG:
// logical timestamp type
if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
- return DataTypes.TIMESTAMP(3)
- .bridgedTo(java.sql.Timestamp.class)
- .notNull();
- }
- if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
- return DataTypes.TIMESTAMP(6)
- .bridgedTo(java.sql.Timestamp.class)
- .notNull();
+ return DataTypes.TIMESTAMP(3).notNull();
+ } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+ return DataTypes.TIMESTAMP(6).notNull();
+ } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+ return DataTypes.TIME(6).notNull();
}
return DataTypes.BIGINT().notNull();
case FLOAT:
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index 95b5696..98efd6e 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -190,14 +190,14 @@ public class AvroSchemaConverterTest {
DataTypes.FIELD("type_fixed", DataTypes.VARBINARY(16)),
DataTypes.FIELD("type_union", DataTypes.RAW(Types.GENERIC(Object.class)).notNull()),
DataTypes.FIELD("type_nested", address),
- DataTypes.FIELD("type_bytes", DataTypes.ARRAY(DataTypes.TINYINT().bridgedTo(Byte.class)).notNull()),
- DataTypes.FIELD("type_date", DataTypes.DATE().bridgedTo(java.sql.Date.class).notNull()),
- DataTypes.FIELD("type_time_millis", DataTypes.TIME().bridgedTo(java.sql.Time.class).notNull()),
+ DataTypes.FIELD("type_bytes", DataTypes.BYTES().notNull()),
+ DataTypes.FIELD("type_date", DataTypes.DATE().notNull()),
+ DataTypes.FIELD("type_time_millis", DataTypes.TIME().notNull()),
DataTypes.FIELD("type_time_micros", DataTypes.INT().notNull()),
DataTypes.FIELD("type_timestamp_millis",
- DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).notNull()),
+ DataTypes.TIMESTAMP(3).notNull()),
DataTypes.FIELD("type_timestamp_micros",
- DataTypes.TIMESTAMP(6).bridgedTo(java.sql.Timestamp.class).notNull()),
+ DataTypes.TIMESTAMP(6).notNull()),
DataTypes.FIELD("type_decimal_bytes", DataTypes.DECIMAL(4, 2).notNull()),
DataTypes.FIELD("type_decimal_fixed", DataTypes.DECIMAL(4, 2).notNull()))
.notNull();