Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/07 07:36:25 UTC

[GitHub] [iceberg] openinx commented on a change in pull request #1882: Flink: fix projection NPE caused by timestamp type

openinx commented on a change in pull request #1882:
URL: https://github.com/apache/iceberg/pull/1882#discussion_r537287338



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
##########
@@ -100,6 +100,33 @@ public void testNestedProjection() throws Exception {
     assertRows(result, expected);
   }
 
+  @Test
+  public void testBasicProjection() throws IOException {
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(1, "data", Types.StringType.get()),
+        Types.NestedField.optional(2, "time", Types.TimestampType.withZone())
+    );
+
+    Table table = catalog.createTable(TableIdentifier.of("default", "t"), writeSchema);
+
+    List<Record> writeRecords = RandomGenericData.generate(writeSchema, 2, 0L);
+    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords);
+
+    TableSchema projectedSchema = TableSchema.builder()
+        .field("id", DataTypes.BIGINT())
+        .field("data", DataTypes.STRING())
+        .build();
+    List<Row> result = runFormat(FlinkSource.forRowData().tableLoader(loader()).project(projectedSchema).buildFormat());

Review comment:
       I ran this unit test on my machine without the null-check fix and got this exception:
   
   ```
   Test testBasicProjection[format=parquet](org.apache.iceberg.flink.source.TestFlinkInputFormat) failed with:
   java.lang.NullPointerException
   	at org.apache.iceberg.flink.data.FlinkParquetReaders$ReadBuilder.primitive(FlinkParquetReaders.java:197)
   	at org.apache.iceberg.flink.data.FlinkParquetReaders$ReadBuilder.primitive(FlinkParquetReaders.java:73)
   	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:52)
   	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitField(TypeWithSchemaVisitor.java:155)
   	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitFields(TypeWithSchemaVisitor.java:169)
   	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:47)
   	at org.apache.iceberg.flink.data.FlinkParquetReaders.buildReader(FlinkParquetReaders.java:68)
   	at org.apache.iceberg.flink.source.RowDataIterator.lambda$newParquetIterable$1(RowDataIterator.java:126)
   	at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:118)
   	at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:66)
   	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:77)
   	at org.apache.iceberg.flink.source.RowDataIterator.openTaskIterator(RowDataIterator.java:76)
   	at org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator(DataIterator.java:102)
   	at org.apache.iceberg.flink.source.DataIterator.hasNext(DataIterator.java:84)
   	at org.apache.iceberg.flink.source.FlinkInputFormat.reachedEnd(FlinkInputFormat.java:100)
   	at org.apache.iceberg.flink.TestHelpers.readRowData(TestHelpers.java:91)
   	at org.apache.iceberg.flink.TestHelpers.readRows(TestHelpers.java:105)
   	at org.apache.iceberg.flink.source.TestFlinkInputFormat.runFormat(TestFlinkInputFormat.java:132)
   	at org.apache.iceberg.flink.source.TestFlinkInputFormat.testBasicProjection(TestFlinkInputFormat.java:120)
   	at java.lang.Thread.run(Thread.java:748)
   ```
   
   Looking at the code, when only the `id` and `data` fields are projected it currently builds a Parquet value reader tree like the following:
   
   ```
   RowDataReader
            |--> UnboxedReader
            |--> StringReader
            |--> MillisToTimestampReader
   ```
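To make the reader tree above concrete, here is a minimal, self-contained sketch (not Iceberg's real code) of how a schema-visitor-style builder pairs each column in the Parquet file with its projected, or "expected", field by field ID. `ReaderTreeSketch`, `FileColumn`, and the string type names are hypothetical stand-ins. What it illustrates is that a column present in the file but absent from the projection is still visited, with `expected == null`. That `null` is the value the `primitive()` callback in the stack trace has to handle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the reader-building walk; not Iceberg's real classes.
class ReaderTreeSketch {
  // One column of the Parquet file schema, i.e. what was actually written.
  static final class FileColumn {
    final int id;
    final String name;
    final String annotation; // e.g. "INT_64", "UTF8", "TIMESTAMP_MILLIS"

    FileColumn(int id, String name, String annotation) {
      this.id = id;
      this.name = name;
      this.annotation = annotation;
    }
  }

  // Visits every file column and looks up the projected ("expected") type by
  // field ID. Columns missing from the projection get expected == null.
  static List<String> readerTree(List<FileColumn> fileColumns, Map<Integer, String> expectedById) {
    List<String> tree = new ArrayList<>();
    for (FileColumn column : fileColumns) {
      String expected = expectedById.get(column.id); // null when not projected
      tree.add(column.name + ": " + column.annotation + " (expected=" + expected + ")");
    }
    return tree;
  }

  // Same shape as the test: id, data and time are written, only id and data are projected.
  static List<String> demoIdAndDataProjection() {
    List<FileColumn> written = Arrays.asList(
        new FileColumn(0, "id", "INT_64"),
        new FileColumn(1, "data", "UTF8"),
        new FileColumn(2, "time", "TIMESTAMP_MILLIS"));
    Map<Integer, String> projected = new HashMap<>();
    projected.put(0, "long");
    projected.put(1, "string");
    return readerTree(written, projected);
  }
}
```

Here `demoIdAndDataProjection()` yields three entries, and the last one is `time: TIMESTAMP_MILLIS (expected=null)`.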
   
   The `MillisToTimestampReader` won't read any real data when only `id` and `data` are projected. This PR now fills that slot with a `null` reader instead. I'm not quite sure whether the previous design had some other reason to fill it with a default non-null reader, so I'll look into the details further.
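Here is a minimal sketch of the "fill it with a `null` reader" idea. It assumes that the struct-level builder simply skips child slots whose reader is `null`. `NullReaderSketch`, `ExpectedTimestamp`, `timestampMillisReader`, and `structChildren` are hypothetical names used only for illustration. Readers are modeled as plain strings. None of this is the actual code in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the "null reader" approach; not Iceberg's real classes.
class NullReaderSketch {
  // Minimal stand-in for Types.TimestampType: only the UTC flag matters here.
  static final class ExpectedTimestamp {
    final boolean adjustToUtc;

    ExpectedTimestamp(boolean adjustToUtc) {
      this.adjustToUtc = adjustToUtc;
    }
  }

  // Chooses a reader for a TIMESTAMP_MILLIS column. A null expected type means
  // the column is not projected, so no reader is built at all.
  static String timestampMillisReader(ExpectedTimestamp expected) {
    if (expected == null) {
      return null;
    }
    return expected.adjustToUtc ? "MillisToTimestampTzReader" : "MillisToTimestampReader";
  }

  // Assembles the struct reader's children, dropping null slots so an
  // unprojected column never contributes a reader (assumed behavior).
  static List<String> structChildren(List<String> childReaders) {
    List<String> kept = new ArrayList<>();
    for (String reader : childReaders) {
      if (reader != null) {
        kept.add(reader);
      }
    }
    return kept;
  }

  // id and data are projected; time (expected == null) is not.
  static List<String> demo() {
    return structChildren(Arrays.asList("UnboxedReader", "StringReader", timestampMillisReader(null)));
  }
}
```

Because only non-null children are kept, the unprojected `time` column contributes no reader at all, and `demo()` returns `[UnboxedReader, StringReader]`.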
   
   Thanks @chenjunjiedada for bringing up this corner case. I'm pretty sure we need to fix it, either the current way or by simply null-checking the `expected` reference like the following:
   
   ```diff
   diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
   index 3c9bd5cd..4cb94d2b 100644
   --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
   +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
   @@ -194,13 +194,13 @@ public class FlinkParquetReaders {
              case INT_64:
                return new ParquetValueReaders.UnboxedReader<>(desc);
              case TIMESTAMP_MICROS:
   -            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
   +            if (expected != null && ((Types.TimestampType) expected).shouldAdjustToUTC()) {
                  return new MicrosToTimestampTzReader(desc);
                } else {
                  return new MicrosToTimestampReader(desc);
                }
              case TIMESTAMP_MILLIS:
   -            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
   +            if (expected != null && ((Types.TimestampType) expected).shouldAdjustToUTC()) {
                  return new MillisToTimestampTzReader(desc);
                } else {
                  return new MillisToTimestampReader(desc);
   ```
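The only difference between the old and new conditions in the diff is short-circuit evaluation. The following minimal, self-contained sketch shows why the original line throws while the checked version falls back to the non-UTC reader. It uses a hypothetical `TimestampType` stand-in rather than Iceberg's `Types.TimestampType`, and strings in place of real reader objects. Casting `null` to a reference type succeeds, but calling `shouldAdjustToUTC()` on the result throws `NullPointerException`.

```java
// Hypothetical stand-ins; only the branch logic mirrors the diff.
class NullCheckSketch {
  // Minimal stand-in for Types.TimestampType.
  static final class TimestampType {
    private final boolean adjustToUtc;

    TimestampType(boolean adjustToUtc) {
      this.adjustToUtc = adjustToUtc;
    }

    boolean shouldAdjustToUTC() {
      return adjustToUtc;
    }
  }

  // Mirrors the pre-fix branch: the cast of null succeeds, but the method
  // call on it throws NullPointerException.
  static String chooseReaderUnchecked(Object expected) {
    if (((TimestampType) expected).shouldAdjustToUTC()) {
      return "MillisToTimestampTzReader";
    }
    return "MillisToTimestampReader";
  }

  // Mirrors the proposed diff: && short-circuits, so a null expected type
  // skips the method call and falls through to the non-UTC reader.
  static String chooseReaderChecked(Object expected) {
    if (expected != null && ((TimestampType) expected).shouldAdjustToUTC()) {
      return "MillisToTimestampTzReader";
    }
    return "MillisToTimestampReader";
  }
}
```

With this variant the unprojected `time` column still gets a `MillisToTimestampReader`, as in the reader tree listed earlier. That reader is never asked to read data.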
   
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


