You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/14 19:48:27 UTC

[flink] 03/07: [hotfix] Fix schema to DataType/Type conversion

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b025db621a56c6f61482c0fbb2fdb91381422904
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Sep 11 16:30:16 2020 +0200

    [hotfix] Fix schema to DataType/Type conversion
---
 .../avro/typeutils/AvroSchemaConverter.java        | 22 ++++++++++------------
 .../avro/typeutils/AvroSchemaConverterTest.java    | 10 +++++-----
 2 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index 44c5d84..332d18e 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -157,6 +157,8 @@ public class AvroSchemaConverter {
 				// logical timestamp type
 				if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
 					return Types.SQL_TIMESTAMP;
+				} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+					return Types.SQL_TIME;
 				}
 				return Types.LONG;
 			case FLOAT:
@@ -255,28 +257,24 @@ public class AvroSchemaConverter {
 						decimalType.getScale())
 						.notNull();
 			}
-			return DataTypes.ARRAY(DataTypes.TINYINT().bridgedTo(Byte.class))
-					.notNull();
+			return DataTypes.BYTES().notNull();
 		case INT:
 			// logical date and time type
 			final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
 			if (logicalType == LogicalTypes.date()) {
-				return DataTypes.DATE().bridgedTo(java.sql.Date.class).notNull();
+				return DataTypes.DATE().notNull();
 			} else if (logicalType == LogicalTypes.timeMillis()) {
-				return DataTypes.TIME().bridgedTo(java.sql.Time.class).notNull();
+				return DataTypes.TIME().notNull();
 			}
 			return DataTypes.INT().notNull();
 		case LONG:
 			// logical timestamp type
 			if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
-				return DataTypes.TIMESTAMP(3)
-						.bridgedTo(java.sql.Timestamp.class)
-						.notNull();
-			}
-			if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
-				return DataTypes.TIMESTAMP(6)
-						.bridgedTo(java.sql.Timestamp.class)
-						.notNull();
+				return DataTypes.TIMESTAMP(3).notNull();
+			} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+				return DataTypes.TIMESTAMP(6).notNull();
+			} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+				return DataTypes.TIME(6).notNull();
 			}
 			return DataTypes.BIGINT().notNull();
 		case FLOAT:
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index 95b5696..98efd6e 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -190,14 +190,14 @@ public class AvroSchemaConverterTest {
 				DataTypes.FIELD("type_fixed", DataTypes.VARBINARY(16)),
 				DataTypes.FIELD("type_union", DataTypes.RAW(Types.GENERIC(Object.class)).notNull()),
 				DataTypes.FIELD("type_nested", address),
-				DataTypes.FIELD("type_bytes", DataTypes.ARRAY(DataTypes.TINYINT().bridgedTo(Byte.class)).notNull()),
-				DataTypes.FIELD("type_date", DataTypes.DATE().bridgedTo(java.sql.Date.class).notNull()),
-				DataTypes.FIELD("type_time_millis", DataTypes.TIME().bridgedTo(java.sql.Time.class).notNull()),
+				DataTypes.FIELD("type_bytes", DataTypes.BYTES().notNull()),
+				DataTypes.FIELD("type_date", DataTypes.DATE().notNull()),
+				DataTypes.FIELD("type_time_millis", DataTypes.TIME().notNull()),
 				DataTypes.FIELD("type_time_micros", DataTypes.INT().notNull()),
 				DataTypes.FIELD("type_timestamp_millis",
-						DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).notNull()),
+						DataTypes.TIMESTAMP(3).notNull()),
 				DataTypes.FIELD("type_timestamp_micros",
-						DataTypes.TIMESTAMP(6).bridgedTo(java.sql.Timestamp.class).notNull()),
+						DataTypes.TIMESTAMP(6).notNull()),
 				DataTypes.FIELD("type_decimal_bytes", DataTypes.DECIMAL(4, 2).notNull()),
 				DataTypes.FIELD("type_decimal_fixed", DataTypes.DECIMAL(4, 2).notNull()))
 				.notNull();