Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2023/01/13 22:57:00 UTC

[GitHub] [iceberg] stevenzwu opened a new pull request, #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

stevenzwu opened a new pull request, #6584:
URL: https://github.com/apache/iceberg/pull/6584

   cc @hililiwei 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1080815482


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Read Iceberg rows as {@link GenericRecord}. */
+public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<GenericRecord> {
+  private final String tableName;
+  private final Schema readSchema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final RowDataFileScanTaskReader rowDataReader;
+
+  private transient RowDataToAvroGenericRecordConverter converter;
+
+  /**
+   * Create a reader function without projection and name mapping. Column name is case-insensitive.
+   */
+  public static AvroGenericRecordReaderFunction fromTable(Table table) {
+    return new AvroGenericRecordReaderFunction(
+        table.name(),
+        new Configuration(),
+        table.schema(),
+        null,
+        null,
+        false,

Review Comment:
   > Can we provide a similar entry where the user simply tells the Builder that he wants to use AvroGenericRecordReaderFunction , and then lets the Builder create it?
   
   I see your point. I agree that it is cumbersome for users to construct `AvroGenericRecordReaderFunction` or `RowDataReaderFunction`. Since we can extract type information from the generic `T`, one idea would be to add a setter for the type class to the builder. If the type class is `RowData`, the builder constructs `RowDataReaderFunction`. If it is Avro `GenericRecord`, the builder constructs `AvroGenericRecordReaderFunction`.
   
   ```
   public outputType(Class<T> outputRecordTypeClass)
   ```
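
A minimal sketch of how such a setter could drive the choice of reader function. Everything here is hypothetical: `RowDataStub`, `GenericRecordStub`, `ReaderFunctionStub`, and `Builder` are stand-ins defined only so the example compiles without Flink, Avro, or Iceberg on the classpath, and the real builder may look quite different.

```java
class OutputTypeSketch {
  // Hypothetical stand-ins for Flink's RowData and Avro's GenericRecord.
  interface RowDataStub {}

  interface GenericRecordStub {}

  // Hypothetical stand-in for the ReaderFunction<T> abstraction.
  interface ReaderFunctionStub<T> {
    String describe();
  }

  static class RowDataReaderFunctionStub implements ReaderFunctionStub<RowDataStub> {
    @Override
    public String describe() {
      return "RowDataReaderFunction";
    }
  }

  static class AvroGenericRecordReaderFunctionStub
      implements ReaderFunctionStub<GenericRecordStub> {
    @Override
    public String describe() {
      return "AvroGenericRecordReaderFunction";
    }
  }

  // Hypothetical builder that models only the proposed outputType setter.
  static class Builder<T> {
    private Class<T> outputRecordTypeClass;

    Builder<T> outputType(Class<T> newOutputRecordTypeClass) {
      this.outputRecordTypeClass = newOutputRecordTypeClass;
      return this;
    }

    // Picks the reader function from the configured type class.
    @SuppressWarnings("unchecked")
    ReaderFunctionStub<T> buildReaderFunction() {
      if (RowDataStub.class.equals(outputRecordTypeClass)) {
        return (ReaderFunctionStub<T>) new RowDataReaderFunctionStub();
      } else if (GenericRecordStub.class.equals(outputRecordTypeClass)) {
        return (ReaderFunctionStub<T>) new AvroGenericRecordReaderFunctionStub();
      }
      throw new IllegalArgumentException("Unsupported output type: " + outputRecordTypeClass);
    }
  }

  public static void main(String[] args) {
    ReaderFunctionStub<GenericRecordStub> readerFunction =
        new Builder<GenericRecordStub>().outputType(GenericRecordStub.class).buildReaderFunction();
    System.out.println(readerFunction.describe()); // prints "AvroGenericRecordReaderFunction"
  }
}
```

With this shape the user only names the record type, and the unchecked cast stays inside the builder, much like the existing cast to `ReaderFunction<T>`.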
   
   > we should also allow users to query metadata tables via the flink java api, right?
   
   I am not sure if that is necessary. Doesn't the Iceberg Java API cover that already?
   





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
hililiwei commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1072149460


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Read Iceberg rows as {@link GenericRecord}. */
+public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<GenericRecord> {
+  private final String tableName;
+  private final Schema readSchema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final RowDataFileScanTaskReader rowDataReader;
+
+  private transient RowDataToAvroGenericRecordConverter converter;
+
+  /**
+   * Create a reader function without projection and name mapping. Column name is case-insensitive.
+   */
+  public static AvroGenericRecordReaderFunction fromTable(Table table) {
+    return new AvroGenericRecordReaderFunction(
+        table.name(),
+        new Configuration(),
+        table.schema(),
+        null,
+        null,
+        false,

Review Comment:
   Could we get `case-sensitive` from `FlinkReadOptions`? Or provide `fromTable(Table table, boolean caseSensitive)`?





[GitHub] [iceberg] pvary commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1071794671


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/ListDataIteratorBatcher.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * FlinkRecordReaderFunction essentially cloned objects already. So there is no need to use array
+ * pool to clone objects. Simply create a new ArrayList for each batch.
+ */
+class ListDataIteratorBatcher<T> implements DataIteratorBatcher<T> {

Review Comment:
   Thx!





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1073036477


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Read Iceberg rows as {@link GenericRecord}. */
+public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<GenericRecord> {
+  private final String tableName;
+  private final Schema readSchema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final RowDataFileScanTaskReader rowDataReader;
+
+  private transient RowDataToAvroGenericRecordConverter converter;
+
+  /**
+   * Create a reader function without projection and name mapping. Column name is case-insensitive.
+   */
+  public static AvroGenericRecordReaderFunction fromTable(Table table) {
+    return new AvroGenericRecordReaderFunction(
+        table.name(),
+        new Configuration(),
+        table.schema(),
+        null,
+        null,
+        false,

Review Comment:
   > caseSensitive is used a little more frequently.
   
   I am not sure about that. I would think projection might be common too. Name mapping might also be useful if the Avro schema is not identical (i.e., it does not have the same fields in the same order), although we don't support name mapping for Avro records right now. Hence I would be reluctant to add a factory method like `fromTable(Table table, boolean caseSensitive)`.
   
   >  Could we provide an enum for the user to choose from and then help build it automatically in the IcebergSourceBuilder instead of letting the user create it themselves?
   
   I agree with that point. By default, users shouldn't need to construct the `RowDataReaderFunction`, as `IcebergSource` builds it internally if it is not set. We expose `ReaderFunction` on `IcebergSource#Builder` to support more customized readers like this one. As for `BaseMetadataTable`, users probably only use it indirectly, via Flink SQL, to inspect metadata tables, right? I doubt users would ever need to construct it programmatically.
   
   ```
         if (readerFunction == null) {
           if (table instanceof BaseMetadataTable) {
             MetaDataReaderFunction rowDataReaderFunction =
                 new MetaDataReaderFunction(
                     flinkConfig, table.schema(), context.project(), table.io(), table.encryption());
             this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
           } else {
             RowDataReaderFunction rowDataReaderFunction =
                 new RowDataReaderFunction(
                     flinkConfig,
                     table.schema(),
                     context.project(),
                     context.nameMapping(),
                     context.caseSensitive(),
                     table.io(),
                     table.encryption());
             this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
           }
         }
   ```
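
For readers skimming the thread, the fallback quoted above can be sketched in isolation as follows. This is only an illustration under stated assumptions: `TableStub`, `ReaderFunctionStub`, and `Builder` are hypothetical stand-ins rather than the real Iceberg or Flink types, and the strings merely name which reader function would be chosen.

```java
class DefaultReaderFunctionSketch {
  // Hypothetical stand-in for a table; only the metadata-table check is modeled.
  interface TableStub {
    boolean isMetadataTable();
  }

  // Hypothetical stand-in for ReaderFunction<T>.
  interface ReaderFunctionStub<T> {
    String describe();
  }

  // Hypothetical builder that falls back to a default reader function.
  static class Builder<T> {
    private final TableStub table;
    private ReaderFunctionStub<T> readerFunction;

    Builder(TableStub table) {
      this.table = table;
    }

    Builder<T> readerFunction(ReaderFunctionStub<T> newReaderFunction) {
      this.readerFunction = newReaderFunction;
      return this;
    }

    @SuppressWarnings("unchecked")
    ReaderFunctionStub<T> build() {
      if (readerFunction == null) {
        // Mirrors the quoted branch: metadata tables get their own default reader.
        ReaderFunctionStub<Object> defaultFunction =
            table.isMetadataTable()
                ? () -> "MetaDataReaderFunction"
                : () -> "RowDataReaderFunction";
        this.readerFunction = (ReaderFunctionStub<T>) defaultFunction;
      }
      return readerFunction;
    }
  }

  public static void main(String[] args) {
    TableStub dataTable = () -> false;
    // No reader function set: the builder supplies the default.
    System.out.println(new Builder<Object>(dataTable).build().describe());
    // A custom reader function always wins over the default.
    ReaderFunctionStub<String> custom = () -> "CustomReaderFunction";
    System.out.println(new Builder<String>(dataTable).readerFunction(custom).build().describe());
  }
}
```

The point of the pattern is that an explicitly supplied reader function is never overridden; the defaults apply only when nothing was set.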







[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1072512846


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Read Iceberg rows as {@link GenericRecord}. */
+public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<GenericRecord> {
+  private final String tableName;
+  private final Schema readSchema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final RowDataFileScanTaskReader rowDataReader;
+
+  private transient RowDataToAvroGenericRecordConverter converter;
+
+  /**
+   * Create a reader function without projection and name mapping. Column name is case-insensitive.
+   */
+  public static AvroGenericRecordReaderFunction fromTable(Table table) {
+    return new AvroGenericRecordReaderFunction(
+        table.name(),
+        new Configuration(),
+        table.schema(),
+        null,
+        null,
+        false,

Review Comment:
   `fromTable(Table table, boolean caseSensitive)` is not going to work, because we have three read configs in total.
   ```
         Schema projectedSchema,
         String nameMapping,
         boolean caseSensitive,
   ```
   
   Regarding using `FlinkReadOptions`, are you suggesting another factory method like this? We need all three to construct the `FlinkReadOptions`.
   ```
   AvroGenericRecordReaderFunction fromTableAndReadConfig(Table table, Map<String, String> readOptions, ReadableConfig readableConfig)
   ```
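
To make the question concrete, here is a rough sketch of how a factory like the one above might resolve `caseSensitive` from the per-source read options and the Flink configuration. The key names, the precedence (read options first, then Flink config, then a default of `false`), and the use of plain maps in place of `Table` and `ReadableConfig` are all assumptions made for illustration, not the actual behavior of the connector.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ReadConfigSketch {
  // Illustrative key names; the real option keys may differ.
  static final String READ_OPTION_KEY = "case-sensitive";
  static final String FLINK_CONFIG_KEY = "connector.iceberg.case-sensitive";

  // Assumed precedence: read options, then Flink config, then false.
  static boolean caseSensitive(Map<String, String> readOptions, Map<String, String> flinkConfig) {
    String value = readOptions.get(READ_OPTION_KEY);
    if (value == null) {
      value = flinkConfig.get(FLINK_CONFIG_KEY);
    }
    return value != null && Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    Map<String, String> readOptions = new HashMap<>();
    Map<String, String> flinkConfig = new HashMap<>();

    flinkConfig.put(FLINK_CONFIG_KEY, "true");
    System.out.println(caseSensitive(readOptions, flinkConfig)); // prints "true"

    // A per-source read option overrides the job-level Flink config.
    readOptions.put(READ_OPTION_KEY, "false");
    System.out.println(caseSensitive(readOptions, flinkConfig)); // prints "false"

    // Nothing configured: fall back to the default.
    System.out.println(caseSensitive(Collections.emptyMap(), Collections.emptyMap())); // prints "false"
  }
}
```

The default of `false` matches the value passed by `fromTable(Table table)` in the diff under review; projection and name mapping would need the same kind of resolution.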
   





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1089903225


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Read Iceberg rows as {@link GenericRecord}. */
+public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<GenericRecord> {
+  private final String tableName;
+  private final Schema readSchema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final RowDataFileScanTaskReader rowDataReader;
+
+  private transient RowDataToAvroGenericRecordConverter converter;
+
+  /**
+   * Create a reader function without projection and name mapping. Column name is case-insensitive.
+   */
+  public static AvroGenericRecordReaderFunction fromTable(Table table) {
+    return new AvroGenericRecordReaderFunction(
+        table.name(),
+        new Configuration(),
+        table.schema(),
+        null,
+        null,
+        false,

Review Comment:
   > For the metadata table, the most important use-case is when we read the metadata on the job initialisation and change the job based on the metadata read. Do we have use-case when we want to stream the results of the metadata query? Like when new files added to the table, or new snapshot created for the table?
   
   We have scenarios, such as security and monitoring, where these capabilities are needed. Currently we implement this using `iceberg listeners`.
   
   








[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1072503036


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.data.RowDataToRowMapper;
+import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestIcebergSourceBoundedGenericRecord {
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopCatalogResource catalogResource =
+      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {"avro", 2},
+      {"parquet", 2},
+      {"orc", 2}
+    };
+  }
+
+  private final FileFormat fileFormat;
+  private final int parallelism;
+
+  public TestIcebergSourceBoundedGenericRecord(String format, int parallelism) {
+    this.fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+  }
+
+  @Test
+  public void testUnpartitionedTable() throws Exception {
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
+    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords);
+    TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testPartitionedTable() throws Exception {
+    Table table =
+        catalogResource
+            .catalog()
+            .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
+    for (int i = 0; i < expectedRecords.size(); ++i) {
+      expectedRecords.get(i).setField("dt", "2020-03-20");

Review Comment:
   sure





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1072512846


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Read Iceberg rows as {@link GenericRecord}. */
+public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<GenericRecord> {
+  private final String tableName;
+  private final Schema readSchema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final RowDataFileScanTaskReader rowDataReader;
+
+  private transient RowDataToAvroGenericRecordConverter converter;
+
+  /**
+   * Create a reader function without projection and name mapping. Column name is case-insensitive.
+   */
+  public static AvroGenericRecordReaderFunction fromTable(Table table) {
+    return new AvroGenericRecordReaderFunction(
+        table.name(),
+        new Configuration(),
+        table.schema(),
+        null,
+        null,
+        false,

Review Comment:
   `fromTable(Table table, boolean caseSensitive)` is not going to work, because we have three read configs in total.
   ```
         Schema projectedSchema,
         String nameMapping,
         boolean caseSensitive,
   ```
   
   Regarding using `FlinkReadOptions`, are you suggesting another factory method like this? We need all three to construct the `FlinkReadOptions`.
   ```
   AvroGenericRecordReaderFunction fromTableAndReadConfig(Table table, Map<String, String> readOptions, ReadableConfig readableConfig)
   ```
   





[GitHub] [iceberg] pvary commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1080920039


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Read Iceberg rows as {@link GenericRecord}. */
+public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<GenericRecord> {
+  private final String tableName;
+  private final Schema readSchema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final RowDataFileScanTaskReader rowDataReader;
+
+  private transient RowDataToAvroGenericRecordConverter converter;
+
+  /**
+   * Create a reader function without projection and name mapping. Column name is case-insensitive.
+   */
+  public static AvroGenericRecordReaderFunction fromTable(Table table) {
+    return new AvroGenericRecordReaderFunction(
+        table.name(),
+        new Configuration(),
+        table.schema(),
+        null,
+        null,
+        false,

Review Comment:
   > > Can we provide a similar entry where the user simply tells the Builder that they want to use AvroGenericRecordReaderFunction, and then lets the Builder create it?
   > 
   > I see your point. I agree that it is cumbersome for users to construct `AvroGenericRecordReaderFunction` or `RowDataReaderFunction`. Since we can extract type information from the generic `T`, one idea would be to add a setter for the type class to the builder. If the type class is `RowData`, the builder constructs `RowDataReaderFunction`. If it is Avro `GenericRecord`, the builder constructs `AvroGenericRecordReaderFunction`.
   > 
   > ```
   > public outputType(Class<T> outputRecordTypeClass)
   > ```
   > 
   
   I think this is a good idea, but I think this would merit another PR.
   We should do this for the Source and the Sink as well.
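   
   To make the `outputType` idea concrete, here is a minimal sketch under stated assumptions. None of these names are Iceberg API: `OutputTypeSketch`, `ReaderFunction`, `RowDataStandIn`, and `GenericRecordStandIn` are hypothetical stand-ins for the source builder, the reader function, Flink `RowData`, and Avro `GenericRecord`. It only illustrates dispatching on the type class, not how the real builder would be wired.
   
   ```java
   /** Hypothetical sketch: a builder that picks a reader function from the output type class. */
   public class OutputTypeSketch {
     /** Stand-in for a reader function; only reports which implementation was chosen. */
     public interface ReaderFunction<T> {
       String name();
     }

     /** Stand-in for Flink RowData (assumption, not the real class). */
     public static final class RowDataStandIn {}

     /** Stand-in for Avro GenericRecord (assumption, not the real class). */
     public static final class GenericRecordStandIn {}

     public static final class Builder<T> {
       private Class<T> outputType;

       /** Setter for the output record type class, as floated in the discussion. */
       public Builder<T> outputType(Class<T> outputRecordTypeClass) {
         this.outputType = outputRecordTypeClass;
         return this;
       }

       /** Chooses the reader function based on the configured type class. */
       public ReaderFunction<T> buildReaderFunction() {
         if (outputType == null) {
           throw new IllegalStateException("outputType must be set");
         }
         if (RowDataStandIn.class.equals(outputType)) {
           return () -> "RowDataReaderFunction";
         }
         if (GenericRecordStandIn.class.equals(outputType)) {
           return () -> "AvroGenericRecordReaderFunction";
         }
         throw new IllegalArgumentException("Unsupported output type: " + outputType.getName());
       }
     }

     public static void main(String[] args) {
       ReaderFunction<GenericRecordStandIn> fn =
           new Builder<GenericRecordStandIn>()
               .outputType(GenericRecordStandIn.class)
               .buildReaderFunction();
       System.out.println(fn.name());
     }
   }
   ```
   
   With a setter like this the user would only name the record type, and the builder would own the constructor arguments that callers currently pass by hand.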
   
   > > we should also allow users to query metadata tables via the flink java api, right?
   > 
   > I am not sure if that is necessary. Doesn't Iceberg Java API cover that already?
   
   For the metadata table, the most important use case is reading the metadata at job initialisation and changing the job based on what was read. Do we have a use case where we want to stream the results of the metadata query, for example when new files are added to the table or a new snapshot is created?
   
   IMHO, this is something for another PR if we decide to do it.





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
hililiwei commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1072136039


##########
docs/flink-getting-started.md:
##########
@@ -613,6 +613,47 @@ env.execute("Test Iceberg Streaming Read");
 There are other options that we could set by Java API, please see the 
 [IcebergSource#Builder](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/flink/source/IcebergSource.html).
 
+### Read as Avro GenericRecord
+
+FLIP-27 Iceberg source provides `AvroGenericRecordReaderFunction` that converts
+Flink `RowData` to Avro `GenericRecord`. You can use the converter to read from
+an Iceberg table as an Avro GenericRecord DataStream.
+
+Please make sure `flink-avro` jar is included in the classpath.
+Also `iceberg-flink-runtime` shaded bundle jar can't be used
+because the runtime jar shades the avro package.
+Please use non-shaded `iceberg-flink` jar instead.
+
+```java
+TableLoader tableLoader = ...;
+Table table;
+try (TableLoader loader = tableLoader) {
+    loader.open();
+    table = loader.loadTable();
+}
+
+AvroGenericRecordReaderFunction readerFunction =
+    new AvroGenericRecordReaderFunction(

Review Comment:
   nit: Would it look cleaner if we replaced it with `AvroGenericRecordReaderFunction.fromTable(table);`?
   
   





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1081567565


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Read Iceberg rows as {@link GenericRecord}. */
+public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<GenericRecord> {
+  private final String tableName;
+  private final Schema readSchema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final RowDataFileScanTaskReader rowDataReader;
+
+  private transient RowDataToAvroGenericRecordConverter converter;
+
+  /**
+   * Create a reader function without projection and name mapping. Column name is case-insensitive.
+   */
+  public static AvroGenericRecordReaderFunction fromTable(Table table) {
+    return new AvroGenericRecordReaderFunction(
+        table.name(),
+        new Configuration(),
+        table.schema(),
+        null,
+        null,
+        false,

Review Comment:
   @pvary Yes, I totally agree that `outputType` should be a separate PR if we decide it is the right direction. And yes, we should do it for both source and sink.





[GitHub] [iceberg] pvary commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1071172080


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/ListDataIteratorBatcher.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * FlinkRecordReaderFunction essentially cloned objects already. So there is no need to use array
+ * pool to clone objects. Simply create a new ArrayList for each batch.
+ */
+class ListDataIteratorBatcher<T> implements DataIteratorBatcher<T> {

Review Comment:
   Question: Do I understand correctly that the role of this class is to create a read buffer for the converted records? I would guess that this is some performance optimisation, but I do not understand why buffering helps here.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1071705366


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/ListDataIteratorBatcher.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * FlinkRecordReaderFunction essentially cloned objects already. So there is no need to use array
+ * pool to clone objects. Simply create a new ArrayList for each batch.
+ */
+class ListDataIteratorBatcher<T> implements DataIteratorBatcher<T> {

Review Comment:
   The purpose of this batcher is to convert an iterator of `T` into an iterator of `RecordsWithSplitIds<RecordAndPosition<T>>`, where each `RecordsWithSplitIds` stores a batch of records.
   
   Flink uses batched `RecordsWithSplitIds` to transfer records from the source I/O threads to the Flink operator threads. Batching here reduces the number of handovers between threads.
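   
   As a rough illustration of the grouping described above, here is a minimal sketch, not the actual `ListDataIteratorBatcher`. `BatchingSketch` and `batch` are hypothetical names, and a plain `List<T>` stands in for `RecordsWithSplitIds<RecordAndPosition<T>>`. Each call to `next()` allocates a fresh `ArrayList`, mirroring the "new ArrayList for each batch" note in the class Javadoc.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;

   /** Hypothetical sketch: turns an iterator of records into an iterator of record batches. */
   public class BatchingSketch {
     public static <T> Iterator<List<T>> batch(Iterator<T> input, int batchSize) {
       if (batchSize <= 0) {
         throw new IllegalArgumentException("batchSize must be positive");
       }
       return new Iterator<List<T>>() {
         @Override
         public boolean hasNext() {
           return input.hasNext();
         }

         @Override
         public List<T> next() {
           if (!input.hasNext()) {
             throw new NoSuchElementException();
           }
           // A new list per batch, so a consumer thread can hold on to it safely.
           List<T> batch = new ArrayList<>(batchSize);
           while (input.hasNext() && batch.size() < batchSize) {
             batch.add(input.next());
           }
           return batch;
         }
       };
     }

     public static void main(String[] args) {
       Iterator<List<Integer>> batches = batch(Arrays.asList(1, 2, 3, 4, 5).iterator(), 2);
       while (batches.hasNext()) {
         // One handover per batch instead of one per record.
         System.out.println(batches.next());
       }
     }
   }
   ```
   
   With five records and a batch size of 2, the consumer sees three handovers instead of five. The last batch is simply shorter.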
   





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
hililiwei commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1072142396


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.data.RowDataToRowMapper;
+import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestIcebergSourceBoundedGenericRecord {
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopCatalogResource catalogResource =
+      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {"avro", 2},
+      {"parquet", 2},
+      {"orc", 2}
+    };
+  }
+
+  private final FileFormat fileFormat;
+  private final int parallelism;
+
+  public TestIcebergSourceBoundedGenericRecord(String format, int parallelism) {
+    this.fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+  }
+
+  @Test
+  public void testUnpartitionedTable() throws Exception {
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
+    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords);
+    TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testPartitionedTable() throws Exception {
+    Table table =
+        catalogResource
+            .catalog()
+            .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
+    for (int i = 0; i < expectedRecords.size(); ++i) {
+      expectedRecords.get(i).setField("dt", "2020-03-20");
+    }
+    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER)
+        .appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
+    TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testProjection() throws Exception {
+    Table table =
+        catalogResource
+            .catalog()
+            .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
+    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER)
+        .appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
+    // select the "data" field (fieldId == 1)
+    Schema projectedSchema = TypeUtil.select(TestFixtures.SCHEMA, Sets.newHashSet(1));
+    List<Row> expectedRows =
+        Arrays.asList(Row.of(expectedRecords.get(0).get(0)), Row.of(expectedRecords.get(1).get(0)));
+    TestHelpers.assertRows(
+        run(projectedSchema, Collections.emptyList(), Collections.emptyMap()), expectedRows);
+  }
+
+  private List<Row> run() throws Exception {
+    return run(null, Collections.emptyList(), Collections.emptyMap());
+  }
+
+  private List<Row> run(
+      Schema projectedSchema, List<Expression> filters, Map<String, String> options)
+      throws Exception {
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(parallelism);
+    env.getConfig().enableObjectReuse();
+
+    Configuration config = new Configuration();
+    config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
+    Table table;
+    try (TableLoader tableLoader = catalogResource.tableLoader()) {
+      tableLoader.open();
+      table = tableLoader.loadTable();
+    }
+
+    AvroGenericRecordReaderFunction readerFunction =
+        new AvroGenericRecordReaderFunction(
+            TestFixtures.TABLE_IDENTIFIER.name(),
+            new Configuration(),
+            table.schema(),
+            null,
+            null,
+            false,
+            table.io(),
+            table.encryption());
+
+    IcebergSource.Builder<GenericRecord> sourceBuilder =
+        IcebergSource.<GenericRecord>builder()
+            .tableLoader(catalogResource.tableLoader())
+            .readerFunction(readerFunction)
+            .assignerFactory(new SimpleSplitAssignerFactory())
+            .flinkConfig(config);
+    if (projectedSchema != null) {
+      sourceBuilder.project(projectedSchema);
+    }
+
+    sourceBuilder.filters(filters);
+    sourceBuilder.properties(options);

Review Comment:
   Nit: `properties()` is deprecated; we can use `setAll()` instead.
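   
   For illustration, a rough sketch of the change (this is not the actual `IcebergSource.Builder`; `SketchSourceBuilder`, `readOptions()` and the option key below are made-up stand-ins, and I am assuming `setAll()` accepts the same `Map<String, String>` that `properties()` does):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Hypothetical stand-in for a source builder; it is NOT the real IcebergSource.Builder.
   class SketchSourceBuilder {
     private final Map<String, String> readOptions = new HashMap<>();
   
     // Preferred entry point: copies every option into the builder.
     SketchSourceBuilder setAll(Map<String, String> options) {
       readOptions.putAll(options);
       return this;
     }
   
     // Deprecated alias kept for compatibility; it only delegates to setAll().
     @Deprecated
     SketchSourceBuilder properties(Map<String, String> options) {
       return setAll(options);
     }
   
     Map<String, String> readOptions() {
       return readOptions;
     }
   }
   
   class SetAllSketch {
     public static void main(String[] args) {
       Map<String, String> options = new HashMap<>();
       // Placeholder key and value, not a real read option.
       options.put("example-option", "example-value");
   
       // Before: builder.properties(options);
       SketchSourceBuilder builder = new SketchSourceBuilder().setAll(options);
       System.out.println(builder.readOptions());
     }
   }
   ```
   
   In the test this would presumably amount to replacing `sourceBuilder.properties(options)` with `sourceBuilder.setAll(options)`.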





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
hililiwei commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1080723408


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Read Iceberg rows as {@link GenericRecord}. */
+public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<GenericRecord> {
+  private final String tableName;
+  private final Schema readSchema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final RowDataFileScanTaskReader rowDataReader;
+
+  private transient RowDataToAvroGenericRecordConverter converter;
+
+  /**
+   * Create a reader function without projection and name mapping. Column name is case-insensitive.
+   */
+  public static AvroGenericRecordReaderFunction fromTable(Table table) {
+    return new AvroGenericRecordReaderFunction(
+        table.name(),
+        new Configuration(),
+        table.schema(),
+        null,
+        null,
+        false,

Review Comment:
   Agree with you. Can we provide a similar entry point where the user simply tells the Builder that they want to use `AvroGenericRecordReaderFunction`, and then lets the Builder create it?
   
   > users probably only indirectly use via Flink SQL for inspecting metadata tables, right? I doubt users would ever need to construct it programmatically.
   
   Yes, that is right. However, you do remind me that we should also allow users to query metadata tables via the Flink Java API, right?





[GitHub] [iceberg] pvary commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1071798337


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.data.RowDataToRowMapper;
+import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestIcebergSourceBoundedGenericRecord {
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopCatalogResource catalogResource =
+      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {"avro", 2},
+      {"parquet", 2},
+      {"orc", 2}
+    };
+  }
+
+  private final FileFormat fileFormat;
+  private final int parallelism;
+
+  public TestIcebergSourceBoundedGenericRecord(String format, int parallelism) {
+    this.fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+  }
+
+  @Test
+  public void testUnpartitionedTable() throws Exception {
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
+    new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords);
+    TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testPartitionedTable() throws Exception {
+    Table table =
+        catalogResource
+            .catalog()
+            .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
+    for (int i = 0; i < expectedRecords.size(); ++i) {
+      expectedRecords.get(i).setField("dt", "2020-03-20");

Review Comment:
   Nit: don't we have a constant for `dt`?





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
hililiwei commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1073030444


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Read Iceberg rows as {@link GenericRecord}. */
+public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<GenericRecord> {
+  private final String tableName;
+  private final Schema readSchema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final RowDataFileScanTaskReader rowDataReader;
+
+  private transient RowDataToAvroGenericRecordConverter converter;
+
+  /**
+   * Create a reader function without projection and name mapping. Column name is case-insensitive.
+   */
+  public static AvroGenericRecordReaderFunction fromTable(Table table) {
+    return new AvroGenericRecordReaderFunction(
+        table.name(),
+        new Configuration(),
+        table.schema(),
+        null,
+        null,
+        false,

Review Comment:
   I suggest the following method because my personal feeling is that `caseSensitive` is used a little more frequently. Of course, what you said is also reasonable, so either way is OK with me.
   ```
     public static AvroGenericRecordReaderFunction fromTable(Table table, boolean caseSensitive) {
       return new AvroGenericRecordReaderFunction(
           table.name(),
           new Configuration(),
           table.schema(),
           null,
           null,
           caseSensitive,
           table.io(),
           table.encryption());
     }
   ```
   Regarding `FlinkReadOptions`: essentially, I'm interested in whether the reader function can be built internally rather than by the user.
   So far we have three reader functions: `AvroGenericRecordReaderFunction`, `MetaDataReaderFunction`, and `RowDataReaderFunction`, and we may add more in the future. Could we provide an enum for the user to choose from and then build the reader function automatically in the `IcebergSourceBuilder`, instead of letting users create it themselves?
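   
   To make the enum idea concrete, here is a minimal sketch under stated assumptions: `SketchOutputType`, `SketchReaderFunction`, `SketchIcebergSourceBuilder` and `outputType()` are hypothetical names, not existing Iceberg API, and the reader functions are reduced to labels so the example stays self-contained.
   
   ```java
   // Hypothetical names throughout; none of this is existing Iceberg API.
   enum SketchOutputType {
     ROW_DATA,
     AVRO_GENERIC_RECORD,
     METADATA
   }
   
   // Stand-in for a reader function; the real ones need schema, FileIO, encryption, etc.
   interface SketchReaderFunction {
     String describe();
   }
   
   class SketchIcebergSourceBuilder {
     // Assumed default for the sketch.
     private SketchOutputType outputType = SketchOutputType.ROW_DATA;
   
     SketchIcebergSourceBuilder outputType(SketchOutputType type) {
       this.outputType = type;
       return this;
     }
   
     // The builder, not the user, decides which reader function to construct.
     SketchReaderFunction buildReaderFunction() {
       switch (outputType) {
         case AVRO_GENERIC_RECORD:
           return () -> "AvroGenericRecordReaderFunction";
         case METADATA:
           return () -> "MetaDataReaderFunction";
         case ROW_DATA:
         default:
           return () -> "RowDataReaderFunction";
       }
     }
   }
   
   class ReaderFunctionEnumSketch {
     public static void main(String[] args) {
       SketchReaderFunction fn =
           new SketchIcebergSourceBuilder()
               .outputType(SketchOutputType.AVRO_GENERIC_RECORD)
               .buildReaderFunction();
       System.out.println(fn.describe());
     }
   }
   ```
   
   An open question with this shape is how the enum choice would line up with the source's type parameter (`GenericRecord` vs. `RowData`), which the sketch ignores.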





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1082782121


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Read Iceberg rows as {@link GenericRecord}. */
+public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<GenericRecord> {
+  private final String tableName;
+  private final Schema readSchema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final RowDataFileScanTaskReader rowDataReader;
+
+  private transient RowDataToAvroGenericRecordConverter converter;
+
+  /**
+   * Create a reader function without projection and name mapping. Column name is case-insensitive.
+   */
+  public static AvroGenericRecordReaderFunction fromTable(Table table) {
+    return new AvroGenericRecordReaderFunction(
+        table.name(),
+        new Configuration(),
+        table.schema(),
+        null,
+        null,
+        false,

Review Comment:
   @hililiwei I am merging this PR. Please continue to comment in this thread on the reader function. I will follow up with a separate PR on the `withOutputType` idea if we agree that it is the right direction.





[GitHub] [iceberg] stevenzwu merged pull request #6584: Flink: support reading as Avro GenericRecord for FLIP-27 IcebergSource

Posted by GitBox <gi...@apache.org>.
stevenzwu merged PR #6584:
URL: https://github.com/apache/iceberg/pull/6584

