Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/29 07:54:21 UTC

[GitHub] [iceberg] openinx commented on a change in pull request #1232: Flink: Using RowData to avro reader and writer

openinx commented on a change in pull request #1232:
URL: https://github.com/apache/iceberg/pull/1232#discussion_r461997869



##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
##########
@@ -22,52 +22,76 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
-import org.apache.flink.types.Row;
+import java.util.List;
+import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.DataTest;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import static org.apache.iceberg.flink.data.RandomData.COMPLEX_SCHEMA;
+public class TestFlinkAvroReaderWriter extends DataTest {
 
-public class TestFlinkAvroReaderWriter {
-  private static final int NUM_RECORDS = 20_000;
+  private static final int NUM_RECORDS = 100;
 
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
+  @Override
+  protected void writeAndValidate(Schema schema) throws IOException {
+    List<RowData> inputs = generateDataFromAvroFile(schema);

Review comment:
       I see you generate the `List<Record>` first, then write it to the file appender, and finally read it back into a `List<RowData>`. Could we just use `RandomData#generateRowData` to produce those `RowData` directly?
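
A minimal sketch of the reviewer's suggestion (the public `generateRowData(Schema, int, long)` overload, the seed value, and the wrapping helper are all assumptions for illustration, not the PR's actual code):

```java
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class GenerateRowDataSketch {
  // Hypothetical helper: produce the test inputs directly as RowData,
  // skipping the Record generation and the Avro-file round trip.
  static List<RowData> randomInputs(Schema schema, int numRecords) {
    // 19L is an arbitrary seed; the (Schema, int, long) overload is assumed.
    return Lists.newArrayList(RandomData.generateRowData(schema, numRecords, 19L));
  }
}
```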

##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
##########
@@ -88,6 +93,34 @@ public Row next() {
     };
   }
 
+  private static Iterable<RowData> generateRowData(Schema schema, int numRecords,
+      Supplier<RandomRowGenerator> supplier) {
+    DataStructureConverter<Object, Object> converter =
+        DataStructureConverters.getConverter(TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));

Review comment:
       Here we may need to call `converter.open(RandomData.class.getClassLoader())` to initialize the converter before it is used?
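
For reference, a sketch of the suggested initialization (the converter construction mirrors the diff above; the wrapping class is illustrative, and whether `open` is strictly required may depend on the Flink version):

```java
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;

class ConverterInitSketch {
  static DataStructureConverter<Object, Object> openedConverter(Schema schema) {
    DataStructureConverter<Object, Object> converter =
        DataStructureConverters.getConverter(
            TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
    // Initialize the (possibly code-generated) converter with an explicit
    // ClassLoader, as the review suggests, before calling toInternal/toExternal.
    converter.open(ConverterInitSketch.class.getClassLoader());
    return converter;
  }
}
```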



