You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/10 02:52:13 UTC
[flink] 01/03: [hotfix][avro] Fix TINYINT/SMALLINT types not working
in Avro format
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0980a28efee56ea900291336eb2d230d229baa61
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon Jun 8 17:36:04 2020 +0800
[hotfix][avro] Fix TINYINT/SMALLINT types not working in Avro format
---
.../avro/AvroRowDataDeserializationSchema.java | 6 ++--
.../avro/AvroRowDataSerializationSchema.java | 6 ++++
.../avro/typeutils/AvroSchemaConverter.java | 2 ++
.../avro/AvroRowDataDeSerializationSchemaTest.java | 38 +++++++++++++---------
4 files changed, 34 insertions(+), 18 deletions(-)
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
index 995a83c..31ba11d 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
@@ -229,9 +229,11 @@ public class AvroRowDataDeserializationSchema implements DeserializationSchema<R
switch (type.getTypeRoot()) {
case NULL:
return avroObject -> null;
+ case TINYINT:
+ return avroObject -> ((Integer) avroObject).byteValue();
+ case SMALLINT:
+ return avroObject -> ((Integer) avroObject).shortValue();
case BOOLEAN: // boolean
- case TINYINT: // short
- case SMALLINT: // int
case INTEGER: // int
case INTERVAL_YEAR_MONTH: // long
case BIGINT: // long
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
index 5b1fbbe..bc6eca7 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
@@ -187,6 +187,12 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
case NULL:
converter = (schema, object) -> null;
break;
+ case TINYINT:
+ converter = (schema, object) -> ((Byte) object).intValue();
+ break;
+ case SMALLINT:
+ converter = (schema, object) -> ((Short) object).intValue();
+ break;
case BOOLEAN: // boolean
case INTEGER: // int
case INTERVAL_YEAR_MONTH: // long
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index 37745e5..4524726 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -186,6 +186,8 @@ public class AvroSchemaConverter {
return SchemaBuilder.builder().nullType();
case BOOLEAN:
return getNullableBuilder(logicalType).booleanType();
+ case TINYINT:
+ case SMALLINT:
case INTEGER:
return getNullableBuilder(logicalType).intType();
case BIGINT:
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
index 38c6c2f..9110e2f 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
@@ -62,9 +62,11 @@ import static org.apache.flink.table.api.DataTypes.FLOAT;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.MAP;
import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.DataTypes.TIME;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
import static org.junit.Assert.assertArrayEquals;
/**
@@ -76,6 +78,8 @@ public class AvroRowDataDeSerializationSchemaTest {
public void testSerializeDeserialize() throws Exception {
final DataType dataType = ROW(
FIELD("bool", BOOLEAN()),
+ FIELD("tinyint", TINYINT()),
+ FIELD("smallint", SMALLINT()),
FIELD("int", INT()),
FIELD("bigint", BIGINT()),
FIELD("float", FLOAT()),
@@ -97,45 +101,47 @@ public class AvroRowDataDeSerializationSchemaTest {
final Schema schema = AvroSchemaConverter.convertToSchema(rowType);
final GenericRecord record = new GenericData.Record(schema);
record.put(0, true);
- record.put(1, 33);
- record.put(2, 44L);
- record.put(3, 12.34F);
- record.put(4, 23.45);
- record.put(5, "hello avro");
- record.put(6, ByteBuffer.wrap(new byte[]{1, 2, 4, 5, 6, 7, 8, 12}));
-
- record.put(7, ByteBuffer.wrap(
+ record.put(1, (int) Byte.MAX_VALUE);
+ record.put(2, (int) Short.MAX_VALUE);
+ record.put(3, 33);
+ record.put(4, 44L);
+ record.put(5, 12.34F);
+ record.put(6, 23.45);
+ record.put(7, "hello avro");
+ record.put(8, ByteBuffer.wrap(new byte[]{1, 2, 4, 5, 6, 7, 8, 12}));
+
+ record.put(9, ByteBuffer.wrap(
BigDecimal.valueOf(123456789, 6).unscaledValue().toByteArray()));
List<Double> doubles = new ArrayList<>();
doubles.add(1.2);
doubles.add(3.4);
doubles.add(567.8901);
- record.put(8, doubles);
+ record.put(10, doubles);
- record.put(9, 18397);
- record.put(10, 10087);
- record.put(11, 1589530213123L);
- record.put(12, 1589530213122L);
+ record.put(11, 18397);
+ record.put(12, 10087);
+ record.put(13, 1589530213123L);
+ record.put(14, 1589530213122L);
Map<String, Long> map = new HashMap<>();
map.put("flink", 12L);
map.put("avro", 23L);
- record.put(13, map);
+ record.put(15, map);
Map<String, Map<String, Integer>> map2map = new HashMap<>();
Map<String, Integer> innerMap = new HashMap<>();
innerMap.put("inner_key1", 123);
innerMap.put("inner_key2", 234);
map2map.put("outer_key", innerMap);
- record.put(14, map2map);
+ record.put(16, map2map);
List<Integer> list1 = Arrays.asList(1, 2, 3, 4, 5, 6);
List<Integer> list2 = Arrays.asList(11, 22, 33, 44, 55);
Map<String, List<Integer>> map2list = new HashMap<>();
map2list.put("list1", list1);
map2list.put("list2", list2);
- record.put(15, map2list);
+ record.put(17, map2list);
AvroRowDataSerializationSchema serializationSchema = new AvroRowDataSerializationSchema(rowType);
serializationSchema.open(null);