Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/07 07:15:29 UTC

[GitHub] [flink] fsk119 commented on a change in pull request #14539: [FLINK-20321][formats] Fix NPE when using AvroDeserializationSchema to deserialize null input

fsk119 commented on a change in pull request #14539:
URL: https://github.com/apache/flink/pull/14539#discussion_r553116137



##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
##########
@@ -94,6 +94,9 @@ public JsonRowDataDeserializationSchema(
 
     @Override
     public RowData deserialize(byte[] message) throws IOException {

Review comment:
       ditto (see the `@Nullable` comment on `AvroDeserializationSchema#deserialize`)

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
##########
@@ -68,10 +68,24 @@
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static org.apache.flink.table.api.DataTypes.TINYINT;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
 
 /** Test for the Avro serialization and deserialization schema. */
 public class AvroRowDataDeSerializationSchemaTest {
 
+    @Test
+    public void testDeserializeNullRow() throws Exception {
+        final DataType dataType = ROW(FIELD("bool", BOOLEAN())).nullable();

Review comment:
       This code is duplicated. Could you extract a common method that creates the schema so that all tests can reuse it? For example:
   
   ```
   private AvroRowDataDeserializationSchema createSchema(DataType dataType) throws Exception {
       final RowType rowType = (RowType) dataType.getLogicalType();
       final TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);
   
       AvroRowDataDeserializationSchema deserializationSchema =
               new AvroRowDataDeserializationSchema(rowType, typeInfo);
       deserializationSchema.open(null);
       return deserializationSchema;
   }
   ```

##########
File path: flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
##########
@@ -206,6 +206,12 @@ public void testDeserializeUnsupportedNull() throws Exception {
                 Row.of("Test", null, "Test"), testDeserialization(true, false, "Test,null,Test"));
     }
 
+    @Test
+    public void testDeserializeNullRow() throws Exception {
+        // return null for null input
+        assertNull(testDeserialization(true, false, null));

Review comment:
        It might be better to set the `allowParsingErrors` parameter to false.

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
##########
@@ -65,6 +65,9 @@ public RegistryAvroDeserializationSchema(
 
     @Override
     public T deserialize(byte[] message) throws IOException {

Review comment:
       ditto

##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
##########
@@ -141,6 +141,9 @@ public CsvRowDataDeserializationSchema build() {
 
     @Override
     public RowData deserialize(byte[] message) throws IOException {

Review comment:
       ditto

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
##########
@@ -128,6 +128,9 @@ Decoder getDecoder() {
 
     @Override
     public T deserialize(byte[] message) throws IOException {

Review comment:
       Please add the `@Nullable` annotation to the `message` parameter.
   
   You can also take a look at the constructor of `KafkaDynamicSource`, which allows its input to be null.
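
For illustration, here is a minimal, dependency-free sketch of an annotated parameter combined with a null guard. It is not Flink code: the `Nullable` annotation declared here is a local stand-in for `javax.annotation.Nullable`, `StringDeserializer` is a hypothetical class, and decoding to a `String` merely stands in for the real Avro decoding. The actual change in each schema may look different.

```java
import java.io.IOException;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.nio.charset.StandardCharsets;

public class NullSafeDeserializerSketch {

    /** Local stand-in for javax.annotation.Nullable (assumption: keeps the sketch dependency-free). */
    @Target({ElementType.PARAMETER, ElementType.METHOD})
    @interface Nullable {}

    /** Hypothetical deserializer; UTF-8 decoding stands in for the format-specific decoding. */
    static class StringDeserializer {

        @Nullable
        public String deserialize(@Nullable byte[] message) throws IOException {
            // Return null for null input instead of failing with a NullPointerException.
            if (message == null) {
                return null;
            }
            return new String(message, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        StringDeserializer deserializer = new StringDeserializer();
        System.out.println(deserializer.deserialize(null));
        System.out.println(deserializer.deserialize("abc".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Annotating both the parameter and the return value documents the contract for callers: a null message is legal input, and the result may be null as well.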

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -94,6 +94,9 @@ public void open(InitializationContext context) throws Exception {
 
     @Override
     public RowData deserialize(byte[] message) throws IOException {

Review comment:
       ditto



