Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/10 02:52:13 UTC

[flink] 01/03: [hotfix][avro] Fix TINYINT/SMALLINT types not working in Avro format

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0980a28efee56ea900291336eb2d230d229baa61
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon Jun 8 17:36:04 2020 +0800

    [hotfix][avro] Fix TINYINT/SMALLINT types not working in Avro format
---
 .../avro/AvroRowDataDeserializationSchema.java     |  6 ++--
 .../avro/AvroRowDataSerializationSchema.java       |  6 ++++
 .../avro/typeutils/AvroSchemaConverter.java        |  2 ++
 .../avro/AvroRowDataDeSerializationSchemaTest.java | 38 +++++++++++++---------
 4 files changed, 34 insertions(+), 18 deletions(-)

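For context: Avro has no 1-byte or 2-byte integer types, so Flink's TINYINT and SMALLINT columns are carried on the wire as Avro int. Before this commit they fell through to the same converter branch as INTEGER, so the Integer produced by Avro was passed through unconverted and the columns did not work. The standalone sketch below mirrors the narrowing/widening lambdas added in the diff; the class and field names are illustrative, not part of the commit.

    // Standalone sketch (names are illustrative) of the conversions this
    // commit introduces: Avro decodes TINYINT/SMALLINT fields as Integer,
    // while Flink's RowData expects Byte/Short, and vice versa on write.
    import java.util.function.Function;

    public class TinyintSmallintConversionSketch {

        // read path: Avro Integer -> Flink Byte / Short
        static final Function<Object, Object> READ_TINYINT =
                avroObject -> ((Integer) avroObject).byteValue();
        static final Function<Object, Object> READ_SMALLINT =
                avroObject -> ((Integer) avroObject).shortValue();

        // write path: Flink Byte / Short -> Avro Integer
        static final Function<Object, Object> WRITE_TINYINT =
                object -> ((Byte) object).intValue();
        static final Function<Object, Object> WRITE_SMALLINT =
                object -> ((Short) object).intValue();

        public static void main(String[] args) {
            // round trip: Flink value -> Avro int -> Flink value
            Object avroTiny = WRITE_TINYINT.apply(Byte.MAX_VALUE);
            Object avroSmall = WRITE_SMALLINT.apply(Short.MAX_VALUE);
            System.out.println(READ_TINYINT.apply(avroTiny));   // 127
            System.out.println(READ_SMALLINT.apply(avroSmall)); // 32767
        }
    }
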
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
index 995a83c..31ba11d 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
@@ -229,9 +229,11 @@ public class AvroRowDataDeserializationSchema implements DeserializationSchema<R
 		switch (type.getTypeRoot()) {
 			case NULL:
 				return avroObject -> null;
+			case TINYINT:
+				return avroObject -> ((Integer) avroObject).byteValue();
+			case SMALLINT:
+				return avroObject -> ((Integer) avroObject).shortValue();
 			case BOOLEAN: // boolean
-			case TINYINT: // short
-			case SMALLINT: // int
 			case INTEGER: // int
 			case INTERVAL_YEAR_MONTH: // long
 			case BIGINT: // long
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
index 5b1fbbe..bc6eca7 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
@@ -187,6 +187,12 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
 			case NULL:
 				converter = (schema, object) -> null;
 				break;
+			case TINYINT:
+				converter = (schema, object) -> ((Byte) object).intValue();
+				break;
+			case SMALLINT:
+				converter = (schema, object) -> ((Short) object).intValue();
+				break;
 			case BOOLEAN: // boolean
 			case INTEGER: // int
 			case INTERVAL_YEAR_MONTH: // long
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index 37745e5..4524726 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -186,6 +186,8 @@ public class AvroSchemaConverter {
 				return SchemaBuilder.builder().nullType();
 			case BOOLEAN:
 				return getNullableBuilder(logicalType).booleanType();
+			case TINYINT:
+			case SMALLINT:
 			case INTEGER:
 				return getNullableBuilder(logicalType).intType();
 			case BIGINT:
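
The AvroSchemaConverter hunk above makes TINYINT and SMALLINT share the Avro int schema with INTEGER. A quick sketch of how that mapping can be observed through the same public API the test uses below (the schema shape in the comment is an expectation, not captured output):

    import org.apache.avro.Schema;
    import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;

    import static org.apache.flink.table.api.DataTypes.FIELD;
    import static org.apache.flink.table.api.DataTypes.ROW;
    import static org.apache.flink.table.api.DataTypes.SMALLINT;
    import static org.apache.flink.table.api.DataTypes.TINYINT;

    public class SchemaMappingSketch {
        public static void main(String[] args) {
            DataType dataType = ROW(
                    FIELD("tinyint", TINYINT()),
                    FIELD("smallint", SMALLINT()));
            RowType rowType = (RowType) dataType.getLogicalType();
            // both fields are expected to resolve to a (nullable) Avro int
            Schema schema = AvroSchemaConverter.convertToSchema(rowType);
            System.out.println(schema.toString(true));
        }
    }
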
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
index 38c6c2f..9110e2f 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
@@ -62,9 +62,11 @@ import static org.apache.flink.table.api.DataTypes.FLOAT;
 import static org.apache.flink.table.api.DataTypes.INT;
 import static org.apache.flink.table.api.DataTypes.MAP;
 import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.api.DataTypes.TIME;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
 import static org.junit.Assert.assertArrayEquals;
 
 /**
@@ -76,6 +78,8 @@ public class AvroRowDataDeSerializationSchemaTest {
 	public void testSerializeDeserialize() throws Exception {
 		final DataType dataType = ROW(
 			FIELD("bool", BOOLEAN()),
+			FIELD("tinyint", TINYINT()),
+			FIELD("smallint", SMALLINT()),
 			FIELD("int", INT()),
 			FIELD("bigint", BIGINT()),
 			FIELD("float", FLOAT()),
@@ -97,45 +101,47 @@ public class AvroRowDataDeSerializationSchemaTest {
 		final Schema schema = AvroSchemaConverter.convertToSchema(rowType);
 		final GenericRecord record = new GenericData.Record(schema);
 		record.put(0, true);
-		record.put(1, 33);
-		record.put(2, 44L);
-		record.put(3, 12.34F);
-		record.put(4, 23.45);
-		record.put(5, "hello avro");
-		record.put(6, ByteBuffer.wrap(new byte[]{1, 2, 4, 5, 6, 7, 8, 12}));
-
-		record.put(7, ByteBuffer.wrap(
+		record.put(1, (int) Byte.MAX_VALUE);
+		record.put(2, (int) Short.MAX_VALUE);
+		record.put(3, 33);
+		record.put(4, 44L);
+		record.put(5, 12.34F);
+		record.put(6, 23.45);
+		record.put(7, "hello avro");
+		record.put(8, ByteBuffer.wrap(new byte[]{1, 2, 4, 5, 6, 7, 8, 12}));
+
+		record.put(9, ByteBuffer.wrap(
 				BigDecimal.valueOf(123456789, 6).unscaledValue().toByteArray()));
 
 		List<Double> doubles = new ArrayList<>();
 		doubles.add(1.2);
 		doubles.add(3.4);
 		doubles.add(567.8901);
-		record.put(8, doubles);
+		record.put(10, doubles);
 
-		record.put(9, 18397);
-		record.put(10, 10087);
-		record.put(11, 1589530213123L);
-		record.put(12, 1589530213122L);
+		record.put(11, 18397);
+		record.put(12, 10087);
+		record.put(13, 1589530213123L);
+		record.put(14, 1589530213122L);
 
 		Map<String, Long> map = new HashMap<>();
 		map.put("flink", 12L);
 		map.put("avro", 23L);
-		record.put(13, map);
+		record.put(15, map);
 
 		Map<String, Map<String, Integer>> map2map = new HashMap<>();
 		Map<String, Integer> innerMap = new HashMap<>();
 		innerMap.put("inner_key1", 123);
 		innerMap.put("inner_key2", 234);
 		map2map.put("outer_key", innerMap);
-		record.put(14, map2map);
+		record.put(16, map2map);
 
 		List<Integer> list1 = Arrays.asList(1, 2, 3, 4, 5, 6);
 		List<Integer> list2 = Arrays.asList(11, 22, 33, 44, 55);
 		Map<String, List<Integer>> map2list = new HashMap<>();
 		map2list.put("list1", list1);
 		map2list.put("list2", list2);
-		record.put(15, map2list);
+		record.put(17, map2list);
 
 		AvroRowDataSerializationSchema serializationSchema = new AvroRowDataSerializationSchema(rowType);
 		serializationSchema.open(null);