You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/07 03:56:04 UTC

[GitHub] [iceberg] chenjunjiedada opened a new pull request #1882: Flink: fix projection NPE caused by timestamp type

chenjunjiedada opened a new pull request #1882:
URL: https://github.com/apache/iceberg/pull/1882


   This fixes an NPE that when projecting out the timestamp type.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1882: Flink: fix projection NPE caused by timestamp type

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1882:
URL: https://github.com/apache/iceberg/pull/1882#discussion_r537287338



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
##########
@@ -100,6 +100,33 @@ public void testNestedProjection() throws Exception {
     assertRows(result, expected);
   }
 
+  @Test
+  public void testBasicProjection() throws IOException {
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(1, "data", Types.StringType.get()),
+        Types.NestedField.optional(2, "time", Types.TimestampType.withZone())
+    );
+
+    Table table = catalog.createTable(TableIdentifier.of("default", "t"), writeSchema);
+
+    List<Record> writeRecords = RandomGenericData.generate(writeSchema, 2, 0L);
+    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords);
+
+    TableSchema projectedSchema = TableSchema.builder()
+        .field("id", DataTypes.BIGINT())
+        .field("data", DataTypes.STRING())
+        .build();
+    List<Row> result = runFormat(FlinkSource.forRowData().tableLoader(loader()).project(projectedSchema).buildFormat());

Review comment:
       I run this unit test under my host without the fixed null checks, and got this exception: 
   
   ```java
   Test testBasicProjection[format=parquet](org.apache.iceberg.flink.source.TestFlinkInputFormat) failed with:
   java.lang.NullPointerException
   	at org.apache.iceberg.flink.data.FlinkParquetReaders$ReadBuilder.primitive(FlinkParquetReaders.java:197)
   	at org.apache.iceberg.flink.data.FlinkParquetReaders$ReadBuilder.primitive(FlinkParquetReaders.java:73)
   	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:52)
   	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitField(TypeWithSchemaVisitor.java:155)
   	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitFields(TypeWithSchemaVisitor.java:169)
   	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:47)
   	at org.apache.iceberg.flink.data.FlinkParquetReaders.buildReader(FlinkParquetReaders.java:68)
   	at org.apache.iceberg.flink.source.RowDataIterator.lambda$newParquetIterable$1(RowDataIterator.java:126)
   	at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:118)
   	at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:66)
   	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:77)
   	at org.apache.iceberg.flink.source.RowDataIterator.openTaskIterator(RowDataIterator.java:76)
   	at org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator(DataIterator.java:102)
   	at org.apache.iceberg.flink.source.DataIterator.hasNext(DataIterator.java:84)
   	at org.apache.iceberg.flink.source.FlinkInputFormat.reachedEnd(FlinkInputFormat.java:100)
   	at org.apache.iceberg.flink.TestHelpers.readRowData(TestHelpers.java:91)
   	at org.apache.iceberg.flink.TestHelpers.readRows(TestHelpers.java:105)
   	at org.apache.iceberg.flink.source.TestFlinkInputFormat.runFormat(TestFlinkInputFormat.java:132)
   	at org.apache.iceberg.flink.source.TestFlinkInputFormat.testBasicProjection(TestFlinkInputFormat.java:120)
   	at java.lang.Thread.run(Thread.java:748)
   ```
   
   Checked the code,  it currently will build a parquet value reader like the following if only project the `id` and `data` field: 
   
   ```
   RowDataReader
            |--> UnboxedReader
            |--> StringReader
            |--> MillisToTimestampReader
   ```
   
   Although the `MillisToTimestampReader` won't read any real data when projecting `id` and `data` only.  Now we will change to fill it with a `null` reader,  I'm not quite sure whether there's other reason that we fill it with a default non-null reader in the previous design.   Still take a look for more details. 
   
   Thanks @chenjunjiedada for providing this corner case, I'm pretty sure we need to fix this( current way or just check the `expected` refs like the following)
   
   ```
   diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
   index 3c9bd5cd..4cb94d2b 100644
   --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
   +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
   @@ -194,13 +194,13 @@ public class FlinkParquetReaders {
              case INT_64:
                return new ParquetValueReaders.UnboxedReader<>(desc);
              case TIMESTAMP_MICROS:
   -            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
   +            if (expected != null && ((Types.TimestampType) expected).shouldAdjustToUTC()) {
                  return new MicrosToTimestampTzReader(desc);
                } else {
                  return new MicrosToTimestampReader(desc);
                }
              case TIMESTAMP_MILLIS:
   -            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
   +            if (expected != null && ((Types.TimestampType) expected).shouldAdjustToUTC()) {
                  return new MillisToTimestampTzReader(desc);
                } else {
                  return new MillisToTimestampReader(desc);
   ```
   
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1882: Flink: fix projection NPE caused by timestamp type

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1882:
URL: https://github.com/apache/iceberg/pull/1882#discussion_r537685778



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
##########
@@ -100,6 +100,33 @@ public void testNestedProjection() throws Exception {
     assertRows(result, expected);
   }
 
+  @Test
+  public void testBasicProjection() throws IOException {
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(1, "data", Types.StringType.get()),
+        Types.NestedField.optional(2, "time", Types.TimestampType.withZone())
+    );
+
+    Table table = catalog.createTable(TableIdentifier.of("default", "t"), writeSchema);
+
+    List<Record> writeRecords = RandomGenericData.generate(writeSchema, 2, 0L);
+    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords);
+
+    TableSchema projectedSchema = TableSchema.builder()
+        .field("id", DataTypes.BIGINT())
+        .field("data", DataTypes.STRING())
+        .build();
+    List<Row> result = runFormat(FlinkSource.forRowData().tableLoader(loader()).project(projectedSchema).buildFormat());

Review comment:
       I like the fix in this PR. If an expected type is not present, then it is fine to return null and to skip adding it to the field map in the struct method. I'd rather do that than return a different reader. At least that way, we don't create extra readers that won't be used anyway.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1882: Flink: fix projection NPE caused by timestamp type

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1882:
URL: https://github.com/apache/iceberg/pull/1882#discussion_r537490840



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
##########
@@ -100,6 +100,33 @@ public void testNestedProjection() throws Exception {
     assertRows(result, expected);
   }
 
+  @Test
+  public void testBasicProjection() throws IOException {
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(1, "data", Types.StringType.get()),
+        Types.NestedField.optional(2, "time", Types.TimestampType.withZone())
+    );
+
+    Table table = catalog.createTable(TableIdentifier.of("default", "t"), writeSchema);
+
+    List<Record> writeRecords = RandomGenericData.generate(writeSchema, 2, 0L);
+    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords);
+
+    TableSchema projectedSchema = TableSchema.builder()
+        .field("id", DataTypes.BIGINT())
+        .field("data", DataTypes.STRING())
+        .build();
+    List<Row> result = runFormat(FlinkSource.forRowData().tableLoader(loader()).project(projectedSchema).buildFormat());

Review comment:
       @rdblue  What do you think about this ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx merged pull request #1882: Flink: fix projection NPE caused by timestamp type

Posted by GitBox <gi...@apache.org>.
openinx merged pull request #1882:
URL: https://github.com/apache/iceberg/pull/1882


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1882: Flink: fix projection NPE caused by timestamp type

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1882:
URL: https://github.com/apache/iceberg/pull/1882#discussion_r537980316



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
##########
@@ -100,6 +100,33 @@ public void testNestedProjection() throws Exception {
     assertRows(result, expected);
   }
 
+  @Test
+  public void testBasicProjection() throws IOException {
+    Schema writeSchema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(1, "data", Types.StringType.get()),
+        Types.NestedField.optional(2, "time", Types.TimestampType.withZone())
+    );
+
+    Table table = catalog.createTable(TableIdentifier.of("default", "t"), writeSchema);
+
+    List<Record> writeRecords = RandomGenericData.generate(writeSchema, 2, 0L);
+    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords);
+
+    TableSchema projectedSchema = TableSchema.builder()
+        .field("id", DataTypes.BIGINT())
+        .field("data", DataTypes.STRING())
+        .build();
+    List<Row> result = runFormat(FlinkSource.forRowData().tableLoader(loader()).project(projectedSchema).buildFormat());

Review comment:
       I'm okay about merging this patch.  One thing I think we may need to do is:  it's better to extend the [TestRowProjection](https://github.com/apache/iceberg/blob/master/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java#L55)  to all file format ( parquet/orc etc),  currently we only test the avro read projections.  
   
   Maybe it similar to the spark unit tests `TestSparkReadProjection`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org