Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/29 07:42:10 UTC

[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #1237: Update Flink Parquet reader and writer to use schema visitor

chenjunjiedada commented on a change in pull request #1237:
URL: https://github.com/apache/iceberg/pull/1237#discussion_r461654978



##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,714 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+public class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
 
-  private static final FlinkParquetReaders INSTANCE = new FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
 
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> idToConstant) {
+    if (ParquetSchemaUtil.hasIds(fileSchema)) {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new ReadBuilder(fileSchema, idToConstant));
+    } else {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new FallbackReadBuilder(fileSchema, idToConstant));
+    }
+  }
+
+  private static class FallbackReadBuilder extends ReadBuilder {

Review comment:
       Will do.
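
For context, the new buildReader entry point plugs into Iceberg's generic Parquet read path the same way the old Row-based reader did. A minimal sketch of how a caller might consume RowData rows through it, mirroring the test change below (the file path and projected schema here are illustrative assumptions, not taken from the PR):

    // Sketch only: reads an Iceberg-written Parquet file as Flink RowData.
    // The schema and input file are assumptions for illustration.
    import java.io.File;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.flink.data.FlinkParquetReaders;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.parquet.Parquet;
    import org.apache.iceberg.types.Types;

    public class ReadRowDataSketch {
      public static void main(String[] args) throws Exception {
        Schema projected = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.required(2, "data", Types.StringType.get()));
        File dataFile = new File(args[0]);
        try (CloseableIterable<RowData> rows = Parquet.read(Files.localInput(dataFile))
            .project(projected)
            // The single-argument overload defaults idToConstant to an empty map.
            .createReaderFunc(fileType -> FlinkParquetReaders.buildReader(projected, fileType))
            .build()) {
          for (RowData row : rows) {
            System.out.println(row.getLong(0) + ": " + row.getString(1));
          }
        }
      }
    }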

##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
##########
@@ -41,34 +47,193 @@
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
-  private void testCorrectness(Schema schema, int numRecords, Iterable<Row> iterable) throws IOException {
+  private void testCorrectness(Schema schema, int numRecords, Iterable<RowData> iterable) throws IOException {
     File testFile = temp.newFile();
     Assert.assertTrue("Delete should succeed", testFile.delete());
 
-    try (FileAppender<Row> writer = Parquet.write(Files.localOutput(testFile))
+    try (FileAppender<RowData> writer = Parquet.write(Files.localOutput(testFile))
         .schema(schema)
-        .createWriterFunc(FlinkParquetWriters::buildWriter)
+        .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(FlinkSchemaUtil.convert(schema), msgType))
         .build()) {
       writer.addAll(iterable);
     }
 
-    try (CloseableIterable<Row> reader = Parquet.read(Files.localInput(testFile))
+    try (CloseableIterable<RowData> reader = Parquet.read(Files.localInput(testFile))
         .project(schema)
         .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
         .build()) {
-      Iterator<Row> expected = iterable.iterator();
-      Iterator<Row> rows = reader.iterator();
+      Iterator<RowData> expected = iterable.iterator();
+      Iterator<RowData> rows = reader.iterator();
       for (int i = 0; i < numRecords; i += 1) {
         Assert.assertTrue("Should have expected number of rows", rows.hasNext());
-        Assert.assertEquals(expected.next(), rows.next());
+        assertRowData(schema.asStruct(), expected.next(), rows.next());
       }
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
 
+  private void assertRowData(Type type, RowData expected, RowData actual) {

Review comment:
       Will do.
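
The assertRowData helper is truncated in the hunk above. A hypothetical sketch of the field-by-field comparison its signature suggests, assuming it walks the Iceberg struct type and dispatches on each field's type id (the PR's actual implementation is not shown here and may differ in which types it covers):

    // Sketch only: compares two rows position by position using the Iceberg
    // schema, since RowData implementations do not define structural equals().
    private void assertRowData(Type type, RowData expected, RowData actual) {
      if (expected == null && actual == null) {
        return;
      }
      Assert.assertNotNull("expected row should not be null", expected);
      Assert.assertNotNull("actual row should not be null", actual);

      List<Types.NestedField> fields = type.asStructType().fields();
      Assert.assertEquals("Should have same arity", fields.size(), actual.getArity());
      for (int i = 0; i < fields.size(); i += 1) {
        Type fieldType = fields.get(i).type();
        switch (fieldType.typeId()) {
          case INTEGER:
            Assert.assertEquals("int value should match", expected.getInt(i), actual.getInt(i));
            break;
          case LONG:
            Assert.assertEquals("long value should match", expected.getLong(i), actual.getLong(i));
            break;
          case STRING:
            Assert.assertEquals("string value should match",
                expected.getString(i).toString(), actual.getString(i).toString());
            break;
          case STRUCT: {
            int numFields = fieldType.asStructType().fields().size();
            assertRowData(fieldType, expected.getRow(i, numFields), actual.getRow(i, numFields));
            break;
          }
          // Remaining type ids (decimal, timestamp, list, map, and so on)
          // would be handled analogously.
          default:
            throw new UnsupportedOperationException("Unsupported type: " + fieldType);
        }
      }
    }

A real implementation would also need per-position null checks (RowData.isNullAt) before reading each field, since the typed getters do not tolerate null positions.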



