Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/08 17:48:53 UTC

[GitHub] [iceberg] stevenzwu opened a new pull request #2305: Flink: FLIP-27 source split and reader

stevenzwu opened a new pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305


   This is the first feature PR for FLIP-27 source.
   
   Currently, there are at least two open questions to be addressed. Since I will be out for the rest of the week, I would like to put this out first.
   
   1) @openinx suggested that we break the `DataIterator` into two levels (combined and file tasks). I have a question that maybe @openinx can confirm [in the comment from the uber PR](https://github.com/apache/iceberg/pull/2105#discussion_r568303231).
   
   2) The reader is currently implemented on top of `FileSourceSplit` and `BulkFormat`. The original reason is that Jingsong mentioned that we may be able to take advantage of the high-performance vectorized readers from Flink. But I am revisiting that decision. It is unlikely that Flink's vectorized readers will support deletes. It seems that Iceberg is also adding vectorized readers, and I assume the Iceberg implementations will support deletes.
   
   @openinx @sundargates @tweise 
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r704925686



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -27,23 +27,39 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class FlinkSplitGenerator {
+public class FlinkSplitGenerator {
   private FlinkSplitGenerator() {
   }
 
   static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
-    List<CombinedScanTask> tasks = tasks(table, context);
-    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
+    try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
+      List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
+      FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
+      for (int i = 0; i < tasks.size(); i++) {
+        splits[i] = new FlinkInputSplit(i, tasks.get(i));
+      }
+      return splits;
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to process tasks iterable", e);
+    }
+  }
+
+  public static List<IcebergSourceSplit> planIcebergSourceSplits(

Review comment:
       Should we add a javadoc explaining why we need an extra `planIcebergSourceSplits` (compared to `createInputSplits`)? I think it's used for implementing FLIP-27's `SourceSplit`, right?






[GitHub] [iceberg] tweise commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r716115030



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/FileRecords.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * A batch of records for one split
+ */
+@Internal
+public class FileRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {

Review comment:
       I found the class name unintuitive. From its usage, this appears to be a "fetch result"?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.io.IOException;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+/**
+ * TODO: use Java serialization for now.
+ * will switch to more stable serializer from issue-1698.

Review comment:
       Please add the full link. Also, when we switch to the new serializer version, will that subsequent change be backward compatible?
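
The backward-compatibility question above comes down to whether bytes written with Java serialization can still be read after the format changes. A minimal sketch of one common approach, dispatching on a version number in `deserialize`, is shown here. It mirrors the `getVersion()` / `deserialize(int version, byte[])` shape of Flink's `SimpleVersionedSerializer` without depending on Flink. The class name `VersionedSerializerSketch`, the `String` payload, both formats, and the use of unchecked exceptions are assumptions for illustration, not the PR's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; not the PR's IcebergSourceSplitSerializer.
final class VersionedSerializerSketch {
  static final int V1_JAVA_SERIALIZATION = 1;
  static final int V2_UTF8 = 2;

  // Version written by this code; readers must still accept older versions.
  int getVersion() {
    return V2_UTF8;
  }

  // Current (v2) format in this sketch: the UTF-8 bytes of the payload.
  byte[] serialize(String payload) {
    return payload.getBytes(StandardCharsets.UTF_8);
  }

  // Old (v1) format, kept only to produce bytes like those an older writer would have stored.
  static byte[] serializeV1(String payload) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(payload);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Dispatch on the version that was stored next to the bytes.
  String deserialize(int version, byte[] serialized) {
    switch (version) {
      case V1_JAVA_SERIALIZATION:
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
          return (String) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
          throw new IllegalStateException("Failed to deserialize v1 payload", e);
        }
      case V2_UTF8:
        return new String(serialized, StandardCharsets.UTF_8);
      default:
        throw new IllegalArgumentException("Unknown version: " + version);
    }
  }
}
```

If the version is persisted next to the bytes, as the `deserialize(int version, ...)` signature suggests, a reader that writes the new format can still restore state written in the old one, provided the old branch is kept.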

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+public class IcebergSourceSplit extends FileSourceSplit {
+
+  public enum Status {
+    UNASSIGNED,
+    ASSIGNED,
+    COMPLETED
+  }
+
+  private final CombinedScanTask task;
+  @Nullable
+  private final CheckpointedPosition checkpointedPosition;
+
+  /**
+   * The splits are frequently serialized into checkpoints.
+   * Caching the byte representation makes repeated serialization cheap.
+   */
+  @Nullable private transient byte[] serializedFormCache;
+
+  public IcebergSourceSplit(CombinedScanTask task, CheckpointedPosition checkpointedPosition) {
+    // Supply dummy values so that IcebergSourceSplit extend from FileSourceSplit,
+    // as required by using BulkFormat interface in IcebergSource.
+    // We are hoping to clean this up after FLINK-20174 is resolved.
+    super("", new Path("file:///dummy"), 0L, 0L);
+    this.task = task;
+    this.checkpointedPosition = checkpointedPosition;
+  }
+
+  public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) {
+    return new IcebergSourceSplit(combinedScanTask, null);
+  }
+
+  public static IcebergSourceSplit fromSplitState(MutableIcebergSourceSplit state) {
+    return new IcebergSourceSplit(state.task(), new CheckpointedPosition(
+        state.offset(), state.recordsToSkipAfterOffset()));
+  }
+
+  public CombinedScanTask task() {
+    return task;
+  }
+
+  public CheckpointedPosition checkpointedPosition() {
+    return checkpointedPosition;
+  }
+
+  public byte[] serializedFormCache() {

Review comment:
       You mean used by `IcebergSourceSplitSerializer`? Since that is in the same package, does this need to be public?
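
The caching idea behind `serializedFormCache` and the visibility point raised here can be sketched as follows. This is a simplified illustration under stated assumptions: `CachedSplitSketch` and `CachedSplitSerializerSketch` are hypothetical stand-ins, the split is reduced to a single id string, and the encoding is plain UTF-8 rather than whatever the PR's serializer does.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a split; not the PR's IcebergSourceSplit.
final class CachedSplitSketch {
  private final String splitId;

  // Derived data, not logical state, so it is marked transient.
  private transient byte[] serializedFormCache;

  CachedSplitSketch(String splitId) {
    this.splitId = splitId;
  }

  String splitId() {
    return splitId;
  }

  // Package-private accessors: only the serializer in the same package calls them.
  byte[] serializedFormCache() {
    return serializedFormCache;
  }

  void serializedFormCache(byte[] cachedBytes) {
    this.serializedFormCache = cachedBytes;
  }
}

// Hypothetical serializer living in the same package as the split.
final class CachedSplitSerializerSketch {
  // Counts real encodings so the effect of the cache is observable.
  int encodeCount = 0;

  byte[] serialize(CachedSplitSketch split) {
    byte[] cached = split.serializedFormCache();
    if (cached != null) {
      return cached; // repeated checkpoints reuse the same bytes
    }
    encodeCount++;
    byte[] bytes = split.splitId().getBytes(StandardCharsets.UTF_8);
    split.serializedFormCache(bytes);
    return bytes;
  }
}
```

With both classes in one package, the cache accessors never need to appear in the public API of the split.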

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/DataIteratorReaderFunction.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.CloseableIterator;
+
+public abstract class DataIteratorReaderFunction<T> implements ReaderFunction<T> {

Review comment:
       Mark this as `@Internal` or add a javadoc?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/RecyclableArrayIterator.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import javax.annotation.Nullable;
+import org.apache.flink.connector.file.src.util.ArrayResultIterator;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * Similar to the {@link ArrayResultIterator}.
+ * Main difference is the records array can be recycled back to a pool.
+ */
+final class RecyclableArrayIterator<E> implements CloseableIterator<RecordAndPosition<E>> {
+  private final Pool.Recycler<E[]> recycler;
+  private final E[] records;
+  private final int num;
+  private final MutableRecordAndPosition<E> recordAndPosition;
+
+  private int pos;
+
+  RecyclableArrayIterator(Pool.Recycler<E[]> recycler) {
+    this(recycler, null, 0, CheckpointedPosition.NO_OFFSET, 0L);
+  }
+
+  /**
+   * Each record's {@link RecordAndPosition} will have the same fileOffset (for {@link RecordAndPosition#getOffset()}).

Review comment:
       should this move to class level doc?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import javax.annotation.Nullable;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.CloseableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, IcebergSourceSplit> {

Review comment:
       Did you consider the need to subclass the reader for customization? Maybe it should be protected?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/DataIteratorBatcher.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.io.CloseableIterator;
+
+@FunctionalInterface

Review comment:
       javadoc?






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r632266391



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/CombinedScanTaskIterator.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Base class of Flink iterators.
+ *
+ * @param <T> is the Java class returned by this iterator whose objects contain one or more rows.
+ */
+public class CombinedScanTaskIterator<T> implements CloseableIterator<T> {
+
+  private final FileIteratorReader<T> fileIteratorReader;
+  private final DecryptedInputFiles decryptedInputFiles;
+  private final Iterator<FileScanTask> tasks;
+  private final Position position;
+
+  private CloseableIterator<T> currentIterator;
+
+  public CombinedScanTaskIterator(CombinedScanTask combinedTask, FileIO io, EncryptionManager encryption,
+                                  FileIteratorReader<T> fileIteratorReader) {
+    this(combinedTask, io, encryption, fileIteratorReader, null);
+  }
+
+  public CombinedScanTaskIterator(CombinedScanTask combinedTask, FileIO io, EncryptionManager encryption,
+                                  FileIteratorReader<T> fileIteratorReader, @Nullable Position startingPosition) {
+    this.fileIteratorReader = fileIteratorReader;
+    this.decryptedInputFiles = new DecryptedInputFiles(combinedTask, io, encryption);
+    this.tasks = combinedTask.files().iterator();
+
+    if (startingPosition != null) {
+      this.position = startingPosition;
+      // skip files
+      Preconditions.checkArgument(position.fileOffset() < combinedTask.files().size(),
+          String.format("Starting file offset is %d, while CombinedScanTask has %d files",
+              position.fileOffset(), combinedTask.files().size()));
+      for (long i = 0L; i < position.fileOffset; ++i) {
+        tasks.next();
+      }
+    } else {
+      this.position = new Position();
+    }
+
+    final FileScanTask startingFileTask = tasks.next();
+    this.currentIterator = fileIteratorReader.open(startingFileTask, decryptedInputFiles);
+
+    // skip records
+    for (int i = 0; i < position.recordOffset(); ++i) {
+      if (currentIterator.hasNext()) {
+        currentIterator.next();
+      } else {
+        throw new IllegalArgumentException(String.format(
+            "File has less than %d records: %s", position.recordOffset, startingFileTask.file().path()));
+      }
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    updateCurrentIterator();
+    return currentIterator.hasNext();
+  }
+
+  @Override
+  public T next() {
+    updateCurrentIterator();
+    position.advanceRecord();
+    return currentIterator.next();
+  }
+
+  public boolean isCurrentIteratorDone() {
+    return !currentIterator.hasNext();
+  }
+
+  /**
+   * Updates the current iterator field to ensure that the current Iterator is not exhausted.
+   */
+  private void updateCurrentIterator() {
+    try {
+      while (!currentIterator.hasNext() && tasks.hasNext()) {
+        currentIterator.close();
+        currentIterator = fileIteratorReader.open(tasks.next(), decryptedInputFiles);
+        position.advanceFile();
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // close the current iterator
+    currentIterator.close();
+  }
+
+  public Position position() {
+    return position;
+  }
+
+  public static class Position {

Review comment:
       @openinx following up on your comment from the uber PR: https://github.com/apache/iceberg/pull/2105/files#r630834205.
   
   The reason I introduced this mutable `Position` class is to avoid repeatedly constructing a <fileOffset, recordOffset> object. It is the current cursor of the iterator.
   
   I didn't track the recordOffset inside the `FileIteratorReader` for the same reason. Otherwise, the `position()` getter would construct a new object each time.
   
   We can't use `CheckpointedPosition` from Flink for two reasons: (1) it is immutable, and (2) we want to return the current position (not necessarily the checkpointed position).
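
A minimal sketch of such a mutable cursor is shown here. The method names `advanceRecord()`, `advanceFile()`, `fileOffset()`, and `recordOffset()` come from the quoted diff; the class name `PositionSketch`, the two-argument constructor, and the choice to reset the record offset when moving to the next file are assumptions, since the body of `Position` is not shown in this thread.

```java
// Hypothetical sketch of a mutable cursor; the PR's Position class may differ.
final class PositionSketch {
  private long fileOffset;
  private long recordOffset;

  PositionSketch(long fileOffset, long recordOffset) {
    this.fileOffset = fileOffset;
    this.recordOffset = recordOffset;
  }

  long fileOffset() {
    return fileOffset;
  }

  long recordOffset() {
    return recordOffset;
  }

  // Called once per record returned by the iterator; updates in place, no allocation.
  void advanceRecord() {
    recordOffset += 1;
  }

  // Called when the iterator moves to the next file.
  // Assumption: the record offset counts records within the current file, so it restarts at 0.
  void advanceFile() {
    fileOffset += 1;
    recordOffset = 0L;
  }
}
```

The iterator can hand out the same `PositionSketch` instance from its getter on every call, which is the allocation-avoidance argument made above. The trade-off is that callers must copy the values if they need a stable snapshot.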
   






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739952693



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,16 +42,45 @@
   private final FileScanTaskReader<T> fileScanTaskReader;
 
   private final InputFilesDecryptor inputFilesDecryptor;
+  private final CombinedScanTask combinedTask;
+
   private Iterator<FileScanTask> tasks;
   private CloseableIterator<T> currentIterator;
+  private Position position;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
 
     this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
+    this.combinedTask = task;
+
     this.tasks = task.files().iterator();
     this.currentIterator = CloseableIterator.empty();
+    // fileOffset starts at -1 because we started
+    // from an empty iterator that is not from the split files.
+    this.position = new Position(-1L, 0L);
+  }
+
+  public void seek(Position startingPosition) {
+    // skip files
+    Preconditions.checkArgument(startingPosition.fileOffset() < combinedTask.files().size(),
+        "Checkpointed file offset is %d, while CombinedScanTask has %d files",
+        startingPosition.fileOffset(), combinedTask.files().size());
+    for (long i = 0L; i < startingPosition.fileOffset(); ++i) {

Review comment:
       same as another comment. will update






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r740413976



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,18 +42,47 @@
   private final FileScanTaskReader<T> fileScanTaskReader;
 
   private final InputFilesDecryptor inputFilesDecryptor;
-  private Iterator<FileScanTask> tasks;
+  private final CombinedScanTask combinedTask;
+  private final Position position;
+
+  private Iterator<FileScanTask> fileTasksIterator;
   private CloseableIterator<T> currentIterator;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
 
     this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
-    this.tasks = task.files().iterator();
+    this.combinedTask = task;
+    // fileOffset starts at -1 because we started
+    // from an empty iterator that is not from the split files.
+    this.position = new Position(-1, 0L);

Review comment:
       I wouldn't say that the `seek` capability is FLIP-27 specific. If we think of `DataIterator` as reading a list of files/splits from a `CombinedScanTask`, it is like a file API, where `seek` is pretty common. It is needed to achieve exactly-once processing semantics; e.g., if we were to implement exactly-once semantics for the current streaming source, I would imagine we would need this as well.
   
   Thanks a lot for the `SeekableDataIterator`. I feel that leaving these two empty abstract methods in the base `DataIterator` is a little weird:
   ```
   protected void advanceRecord()
   protected void advanceTask()
   ```
   
   Overall, I still think adding a `seek` capability to `DataIterator` is natural (for file-like read APIs).
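
The file-like `seek` described above can be sketched without any Iceberg or Flink types. In this hedged illustration, in-memory lists stand in for the files of a `CombinedScanTask`; the class name `SeekableIteratorSketch` and the `seek(int, long)` signature are assumptions and differ from the `seek(Position)` method in the quoted diff.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch: each inner list plays the role of one file's records.
final class SeekableIteratorSketch<T> implements Iterator<T> {
  private final List<List<T>> files;
  private Iterator<List<T>> fileIterator;
  private Iterator<T> currentIterator = Collections.emptyIterator();

  SeekableIteratorSketch(List<List<T>> files) {
    this.files = files;
    this.fileIterator = files.iterator();
  }

  // Reposition so that the next record returned is record `recordOffset` of file `fileOffset`.
  void seek(int fileOffset, long recordOffset) {
    if (fileOffset < 0 || fileOffset >= files.size()) {
      throw new IllegalArgumentException(String.format(
          "File offset is %d, while there are %d files", fileOffset, files.size()));
    }
    // Skip whole files by starting the file iterator at the requested index.
    fileIterator = files.listIterator(fileOffset);
    currentIterator = fileIterator.next().iterator();
    // Skip records inside the starting file.
    for (long i = 0; i < recordOffset; i++) {
      if (!currentIterator.hasNext()) {
        throw new IllegalArgumentException(String.format(
            "File %d has fewer than %d records", fileOffset, recordOffset));
      }
      currentIterator.next();
    }
  }

  @Override
  public boolean hasNext() {
    // Move on to the next non-empty file once the current one is exhausted.
    while (!currentIterator.hasNext() && fileIterator.hasNext()) {
      currentIterator = fileIterator.next().iterator();
    }
    return currentIterator.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return currentIterator.next();
  }
}
```

Restoring from a checkpointed <fileOffset, recordOffset> pair and resuming from exactly that record is what makes this useful for exactly-once processing: records before the saved position are skipped rather than emitted again.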
   






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739931222



##########
File path: flink-runtime/build.gradle
##########
@@ -42,6 +42,8 @@ project(':iceberg-flink-runtime') {
     implementation(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+
+    implementation "org.apache.flink:flink-connector-base"

Review comment:
       added comment






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r742176377



##########
File path: flink/build.gradle
##########
@@ -28,6 +28,7 @@ project(':iceberg-flink') {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       Yeah, that is my plan too. Once 1.12 support is removed, we should be able to move the files back to the common module. We just need to be diligent with these efforts.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r705903904



##########
File path: build.gradle
##########
@@ -320,6 +321,7 @@ project(':iceberg-flink') {
     compile project(':iceberg-parquet')
     compile project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       `SplitReader` and `RecordsWithSplitIds` are part of the core API that a source implementation needs to extend or implement.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739953521



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
##########
@@ -48,7 +48,6 @@
 
 @Internal
 public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
-

Review comment:
       will revert






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739957456



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.Position;
+import org.apache.iceberg.io.CloseableIterator;
+
+class ArrayPoolDataIteratorBatcher<T> implements DataIteratorBatcher<T> {
+  private final Configuration config;
+  private final RecordFactory<T> recordFactory;
+
+  ArrayPoolDataIteratorBatcher(Configuration config, RecordFactory<T> recordFactory) {
+    this.config = config;
+    this.recordFactory = recordFactory;
+  }
+
+  @Override
+  public CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> apply(
+      String splitId, DataIterator<T> inputIterator) {
+    return new ArrayPoolBatchIterator(splitId, inputIterator);
+  }
+
+  private class ArrayPoolBatchIterator implements CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> {
+
+    private final String splitId;
+    private final DataIterator<T> inputIterator;
+    private final int batchSize;
+    private final Pool<T[]> pool;
+
+    ArrayPoolBatchIterator(String splitId, DataIterator<T> inputIterator) {
+      this.splitId = splitId;
+      this.inputIterator = inputIterator;
+      this.batchSize = config.getInteger(FlinkConfigOptions.SOURCE_READER_FETCH_RECORD_BATCH_SIZE);
+      this.pool = createPoolOfBatches(config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
+    }
+
+    @Override
+    public boolean hasNext() {
+      return inputIterator.hasNext();
+    }
+
+    @Override
+    public RecordsWithSplitIds<RecordAndPosition<T>> next() {
+      final T[] batch = getCachedEntry();
+      int num = 0;
+      while (inputIterator.hasNext() && num < batchSize) {
+        T nextRecord = inputIterator.next();
+        recordFactory.clone(nextRecord, batch[num]);

Review comment:
       you are exactly right. will add code comment

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.Position;
+import org.apache.iceberg.io.CloseableIterator;
+
+class ArrayPoolDataIteratorBatcher<T> implements DataIteratorBatcher<T> {
+  private final Configuration config;
+  private final RecordFactory<T> recordFactory;
+
+  ArrayPoolDataIteratorBatcher(Configuration config, RecordFactory<T> recordFactory) {
+    this.config = config;
+    this.recordFactory = recordFactory;
+  }
+
+  @Override
+  public CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> apply(
+      String splitId, DataIterator<T> inputIterator) {
+    return new ArrayPoolBatchIterator(splitId, inputIterator);
+  }
+
+  private class ArrayPoolBatchIterator implements CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> {
+
+    private final String splitId;
+    private final DataIterator<T> inputIterator;
+    private final int batchSize;
+    private final Pool<T[]> pool;
+
+    ArrayPoolBatchIterator(String splitId, DataIterator<T> inputIterator) {
+      this.splitId = splitId;
+      this.inputIterator = inputIterator;
+      this.batchSize = config.getInteger(FlinkConfigOptions.SOURCE_READER_FETCH_RECORD_BATCH_SIZE);
+      this.pool = createPoolOfBatches(config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
+    }
+
+    @Override
+    public boolean hasNext() {
+      return inputIterator.hasNext();
+    }
+
+    @Override
+    public RecordsWithSplitIds<RecordAndPosition<T>> next() {
+      final T[] batch = getCachedEntry();
+      int num = 0;
+      while (inputIterator.hasNext() && num < batchSize) {
+        T nextRecord = inputIterator.next();
+        recordFactory.clone(nextRecord, batch[num]);
+        num++;
+        if (inputIterator.isCurrentIteratorDone()) {
+          // break early so that records in the ArrayResultIterator
+          // have the same fileOffset.
+          break;
+        }
+      }
+      if (num == 0) {

Review comment:
       will do
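
For readers following the batching loop quoted in the diff above, the early break at a file boundary can be illustrated with a minimal, dependency-free sketch. This is not the PR's code: `MultiFileIterator` and `FileBoundaryBatcher` are hypothetical names standing in for `DataIterator` and the batcher, and the pooled-array reuse and record cloning from the PR are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for DataIterator: reads several "files" back to back.
class MultiFileIterator<T> implements Iterator<T> {
  private final Iterator<List<T>> files;
  private Iterator<T> current = Collections.emptyIterator();

  MultiFileIterator(List<List<T>> files) {
    this.files = files.iterator();
  }

  @Override
  public boolean hasNext() {
    // Advance past exhausted (or empty) files until a record is available.
    while (!current.hasNext() && files.hasNext()) {
      current = files.next().iterator();
    }
    return current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }

  // True once the file currently being read has no more records.
  boolean isCurrentFileDone() {
    return !current.hasNext();
  }
}

// Hypothetical batcher: collects up to batchSize records per call.
class FileBoundaryBatcher {
  static <T> List<T> nextBatch(MultiFileIterator<T> input, int batchSize) {
    List<T> batch = new ArrayList<>(batchSize);
    while (batch.size() < batchSize && input.hasNext()) {
      batch.add(input.next());
      if (input.isCurrentFileDone()) {
        // Stop at the file boundary so every record in the batch
        // belongs to the same file (same file offset).
        break;
      }
    }
    return batch;
  }
}
```

With two files of three and two records and a batch size of 2, this sketch yields three batches: two records, then one record (the tail of the first file), then the two records of the second file.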






[GitHub] [iceberg] openinx commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r740076138



##########
File path: flink-runtime/build.gradle
##########
@@ -42,6 +42,8 @@ project(':iceberg-flink-runtime') {
     implementation(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+
+    implementation "org.apache.flink:flink-connector-base"

Review comment:
       @stevenzwu After https://github.com/apache/iceberg/pull/3364 was merged, we no longer need a common `iceberg-flink-runtime` module for all Flink versions. Instead, each `<MAJOR.MINOR>` Flink release has its own `iceberg-flink:iceberg-flink-<MAJOR.MINOR>-runtime` module, so we can build features on top of the latest Flink API.
   
   You may want to add the transitive dependency `org.apache.flink:flink-connector-base` in [this line for Flink 1.12](https://github.com/apache/iceberg/blob/master/flink/v1.12/build.gradle#L154) and [this line for Flink 1.13](https://github.com/apache/iceberg/blob/master/flink/v1.13/build.gradle#L154).






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739337259



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_SIZE = ConfigOptions
+      .key("source.iceberg.reader.fetch-batch-size")
+      .intType()
+      .defaultValue(2048)

Review comment:
       Yeah, the description should specify that this is some number of rows.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739936992



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,16 +42,45 @@
   private final FileScanTaskReader<T> fileScanTaskReader;
 
   private final InputFilesDecryptor inputFilesDecryptor;
+  private final CombinedScanTask combinedTask;
+
   private Iterator<FileScanTask> tasks;
   private CloseableIterator<T> currentIterator;
+  private Position position;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
 
     this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
+    this.combinedTask = task;
+
     this.tasks = task.files().iterator();
     this.currentIterator = CloseableIterator.empty();
+    // fileOffset starts at -1 because we started
+    // from an empty iterator that is not from the split files.
+    this.position = new Position(-1L, 0L);
+  }
+
+  public void seek(Position startingPosition) {
+    // skip files
+    Preconditions.checkArgument(startingPosition.fileOffset() < combinedTask.files().size(),
+        "Checkpointed file offset is %d, while CombinedScanTask has %d files",
+        startingPosition.fileOffset(), combinedTask.files().size());
+    for (long i = 0L; i < startingPosition.fileOffset(); ++i) {

Review comment:
       An integer would certainly be sufficient. I was using `long` to match the type in `RecordAndPosition` from the flink-connector-files module. Looking at it again, the `long offset` in Flink's `RecordAndPosition` actually means the byte offset within a file. I will define our own `RecordAndPosition` and change `fileOffset` to the `int` type.
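
To make the two-level position concrete, here is a minimal sketch of a position that pairs an `int` file index with a `long` count of records already read, plus a `seek` that restores it. The names `SplitPosition` and `SeekableFileIterator` are hypothetical and in-memory lists stand in for data files; the PR's actual `Position` and `DataIterator` may differ.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical position: which file in the combined task, and how many
// records of that file have already been consumed.
final class SplitPosition {
  final int fileOffset;
  final long recordOffset;

  SplitPosition(int fileOffset, long recordOffset) {
    this.fileOffset = fileOffset;
    this.recordOffset = recordOffset;
  }
}

// Hypothetical iterator over several in-memory "files" that tracks its position.
class SeekableFileIterator<T> implements Iterator<T> {
  private final List<List<T>> files;
  private Iterator<T> current = Collections.emptyIterator();
  private int fileOffset = -1; // -1: no file has been opened yet
  private long recordOffset = 0L;

  SeekableFileIterator(List<List<T>> files) {
    this.files = files;
  }

  // Jump to the given file, then skip the records that were already emitted.
  void seek(SplitPosition start) {
    if (start.fileOffset < 0 || start.fileOffset >= files.size()) {
      throw new IllegalArgumentException(
          "File offset " + start.fileOffset + " is out of range for " + files.size() + " files");
    }
    fileOffset = start.fileOffset;
    current = files.get(fileOffset).iterator();
    recordOffset = 0L;
    for (long i = 0L; i < start.recordOffset; i++) {
      if (!current.hasNext()) {
        throw new IllegalArgumentException("Record offset is beyond the end of the file");
      }
      current.next();
      recordOffset++;
    }
  }

  @Override
  public boolean hasNext() {
    // Open the next file whenever the current one is exhausted.
    while (!current.hasNext() && fileOffset + 1 < files.size()) {
      fileOffset++;
      current = files.get(fileOffset).iterator();
      recordOffset = 0L;
    }
    return current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    recordOffset++;
    return current.next();
  }

  // Position to resume from: it points just past the last record returned.
  SplitPosition position() {
    return new SplitPosition(fileOffset, recordOffset);
  }
}
```

An `int` file index is enough here because it only counts files within one combined task, while the record count stays `long` since a single file can hold more than `Integer.MAX_VALUE` rows.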






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r661613574



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+public class IcebergSourceSplit extends FileSourceSplit {
+
+  public enum Status {
+    UNASSIGNED,
+    ASSIGNED,
+    COMPLETED
+  }
+
+  private final CombinedScanTask task;
+  @Nullable
+  private final CheckpointedPosition checkpointedPosition;
+
+  /**
+   * The splits are frequently serialized into checkpoints.
+   * Caching the byte representation makes repeated serialization cheap.
+   */
+  @Nullable private transient byte[] serializedFormCache;
+
+  public IcebergSourceSplit(CombinedScanTask task, CheckpointedPosition checkpointedPosition) {
+    // Supply dummy values so that IcebergSourceSplit extend from FileSourceSplit,
+    // as required by using BulkFormat interface in IcebergSource.
+    // We are hoping to clean this up after FLINK-20174 is resolved.
+    super("", new Path("file:///dummy"), 0L, 0L);
+    this.task = task;
+    this.checkpointedPosition = checkpointedPosition;
+  }
+
+  public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) {
+    return new IcebergSourceSplit(combinedScanTask, null);
+  }
+
+  public static IcebergSourceSplit fromSplitState(MutableIcebergSourceSplit state) {
+    return new IcebergSourceSplit(state.task(), new CheckpointedPosition(
+        state.offset(), state.recordsToSkipAfterOffset()));
+  }
+
+  public CombinedScanTask task() {
+    return task;
+  }
+
+  public CheckpointedPosition checkpointedPosition() {
+    return checkpointedPosition;
+  }
+
+  public byte[] serializedFormCache() {

Review comment:
       it is actually used by `IcebergSourceSplit`






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r666413152



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -114,4 +145,43 @@ public void close() throws IOException {
     currentIterator.close();
     tasks = null;
   }
+
+  public Position position() {

Review comment:
       That is a good point. It is also related to your question above. Let me see how to unify them and maybe move away from Flink's `CheckpointedPosition`.






[GitHub] [iceberg] sundargates commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
sundargates commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r659247283



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+public class IcebergSourceSplit extends FileSourceSplit {

Review comment:
       What is the rationale for extending `FileSourceSplit`? It represents a single file, and judging by the arguments passed in the constructor on line 56, it does not seem to map well to an Iceberg file split.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+public class IcebergSourceSplit extends FileSourceSplit {
+
+  public enum Status {

Review comment:
       nit: Given that `Status` is not a field of the split itself, should we move it to a separate file?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+public class IcebergSourceSplit extends FileSourceSplit {
+
+  public enum Status {
+    UNASSIGNED,
+    ASSIGNED,
+    COMPLETED
+  }
+
+  private final CombinedScanTask task;
+  @Nullable
+  private final CheckpointedPosition checkpointedPosition;
+
+  /**
+   * The splits are frequently serialized into checkpoints.
+   * Caching the byte representation makes repeated serialization cheap.
+   */
+  @Nullable private transient byte[] serializedFormCache;
+
+  public IcebergSourceSplit(CombinedScanTask task, CheckpointedPosition checkpointedPosition) {
+    // Supply dummy values so that IcebergSourceSplit extend from FileSourceSplit,
+    // as required by using BulkFormat interface in IcebergSource.
+    // We are hoping to clean this up after FLINK-20174 is resolved.
+    super("", new Path("file:///dummy"), 0L, 0L);
+    this.task = task;
+    this.checkpointedPosition = checkpointedPosition;
+  }
+
+  public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) {
+    return new IcebergSourceSplit(combinedScanTask, null);
+  }
+
+  public static IcebergSourceSplit fromSplitState(MutableIcebergSourceSplit state) {
+    return new IcebergSourceSplit(state.task(), new CheckpointedPosition(
+        state.offset(), state.recordsToSkipAfterOffset()));
+  }
+
+  public CombinedScanTask task() {
+    return task;
+  }
+
+  public CheckpointedPosition checkpointedPosition() {
+    return checkpointedPosition;
+  }
+
+  public byte[] serializedFormCache() {

Review comment:
       nit: I'm guessing this need not be public given that it's an optimization primarily around making serialization cheap?
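
One way to keep such a cache out of the public API is sketched below, assuming the serializer lives in the same package as the split. `CachedSplit` and its fields are hypothetical and only illustrate the visibility and lazy-caching idea; this is not how `IcebergSourceSplit` is actually serialized.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical immutable split with a lazily computed serialized form.
final class CachedSplit {
  private final String splitId;
  private final int fileOffset;
  private final long recordOffset;

  // Cached bytes; safe to reuse because the split never changes.
  private transient byte[] serializedFormCache;

  CachedSplit(String splitId, int fileOffset, long recordOffset) {
    this.splitId = splitId;
    this.fileOffset = fileOffset;
    this.recordOffset = recordOffset;
  }

  // Package-private: visible to a serializer in the same package only.
  byte[] serializedForm() {
    if (serializedFormCache == null) {
      serializedFormCache = serialize();
    }
    return serializedFormCache;
  }

  private byte[] serialize() {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(splitId);
      out.writeInt(fileOffset);
      out.writeLong(recordOffset);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Repeated calls to `serializedForm()` return the same array, so checkpointing the same split many times pays the encoding cost once.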






[GitHub] [iceberg] holdenk commented on pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
holdenk commented on pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#issuecomment-868725115


   Hey @stevenzwu, can you rebase or merge in the master branch? GitHub is showing a conflicting file.




[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r666430823



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFactory.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public interface ReaderFactory<T> extends Serializable {
+
+  /**
+   * An iterator over records with their position in the file. The iterator is closeable to
+   * support clean resource release and recycling.
+   *
+   * @param <T> The type of the record.
+   */
+  interface RecordIterator<T> {
+
+    /**
+     * Gets the next record from the file, together with its position.
+     *
+     * <p>The position information returned with the record points to the record AFTER the
+     * returned record, because it defines the point where the reading should resume once the
+     * current record is emitted. The position information is put in the source's state when the
+     * record is emitted. If a checkpoint is taken directly after the record is emitted, the
+     * checkpoint must describe where to resume the source reading from after that record.
+     *
+     * <p>Objects returned by this method may be reused by the iterator. By the time that this
+     * method is called again, no object returned from the previous call will be referenced any
+     * more. That makes it possible to have a single {@link MutableRecordAndPosition} object and
+     * return the same instance (with updated record and position) on every call.
+     */
+    @Nullable
+    RecordAndPosition<T> next();
+
+    /**
+     * Releases the batch that this iterator iterated over. This is not supposed to close the
+     * reader and its resources, but is simply a signal that this iterator is not used any more.
+     * This method can be used as a hook to recycle/reuse heavyweight object structures.
+     */
+    void releaseBatch();
+  }
+
+  /**
+   * A batch reader for a {@link IcebergSourceSplit}
+   *
+   * @param <T> output record type
+   */
+  interface Reader<T> {

Review comment:
       I would need `FileRecords`, as it is an implementation of the `RecordsWithSplitIds` interface. 
   
   I see your point on the complexity. Let me see how to simplify the abstractions.






[GitHub] [iceberg] openinx commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r591410354



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -79,6 +83,27 @@ InputFile getInputFile(String location) {
     return inputFiles.get(location);
   }
 
+  public void seek(CheckpointedPosition checkpointedPosition)  {
+    // skip files
+    Preconditions.checkArgument(checkpointedPosition.getOffset() < combinedTask.files().size(),

Review comment:
       Nit: this could be simplified as:
   
   ```java
       Preconditions.checkArgument(checkpointedPosition.getOffset() < combinedTask.files().size(),
           "Checkpointed file offset is %s, while CombinedScanTask has %s files",
           checkpointedPosition.getOffset(), combinedTask.files().size());
   ```
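
For context on this nit: Guava's `Preconditions.checkArgument(boolean, String, Object...)` takes a message template with `%s` placeholders (only `%s` is supported, not `%d`) and builds the message only when the check fails. The sketch below is a stand-alone, stdlib-only imitation of that behaviour. The `LazyCheckSketch` class, its `format` helper and the `messagesBuilt` counter are hypothetical names for illustration, not Guava or Iceberg code.

```java
final class LazyCheckSketch {
  private LazyCheckSketch() {}

  // Counts how often a message string was actually assembled (demo only).
  static int messagesBuilt = 0;

  // Hypothetical stand-in for a templated checkArgument: the message is
  // formatted only when the expression is false.
  static void checkArgument(boolean expression, String template, Object... args) {
    if (!expression) {
      throw new IllegalArgumentException(format(template, args));
    }
  }

  // Substitutes each "%s" placeholder, in order, with the matching argument.
  static String format(String template, Object... args) {
    messagesBuilt++;
    StringBuilder sb = new StringBuilder();
    int start = 0;
    for (Object arg : args) {
      int placeholder = template.indexOf("%s", start);
      if (placeholder < 0) {
        break;
      }
      sb.append(template, start, placeholder).append(arg);
      start = placeholder + 2;
    }
    return sb.append(template.substring(start)).toString();
  }

  public static void main(String[] args) {
    String template = "Checkpointed file offset is %s, while CombinedScanTask has %s files";
    checkArgument(1L < 3, template, 1L, 3); // passes, so no message is built
    System.out.println("messages built after passing check: " + messagesBuilt);
    try {
      checkArgument(5L < 3, template, 5L, 3); // fails, message is built now
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Wrapping the message in `String.format(...)` at the call site would instead format it on every call, including the common case where the check passes.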

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/FlinkBulkFormatAdaptor.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This adaptor use Flink BulkFormat implementation to read data file.
+ * Note that Flink BulkFormat may not support row deletes like {@link RowDataIteratorBulkFormat}
+ */
+public class FlinkBulkFormatAdaptor<T> implements BulkFormat<T, IcebergSourceSplit> {
+
+  private final Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider;
+  private final TypeInformation<T> producedType;
+
+  public FlinkBulkFormatAdaptor(Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider) {
+    this.bulkFormatProvider = bulkFormatProvider;
+    // validate that all BulkFormat produce the same type
+    List<TypeInformation<T>> uniqueTypes = bulkFormatProvider.values().stream()
+        .map(bulkFormat -> bulkFormat.getProducedType())
+        .distinct()
+        .collect(Collectors.toList());
+    Preconditions.checkArgument(uniqueTypes.size() == 1,
+        "BulkFormats have the different producedType: " + uniqueTypes);
+    producedType = uniqueTypes.get(0);
+  }
+
+  @Override
+  public Reader<T> createReader(Configuration config, IcebergSourceSplit split) throws IOException {
+    return new ReaderAdaptor<T>(bulkFormatProvider, config, split, false);
+  }
+
+  @Override
+  public Reader<T> restoreReader(Configuration config, IcebergSourceSplit split) throws IOException {
+    return new ReaderAdaptor<T>(bulkFormatProvider, config, split, true);
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+
+  @Override
+  public TypeInformation<T> getProducedType() {
+    return producedType;
+  }
+
+  private static final class ReaderAdaptor<T> implements BulkFormat.Reader<T> {
+
+    private final Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider;
+    private final Configuration config;
+    private final Iterator<FileScanTask> fileIterator;
+    private final boolean isRestored;
+
+    // file offset in CombinedScanTask
+    private int fileOffset = -1;
+    private Reader<T> currentReader;
+
+    ReaderAdaptor(
+        Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider,
+        Configuration config,
+        IcebergSourceSplit icebergSplit,
+        boolean isRestored) throws IOException {
+      this.config = config;
+      this.bulkFormatProvider = bulkFormatProvider;
+      this.fileIterator = icebergSplit.task().files().iterator();
+      this.isRestored = isRestored;
+
+      final CheckpointedPosition position = icebergSplit.checkpointedPosition();
+      if (position != null) {
+        // skip files based on offset in checkpointed position
+        Preconditions.checkArgument(position.getOffset() < icebergSplit.task().files().size(),

Review comment:
       Nit: use this overload, which takes a message template and its arguments:
   
   ```java
   checkArgument(boolean expression, @Nullable String errorMessageTemplate, @Nullable Object... errorMessageArgs)
   ```

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.IcebergSourceEvents;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.MutableIcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class IcebergSourceReader<T> extends

Review comment:
       Does this class have to be introduced in this PR? I don't see any usage of it.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/FlinkBulkFormatAdaptor.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This adaptor use Flink BulkFormat implementation to read data file.
+ * Note that Flink BulkFormat may not support row deletes like {@link RowDataIteratorBulkFormat}
+ */
+public class FlinkBulkFormatAdaptor<T> implements BulkFormat<T, IcebergSourceSplit> {
+
+  private final Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider;
+  private final TypeInformation<T> producedType;
+
+  public FlinkBulkFormatAdaptor(Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider) {
+    this.bulkFormatProvider = bulkFormatProvider;
+    // validate that all BulkFormat produce the same type
+    List<TypeInformation<T>> uniqueTypes = bulkFormatProvider.values().stream()
+        .map(bulkFormat -> bulkFormat.getProducedType())
+        .distinct()
+        .collect(Collectors.toList());
+    Preconditions.checkArgument(uniqueTypes.size() == 1,
+        "BulkFormats have the different producedType: " + uniqueTypes);
+    producedType = uniqueTypes.get(0);
+  }
+
+  @Override
+  public Reader<T> createReader(Configuration config, IcebergSourceSplit split) throws IOException {
+    return new ReaderAdaptor<T>(bulkFormatProvider, config, split, false);

Review comment:
       Nit: `new ReaderAdaptor<>(...)`?
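
As a small illustration of the nit: the diamond operator (Java 7 and later) lets the compiler infer the constructor's type arguments from the target type, so `<T>` does not need to be repeated. The sketch below uses a hypothetical `Holder<T>` class standing in for `ReaderAdaptor<T>`; none of these names come from the PR.

```java
final class DiamondSketch {
  private DiamondSketch() {}

  // Hypothetical generic class standing in for ReaderAdaptor<T>.
  static final class Holder<T> {
    private final T value;

    Holder(T value) {
      this.value = value;
    }

    T value() {
      return value;
    }
  }

  // Explicit type argument: legal, but redundant.
  static <T> Holder<T> explicit(T value) {
    return new Holder<T>(value);
  }

  // Diamond: the compiler infers Holder<T> from the return type.
  static <T> Holder<T> inferred(T value) {
    return new Holder<>(value);
  }

  public static void main(String[] args) {
    Holder<String> a = explicit("split-0");
    Holder<String> b = inferred("split-0");
    System.out.println(a.value().equals(b.value()));
  }
}
```

Both factory methods compile to the same thing; the diamond form is only shorter.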

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/IcebergSourceOptions.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public interface IcebergSourceOptions {

Review comment:
       We've already introduced `FlinkTableOptions`. I don't think it's user-friendly to create separate options classes for source, sink, table, etc. Maybe we could rename `FlinkTableOptions` to `FlinkConfigOptions` and put all the options into that class.
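
A rough sketch of the single-holder idea, under stated assumptions: `Option<T>` below is a hypothetical minimal stand-in (not Flink's `ConfigOption`), and both option keys are made up for illustration. It shows one final class with a private constructor holding source-level and table-level options side by side, instead of a constants interface per component.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigOptionsSketch {
  private ConfigOptionsSketch() {}

  // Hypothetical, minimal typed option (not Flink's ConfigOption).
  static final class Option<T> {
    private final String key;
    private final T defaultValue;

    Option(String key, T defaultValue) {
      this.key = key;
      this.defaultValue = defaultValue;
    }

    String key() {
      return key;
    }

    T defaultValue() {
      return defaultValue;
    }
  }

  // Single holder for all options; the private constructor prevents instantiation.
  static final class AllOptions {
    private AllOptions() {}

    // Keys are invented for this sketch.
    static final Option<Integer> MAX_INFER_PARALLELISM =
        new Option<>("example.table.max-infer-parallelism", 100);
    static final Option<Boolean> SOURCE_FLAG =
        new Option<>("example.source.some-flag", false);
  }

  // Looks an option up in a plain map, falling back to its default value.
  static <T> T get(Map<String, Object> conf, Option<T> option) {
    @SuppressWarnings("unchecked")
    T value = (T) conf.getOrDefault(option.key(), option.defaultValue());
    return value;
  }

  public static void main(String[] args) {
    Map<String, Object> conf = new HashMap<>();
    System.out.println(get(conf, AllOptions.MAX_INFER_PARALLELISM));
    conf.put(AllOptions.MAX_INFER_PARALLELISM.key(), 8);
    System.out.println(get(conf, AllOptions.MAX_INFER_PARALLELISM));
  }
}
```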

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/FlinkBulkFormatAdaptor.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This adaptor use Flink BulkFormat implementation to read data file.
+ * Note that Flink BulkFormat may not support row deletes like {@link RowDataIteratorBulkFormat}
+ */
+public class FlinkBulkFormatAdaptor<T> implements BulkFormat<T, IcebergSourceSplit> {
+
+  private final Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider;
+  private final TypeInformation<T> producedType;
+
+  public FlinkBulkFormatAdaptor(Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider) {
+    this.bulkFormatProvider = bulkFormatProvider;
+    // validate that all BulkFormat produce the same type
+    List<TypeInformation<T>> uniqueTypes = bulkFormatProvider.values().stream()
+        .map(bulkFormat -> bulkFormat.getProducedType())
+        .distinct()
+        .collect(Collectors.toList());
+    Preconditions.checkArgument(uniqueTypes.size() == 1,
+        "BulkFormats have the different producedType: " + uniqueTypes);
+    producedType = uniqueTypes.get(0);
+  }
+
+  @Override
+  public Reader<T> createReader(Configuration config, IcebergSourceSplit split) throws IOException {
+    return new ReaderAdaptor<T>(bulkFormatProvider, config, split, false);
+  }
+
+  @Override
+  public Reader<T> restoreReader(Configuration config, IcebergSourceSplit split) throws IOException {
+    return new ReaderAdaptor<T>(bulkFormatProvider, config, split, true);
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+
+  @Override
+  public TypeInformation<T> getProducedType() {
+    return producedType;
+  }
+
+  private static final class ReaderAdaptor<T> implements BulkFormat.Reader<T> {
+
+    private final Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider;
+    private final Configuration config;
+    private final Iterator<FileScanTask> fileIterator;
+    private final boolean isRestored;
+
+    // file offset in CombinedScanTask
+    private int fileOffset = -1;
+    private Reader<T> currentReader;
+
+    ReaderAdaptor(
+        Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider,
+        Configuration config,
+        IcebergSourceSplit icebergSplit,
+        boolean isRestored) throws IOException {
+      this.config = config;
+      this.bulkFormatProvider = bulkFormatProvider;
+      this.fileIterator = icebergSplit.task().files().iterator();
+      this.isRestored = isRestored;
+
+      final CheckpointedPosition position = icebergSplit.checkpointedPosition();
+      if (position != null) {
+        // skip files based on offset in checkpointed position
+        Preconditions.checkArgument(position.getOffset() < icebergSplit.task().files().size(),
+            String.format("Checkpointed file offset is %d, while CombinedScanTask has %d files",
+                position.getOffset(), icebergSplit.task().files().size()));
+        for (int i = 0; i < position.getOffset(); ++i) {
+          fileIterator.next();
+          fileOffset++;
+        }
+        // first file may need to skip records
+        setupReader(position.getRecordsAfterOffset());
+      } else {
+        setupReader(0L);
+      }
+    }
+
+    /**
+     * TODO: we can't return RecordIterator with empty data.
+     * Otherwise, caller may assume it is end of input.
+     * We probably need to add a {@code hasNext()} API to
+     * {@link RecordIterator} to achieve the goal.
+     */
+    @Nullable
+    @Override
+    public RecordIterator<T> readBatch() throws IOException {
+      RecordIterator<T> result = null;
+      while (currentReader != null || fileIterator.hasNext()) {
+        if (currentReader == null) {
+          setupReader(0L);
+        } else {
+          result = currentReader.readBatch();
+          if (result != null) {
+            break;
+          } else {
+            closeCurrentReader();
+          }
+        }
+      }
+      if (result == null) {
+        return null;
+      } else {
+        return new RecordIteratorAdaptor(fileOffset, result);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      closeCurrentReader();
+    }
+
+    private void closeCurrentReader() throws IOException {
+      if (currentReader != null) {
+        currentReader.close();
+        currentReader = null;
+      }
+    }
+
+    private void setupReader(long skipRecordCount) throws IOException {
+      if (fileIterator.hasNext()) {
+        final FileScanTask fileScanTask = fileIterator.next();
+        final FileFormat fileFormat = fileScanTask.file().format();
+        if (!bulkFormatProvider.containsKey(fileFormat)) {
+          throw new IOException("Unsupported file format: " + fileFormat);
+        }
+        final BulkFormat<T, FileSourceSplit> bulkFormat = bulkFormatProvider.get(fileFormat);
+        fileOffset++;
+        final FileSourceSplit fileSourceSplit = new FileSourceSplit(
+            "",
+            new Path(URI.create(fileScanTask.file().path().toString())),

Review comment:
       So `FileSourceSplit` will use Flink's fs interface to access the underlying files? Iceberg currently has its own `FileIO` interface, which the object storage services implement to read and write data in the cloud. If we introduce Flink fs here, I'm concerned that we would have to implement both the Flink fs interfaces and the Iceberg `FileIO` interfaces to make the experimental unified source work.
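
To make the concern concrete, here is a minimal sketch, assuming a reader that depends on exactly one pluggable IO abstraction. `ExampleFileIO`, `InMemoryFileIO` and the `mem://` location are hypothetical and much simpler than Iceberg's real `FileIO`; the point is only that a storage backend then has a single interface to implement.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class SingleIoSketch {
  private SingleIoSketch() {}

  // Hypothetical stand-in for a table-level file IO abstraction.
  interface ExampleFileIO {
    InputStream open(String location) throws IOException;
  }

  // In-memory backend: the only interface a storage provider implements here.
  static final class InMemoryFileIO implements ExampleFileIO {
    private final Map<String, byte[]> files = new HashMap<>();

    void put(String location, String content) {
      files.put(location, content.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public InputStream open(String location) throws IOException {
      byte[] data = files.get(location);
      if (data == null) {
        throw new IOException("No such file: " + location);
      }
      return new ByteArrayInputStream(data);
    }
  }

  // The reader depends only on ExampleFileIO, never on a second file system API.
  static String readAll(ExampleFileIO io, String location) throws IOException {
    try (InputStream in = io.open(location)) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buffer = new byte[4096];
      int n;
      while ((n = in.read(buffer)) != -1) {
        out.write(buffer, 0, n);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws IOException {
    InMemoryFileIO io = new InMemoryFileIO();
    io.put("mem://bucket/data-0", "rows");
    System.out.println(readAll(io, "mem://bucket/data-0"));
  }
}
```

If the reader instead opened files through a second file system API, every backend would need an implementation of that API as well, which is the duplication the comment warns about.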

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/FlinkBulkFormatAdaptor.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This adaptor use Flink BulkFormat implementation to read data file.
+ * Note that Flink BulkFormat may not support row deletes like {@link RowDataIteratorBulkFormat}
+ */
+public class FlinkBulkFormatAdaptor<T> implements BulkFormat<T, IcebergSourceSplit> {
+
+  private final Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider;
+  private final TypeInformation<T> producedType;
+
+  public FlinkBulkFormatAdaptor(Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider) {
+    this.bulkFormatProvider = bulkFormatProvider;
+    // validate that all BulkFormat produce the same type
+    List<TypeInformation<T>> uniqueTypes = bulkFormatProvider.values().stream()
+        .map(bulkFormat -> bulkFormat.getProducedType())
+        .distinct()
+        .collect(Collectors.toList());
+    Preconditions.checkArgument(uniqueTypes.size() == 1,
+        "BulkFormats have the different producedType: " + uniqueTypes);
+    producedType = uniqueTypes.get(0);
+  }
+
+  @Override
+  public Reader<T> createReader(Configuration config, IcebergSourceSplit split) throws IOException {
+    return new ReaderAdaptor<T>(bulkFormatProvider, config, split, false);
+  }
+
+  @Override
+  public Reader<T> restoreReader(Configuration config, IcebergSourceSplit split) throws IOException {
+    return new ReaderAdaptor<T>(bulkFormatProvider, config, split, true);

Review comment:
       ditto

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/FlinkBulkFormatAdaptor.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This adaptor use Flink BulkFormat implementation to read data file.
+ * Note that Flink BulkFormat may not support row deletes like {@link RowDataIteratorBulkFormat}
+ */
+public class FlinkBulkFormatAdaptor<T> implements BulkFormat<T, IcebergSourceSplit> {
+
+  private final Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider;
+  private final TypeInformation<T> producedType;
+
+  public FlinkBulkFormatAdaptor(Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider) {
+    this.bulkFormatProvider = bulkFormatProvider;
+    // validate that all BulkFormat produce the same type
+    List<TypeInformation<T>> uniqueTypes = bulkFormatProvider.values().stream()
+        .map(bulkFormat -> bulkFormat.getProducedType())
+        .distinct()
+        .collect(Collectors.toList());
+    Preconditions.checkArgument(uniqueTypes.size() == 1,
+        "BulkFormats have the different producedType: " + uniqueTypes);
+    producedType = uniqueTypes.get(0);
+  }
+
+  @Override
+  public Reader<T> createReader(Configuration config, IcebergSourceSplit split) throws IOException {
+    return new ReaderAdaptor<T>(bulkFormatProvider, config, split, false);
+  }
+
+  @Override
+  public Reader<T> restoreReader(Configuration config, IcebergSourceSplit split) throws IOException {
+    return new ReaderAdaptor<T>(bulkFormatProvider, config, split, true);
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+
+  @Override
+  public TypeInformation<T> getProducedType() {
+    return producedType;
+  }
+
+  private static final class ReaderAdaptor<T> implements BulkFormat.Reader<T> {
+
+    private final Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider;
+    private final Configuration config;
+    private final Iterator<FileScanTask> fileIterator;
+    private final boolean isRestored;
+
+    // file offset in CombinedScanTask
+    private int fileOffset = -1;
+    private Reader<T> currentReader;
+
+    ReaderAdaptor(
+        Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider,
+        Configuration config,
+        IcebergSourceSplit icebergSplit,
+        boolean isRestored) throws IOException {
+      this.config = config;
+      this.bulkFormatProvider = bulkFormatProvider;
+      this.fileIterator = icebergSplit.task().files().iterator();
+      this.isRestored = isRestored;
+
+      final CheckpointedPosition position = icebergSplit.checkpointedPosition();
+      if (position != null) {
+        // skip files based on offset in checkpointed position
+        Preconditions.checkArgument(position.getOffset() < icebergSplit.task().files().size(),
+            String.format("Checkpointed file offset is %d, while CombinedScanTask has %d files",
+                position.getOffset(), icebergSplit.task().files().size()));
+        for (int i = 0; i < position.getOffset(); ++i) {
+          fileIterator.next();
+          fileOffset++;
+        }
+        // first file may need to skip records
+        setupReader(position.getRecordsAfterOffset());
+      } else {
+        setupReader(0L);
+      }
+    }
+
+    /**
+     * TODO: we can't return RecordIterator with empty data.
+     * Otherwise, caller may assume it is end of input.
+     * We probably need to add a {@code hasNext()} API to
+     * {@link RecordIterator} to achieve the goal.
+     */
+    @Nullable
+    @Override
+    public RecordIterator<T> readBatch() throws IOException {
+      RecordIterator<T> result = null;
+      while (currentReader != null || fileIterator.hasNext()) {
+        if (currentReader == null) {
+          setupReader(0L);
+        } else {
+          result = currentReader.readBatch();
+          if (result != null) {
+            break;
+          } else {
+            closeCurrentReader();
+          }
+        }
+      }
+      if (result == null) {
+        return null;
+      } else {
+        return new RecordIteratorAdaptor(fileOffset, result);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      closeCurrentReader();
+    }
+
+    private void closeCurrentReader() throws IOException {
+      if (currentReader != null) {
+        currentReader.close();
+        currentReader = null;
+      }
+    }
+
+    private void setupReader(long skipRecordCount) throws IOException {
+      if (fileIterator.hasNext()) {
+        final FileScanTask fileScanTask = fileIterator.next();
+        final FileFormat fileFormat = fileScanTask.file().format();
+        if (!bulkFormatProvider.containsKey(fileFormat)) {
+          throw new IOException("Unsupported file format: " + fileFormat);
+        }
+        final BulkFormat<T, FileSourceSplit> bulkFormat = bulkFormatProvider.get(fileFormat);
+        fileOffset++;
+        final FileSourceSplit fileSourceSplit = new FileSourceSplit(
+            "",
+            new Path(URI.create(fileScanTask.file().path().toString())),
+            fileScanTask.start(),
+            fileScanTask.length(),
+            new String[0],
+            // Since this is always for a single data file and some format
+            // (like ParquetVectorizedInputFormat) requires NO_OFFSET,
+            // we just always set the file offset to NO_OFFSET.
+            new CheckpointedPosition(CheckpointedPosition.NO_OFFSET, skipRecordCount));
+        if (isRestored) {
+          currentReader = bulkFormat.restoreReader(config, fileSourceSplit);
+        } else {
+          currentReader = bulkFormat.createReader(config, fileSourceSplit);
+        }
+      } else {
+        closeCurrentReader();
+      }
+    }
+  }
+
+  private static final class RecordIteratorAdaptor<T> implements RecordIterator<T> {
+
+    private final long fileOffset;
+    private final RecordIterator<T> iterator;
+    private final MutableRecordAndPosition mutableRecordAndPosition;

Review comment:
       Nit:  use `MutableRecordAndPosition<T>` here
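
       To illustrate the nit, here is a minimal sketch of why the parameterized field type matters. `MutableHolderSketch` and `TypedAdaptorSketch` are hypothetical stand-ins written for this example; they are not the Flink or Iceberg classes and only mirror the generic shape.

```java
// Hypothetical stand-in for a mutable record-and-position holder; only the generic shape matters.
final class MutableHolderSketch<E> {
  private E record;
  private long recordSkipCount;

  void set(E newRecord, long newRecordSkipCount) {
    this.record = newRecord;
    this.recordSkipCount = newRecordSkipCount;
  }

  E getRecord() {
    return record;
  }

  long getRecordSkipCount() {
    return recordSkipCount;
  }
}

final class TypedAdaptorSketch<T> {
  // Parameterized field: getRecord() is statically known to return T.
  // With the raw type MutableHolderSketch, the same call would return Object
  // and the caller would need an unchecked cast.
  private final MutableHolderSketch<T> holder = new MutableHolderSketch<>();

  T wrap(T record, long recordSkipCount) {
    holder.set(record, recordSkipCount);
    return holder.getRecord();
  }
}
```

       With the raw field type the code still compiles, but only with unchecked warnings, which is why this is a nit rather than a bug.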






[GitHub] [iceberg] openinx commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r704916391



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_SIZE = ConfigOptions
+      .key("source.iceberg.reader.fetch-batch-size")
+      .intType()
+      .defaultValue(2048)

Review comment:
       Is this a byte size or a row count (I guess it's a row count)? From the name, I did not get the meaning at first glance. Maybe we need a clearer name?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739335609



##########
File path: flink-runtime/build.gradle
##########
@@ -42,6 +42,8 @@ project(':iceberg-flink-runtime') {
     implementation(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+
+    implementation "org.apache.flink:flink-connector-base"

Review comment:
       For other dependencies that we expect to be present at runtime, we use compileOnly so the dependency doesn't leak into the runtime Jar. Is that something we should do here as well? This looks like it would add the Flink Jar into `runtimeClasspath`, which would get included in the Jar.






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739337847



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Integer> SOURCE_READER_FETCH_RECORD_BATCH_SIZE = ConfigOptions
+      .key("source.iceberg.reader.fetch-record-batch-size")

Review comment:
       Is there precedent for this config key? What other keys are similar? The others in this file start with `table.exec.iceberg`. Is there a reason for not continuing with that convention?






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739338158



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -37,20 +38,46 @@
  */
 @Internal
 public class DataIterator<T> implements CloseableIterator<T> {
-

Review comment:
       Can you revert whitespace changes? They are likely to cause unnecessary commit conflicts.






[GitHub] [iceberg] stevenzwu commented on pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#issuecomment-963553042


   Closing this PR for now. I will break it down further into smaller PRs, as suggested by @openinx.




[GitHub] [iceberg] stevenzwu commented on pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#issuecomment-868805669


   @holdenk thanks a lot for taking a look. @rdblue @JingsongLi please take a look and help move the new FLIP-27 based Iceberg source forward if you can. It is part 3 of the uber PR #2105.
   
   Right now, there is a pending decision. Currently, this PR is based on the premise of reusing the BulkFormat from Flink for vectorized readers (for Parquet, ORC, etc.), as originally suggested by @JingsongLi. I am rethinking that choice. It is unlikely that Flink's (vectorized) readers will support delete filters the way Iceberg readers do. Maybe the iceberg-flink module needs its own vectorized readers to support deletes. @zhangjun0x01 already submitted PR #2566 for ORC. If so, this PR needs to be adjusted to break away from the Flink file source.
   
   




[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r705907404



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -27,23 +27,39 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class FlinkSplitGenerator {
+public class FlinkSplitGenerator {
   private FlinkSplitGenerator() {
   }
 
   static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
-    List<CombinedScanTask> tasks = tasks(table, context);
-    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
+    try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
+      List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
+      FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
+      for (int i = 0; i < tasks.size(); i++) {
+        splits[i] = new FlinkInputSplit(i, tasks.get(i));
+      }
+      return splits;
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to process tasks iterable", e);
+    }
+  }
+
+  public static List<IcebergSourceSplit> planIcebergSourceSplits(

Review comment:
       Actually, I think we should rename `createInputSplits` to `planFlinkInputSplit`. We are not creating splits out of nowhere. Both methods just discover/plan splits from the table and call the same `planTasks`. I can add some Javadoc on the new `planIcebergSourceSplits` method.
   
   Since `createInputSplits` is non-public, we should be safe to rename.
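
       A minimal sketch of the shape described above, where both entry points delegate to one shared planning step. All types here are hypothetical stand-ins (a task is just a `String`, and `LegacySplit` / `SourceSplit` replace the real split classes), so this only shows the structure, not the actual Iceberg API.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class SplitPlanningSketch {

  // Stand-in for a closeable iterable of scan tasks.
  interface CloseableTasks extends Iterable<String>, Closeable {}

  // Stand-in for the legacy input split (index + task).
  static final class LegacySplit {
    final int index;
    final String task;

    LegacySplit(int index, String task) {
      this.index = index;
      this.task = task;
    }
  }

  // Stand-in for the FLIP-27 source split.
  static final class SourceSplit {
    final String task;

    SourceSplit(String task) {
      this.task = task;
    }
  }

  // Shared planning step; both public entry points below delegate to it.
  static CloseableTasks planTasks(List<String> tableFiles) {
    return new CloseableTasks() {
      @Override
      public Iterator<String> iterator() {
        return tableFiles.iterator();
      }

      @Override
      public void close() {
        // Nothing to release in this sketch.
      }
    };
  }

  static LegacySplit[] planFlinkInputSplits(List<String> tableFiles) {
    try (CloseableTasks tasks = planTasks(tableFiles)) {
      List<LegacySplit> splits = new ArrayList<>();
      for (String task : tasks) {
        splits.add(new LegacySplit(splits.size(), task));
      }
      return splits.toArray(new LegacySplit[0]);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to process tasks iterable", e);
    }
  }

  static List<SourceSplit> planIcebergSourceSplits(List<String> tableFiles) {
    try (CloseableTasks tasks = planTasks(tableFiles)) {
      List<SourceSplit> splits = new ArrayList<>();
      for (String task : tasks) {
        splits.add(new SourceSplit(task));
      }
      return splits;
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to process tasks iterable", e);
    }
  }
}
```

       Naming both methods `plan...` makes it visible that they differ only in the split type they wrap.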






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r705900345



##########
File path: build.gradle
##########
@@ -141,6 +141,7 @@ subprojects {
     def buildLog = new File(logFile)
     addTestOutputListener(new TestOutputListener() {
       def lastDescriptor
+

Review comment:
       sorry. that was a mistake. will fix






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r659013229



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -23,9 +23,9 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
-public class FlinkTableOptions {
+public class FlinkConfigOptions {

Review comment:
       The class is used internally. The config keys are the public contract and are not changed. Let me move this one out to a separate small PR.






[GitHub] [iceberg] openinx commented on pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#issuecomment-795362250


   Actually, I did not fully understand the whole PR (https://github.com/apache/iceberg/pull/2105) before. I think I will need more time to understand the whole code first.




[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r666431755



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFactory.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public interface ReaderFactory<T> extends Serializable {
+
+  /**
+   * An iterator over records with their position in the file. The iterator is closeable to
+   * support clean resource release and recycling.
+   *
+   * @param <T> The type of the record.
+   */
+  interface RecordIterator<T> {
+
+    /**
+     * Gets the next record from the file, together with its position.
+     *
+     * <p>The position information returned with the record points to the record AFTER the
+     * returned record, because it defines the point where the reading should resume once the
+     * current record is emitted. The position information is put in the source's state when the
+     * record is emitted. If a checkpoint is taken directly after the record is emitted, the
+     * checkpoint must describe where to resume the source reading from after that record.
+     *
+     * <p>Objects returned by this method may be reused by the iterator. By the time that this
+     * method is called again, no object returned from the previous call will be referenced any
+     * more. That makes it possible to have a single {@link MutableRecordAndPosition} object and
+     * return the same instance (with updated record and position) on every call.
+     */
+    @Nullable
+    RecordAndPosition<T> next();
+
+    /**
+     * Releases the batch that this iterator iterated over. This is not supposed to close the
+     * reader and its resources, but is simply a signal that this iterator is no longer used.
+     * This method can be used as a hook to recycle/reuse heavyweight object structures.
+     */
+    void releaseBatch();
+  }
+
+  /**
+   * A batch reader for a {@link IcebergSourceSplit}
+   *
+   * @param <T> output record type
+   */
+  interface Reader<T> {
+
+    /**
+     * Reads one batch. The method should return null when reaching the end of the input. The
+     * returned batch will be handed over to the processing threads as one.
+     *
+     * <p>The returned iterator object and any contained objects may be held onto by the
+     * source for some time, so it should not be immediately reused by the reader.
+     *
+     * <p>To implement reuse and to save object allocation, consider using a {@link
+     * org.apache.flink.connector.file.src.util.Pool} and recycle objects into the Pool in the
+     * {@link RecordIterator#releaseBatch()} method.
+     */
+    @Nullable
+    RecordIterator<T> readBatch() throws IOException;
+
+    /**
+     * Closes the reader and releases all resources.
+     */
+    void close() throws IOException;
+  }
+
+
+  /**
+   * Create a batch reader for the input split
+   *
+   * @param config Flink configuration
+   * @param split  Iceberg source split
+   * @return a batch reader
+   */
+  Reader<T> create(Configuration config, IcebergSourceSplit split);

Review comment:
       Makes sense. Will change.
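
       For readers of the `RecordIterator#next()` Javadoc in the hunk above, here is a minimal sketch of the two points it makes: the position returned with a record describes where to resume AFTER that record, and a single mutable holder can be reused across calls. `RecordAndSkipCount` and `BatchIterator` are hypothetical names for this example, not the Flink or Iceberg classes.

```java
import java.util.Iterator;
import java.util.List;

final class ReusedPositionSketch {

  // Hypothetical stand-in for a mutable record-and-position holder.
  static final class RecordAndSkipCount<T> {
    T record;
    long recordSkipCount;
  }

  static final class BatchIterator<T> {
    private final Iterator<T> records;
    // One holder instance, updated in place on every call to next().
    private final RecordAndSkipCount<T> reused = new RecordAndSkipCount<>();
    private long emittedInFile;

    BatchIterator(List<T> batch, long alreadyEmittedInFile) {
      this.records = batch.iterator();
      this.emittedInFile = alreadyEmittedInFile;
    }

    // Returns null at the end of the batch; otherwise the same holder with new contents.
    RecordAndSkipCount<T> next() {
      if (!records.hasNext()) {
        return null;
      }
      reused.record = records.next();
      emittedInFile++;
      // Position AFTER this record: the number of records to skip when resuming.
      reused.recordSkipCount = emittedInFile;
      return reused;
    }
  }
}
```

       Because the holder is reused, a caller that needs to keep a record beyond the next call has to copy it first.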






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r742176377



##########
File path: flink/build.gradle
##########
@@ -28,6 +28,7 @@ project(':iceberg-flink') {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       yeah. that is my plan too. Once 1.12 support is removed, we should be able to move files back to the common module. We just need to be diligent with these efforts.







[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r742094267



##########
File path: flink/build.gradle
##########
@@ -28,6 +28,7 @@ project(':iceberg-flink') {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       @stevenzwu, I think that copying the parts that change is reasonable. And once we remove support for 1.12, you can move the files back into the common module.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739954231



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
##########
@@ -68,6 +68,9 @@
   private static final ConfigOption<Duration> MONITOR_INTERVAL =
       ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));
 
+  private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
+      ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);

Review comment:
       This is needed for the event-time-aligned assigner / rough ordering, where we use the min-max stats from the timestamp column to order the splits and their assignment. It is not directly related to the reader part. It is only in this sub PR because I copied the classes from the uber PR #2105.
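
       A rough sketch of what ordering splits by timestamp min-max stats could look like. The assigner itself is not part of this thread, so everything here is an assumption: `SplitStats`, its fields, and `orderByMinEventTime` are hypothetical names, and the real implementation may order differently.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SplitOrderingSketch {

  // Hypothetical per-split summary derived from column stats of a timestamp column.
  static final class SplitStats {
    final String splitId;
    final long minEventTimeMillis;
    final long maxEventTimeMillis;

    SplitStats(String splitId, long minEventTimeMillis, long maxEventTimeMillis) {
      this.splitId = splitId;
      this.minEventTimeMillis = minEventTimeMillis;
      this.maxEventTimeMillis = maxEventTimeMillis;
    }
  }

  // Returns a new list ordered by lower bound first, then by upper bound as a tie-breaker.
  static List<SplitStats> orderByMinEventTime(List<SplitStats> splits) {
    List<SplitStats> ordered = new ArrayList<>(splits);
    ordered.sort(
        Comparator.comparingLong((SplitStats s) -> s.minEventTimeMillis)
            .thenComparingLong(s -> s.maxEventTimeMillis));
    return ordered;
  }
}
```

       This only works if the scan keeps column stats for the planned files, which is what an option like `include-column-stats` would control.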






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739349320



##########
File path: flink-runtime/build.gradle
##########
@@ -42,6 +42,8 @@ project(':iceberg-flink-runtime') {
     implementation(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+
+    implementation "org.apache.flink:flink-connector-base"

Review comment:
       This is the flink-runtime module. Hence we used an `implementation` dependency to pull `flink-connector-base` into the `iceberg-flink-runtime` jar. In the flink module below, it is `compileOnly`.






[GitHub] [iceberg] tweise commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739376715



##########
File path: build.gradle
##########
@@ -406,6 +409,8 @@ project(':iceberg-flink-runtime') {
     compile(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+
+    compile "org.apache.flink:flink-connector-base"

Review comment:
       It would be good to add a comment since this question is almost certain to come up again.






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739571571



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
##########
@@ -68,6 +68,9 @@
   private static final ConfigOption<Duration> MONITOR_INTERVAL =
       ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));
 
+  private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
+      ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);

Review comment:
       How is this addition related to FLIP-27? 






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739335609



##########
File path: flink-runtime/build.gradle
##########
@@ -42,6 +42,8 @@ project(':iceberg-flink-runtime') {
     implementation(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+
+    implementation "org.apache.flink:flink-connector-base"

Review comment:
       For other dependencies that we expect to be present at runtime, we use compileOnly so the dependency doesn't leak into the runtime Jar. Is that something we should do here as well?






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r659009523



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.IcebergSourceEvents;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.MutableIcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class IcebergSourceReader<T> extends

Review comment:
       sounds good. will do






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r659017556



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
##########
@@ -34,7 +34,7 @@
 /**
  * Context object with optional arguments for a Flink Scan.
  */
-class ScanContext implements Serializable {
+public class ScanContext implements Serializable {

Review comment:
       Because this class is now accessed by classes in a sub-package (like `reader/RowDataIteratorBulkFormat.java`).






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739349320



##########
File path: flink-runtime/build.gradle
##########
@@ -42,6 +42,8 @@ project(':iceberg-flink-runtime') {
     implementation(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+
+    implementation "org.apache.flink:flink-connector-base"

Review comment:
       this is the flink-runtime module. In the flink module below, it is `compileOnly`






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739571342



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
##########
@@ -48,7 +48,6 @@
 
 @Internal
 public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
-

Review comment:
       Looks like this file doesn't need to change. Can you remove this?






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739571861



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
##########
@@ -140,7 +140,7 @@ private void monitorAndForwardSplits() {
         newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
       }
 
-      FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext);
+      FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext);

Review comment:
       Does this keep the monitor function so that it can maintain the old API? Do we need to maintain the old API?






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739573099



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.Position;
+import org.apache.iceberg.io.CloseableIterator;
+
+class ArrayPoolDataIteratorBatcher<T> implements DataIteratorBatcher<T> {
+  private final Configuration config;
+  private final RecordFactory<T> recordFactory;
+
+  ArrayPoolDataIteratorBatcher(Configuration config, RecordFactory<T> recordFactory) {
+    this.config = config;
+    this.recordFactory = recordFactory;
+  }
+
+  @Override
+  public CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> apply(
+      String splitId, DataIterator<T> inputIterator) {
+    return new ArrayPoolBatchIterator(splitId, inputIterator);
+  }
+
+  private class ArrayPoolBatchIterator implements CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> {
+
+    private final String splitId;
+    private final DataIterator<T> inputIterator;
+    private final int batchSize;
+    private final Pool<T[]> pool;
+
+    ArrayPoolBatchIterator(String splitId, DataIterator<T> inputIterator) {
+      this.splitId = splitId;
+      this.inputIterator = inputIterator;
+      this.batchSize = config.getInteger(FlinkConfigOptions.SOURCE_READER_FETCH_RECORD_BATCH_SIZE);
+      this.pool = createPoolOfBatches(config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
+    }
+
+    @Override
+    public boolean hasNext() {
+      return inputIterator.hasNext();
+    }
+
+    @Override
+    public RecordsWithSplitIds<RecordAndPosition<T>> next() {
+      final T[] batch = getCachedEntry();
+      int num = 0;
+      while (inputIterator.hasNext() && num < batchSize) {
+        T nextRecord = inputIterator.next();
+        recordFactory.clone(nextRecord, batch[num]);

Review comment:
       What is this doing? Is `clone` expensive?






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739570663



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,16 +42,45 @@
   private final FileScanTaskReader<T> fileScanTaskReader;
 
   private final InputFilesDecryptor inputFilesDecryptor;
+  private final CombinedScanTask combinedTask;
+
   private Iterator<FileScanTask> tasks;
   private CloseableIterator<T> currentIterator;
+  private Position position;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
 
     this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
+    this.combinedTask = task;
+
     this.tasks = task.files().iterator();

Review comment:
       I'd probably rename this to `fileTasks` since there is now a `combinedTask`.






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739570881



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,16 +42,45 @@
   private final FileScanTaskReader<T> fileScanTaskReader;
 
   private final InputFilesDecryptor inputFilesDecryptor;
+  private final CombinedScanTask combinedTask;
+
   private Iterator<FileScanTask> tasks;
   private CloseableIterator<T> currentIterator;
+  private Position position;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
 
     this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
+    this.combinedTask = task;
+
     this.tasks = task.files().iterator();
     this.currentIterator = CloseableIterator.empty();
+    // fileOffset starts at -1 because we started
+    // from an empty iterator that is not from the split files.
+    this.position = new Position(-1L, 0L);
+  }
+
+  public void seek(Position startingPosition) {
+    // skip files
+    Preconditions.checkArgument(startingPosition.fileOffset() < combinedTask.files().size(),
+        "Checkpointed file offset is %d, while CombinedScanTask has %d files",
+        startingPosition.fileOffset(), combinedTask.files().size());
+    for (long i = 0L; i < startingPosition.fileOffset(); ++i) {
+      tasks.next();
+    }
+    updateCurrentIterator();
+    // skip records within the file
+    for (long i = 0; i < startingPosition.recordOffset(); ++i) {
+      if (hasNext()) {
+        next();
+      } else {
+        throw new IllegalStateException("Not enough records to skip: " +
+            startingPosition.recordOffset());
+      }
+    }
+    this.position.update(startingPosition.fileOffset(), startingPosition.recordOffset());

Review comment:
       Can `position` be final since this is using `update`?
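
A minimal sketch of the point behind this question, under stated assumptions: `SketchPosition` and `SketchIterator` are hypothetical stand-ins, not the PR's `Position` and `DataIterator`. When a holder object is only ever mutated through an `update` method, the field that refers to it never needs to be reassigned and can be declared `final`.

```java
/** Hypothetical mutable position holder, loosely modeled on the Position class in the diff. */
final class SketchPosition {
  private long fileOffset;
  private long recordOffset;

  SketchPosition(long fileOffset, long recordOffset) {
    this.fileOffset = fileOffset;
    this.recordOffset = recordOffset;
  }

  /** Mutates this holder in place instead of allocating a new one. */
  void update(long newFileOffset, long newRecordOffset) {
    this.fileOffset = newFileOffset;
    this.recordOffset = newRecordOffset;
  }

  long fileOffset() {
    return fileOffset;
  }

  long recordOffset() {
    return recordOffset;
  }
}

/** Hypothetical iterator skeleton that only tracks its position. */
final class SketchIterator {
  // The reference is never reassigned, so it can be final; only its fields change.
  private final SketchPosition position = new SketchPosition(-1L, 0L);

  void seek(long fileOffset, long recordOffset) {
    position.update(fileOffset, recordOffset);
  }

  SketchPosition position() {
    return position;
  }
}
```

The trade-off is that callers holding the returned `SketchPosition` see later updates, so a checkpoint would need to copy the two offsets rather than keep the reference.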






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739952562



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
##########
@@ -22,33 +22,58 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class FlinkSplitGenerator {
-  private FlinkSplitGenerator() {
+@Internal
+public class FlinkSplitPlanner {
+  private FlinkSplitPlanner() {
   }
 
-  static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
-    List<CombinedScanTask> tasks = tasks(table, context);
-    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
+  static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {

Review comment:
       `create/generate` implies creating something new, whereas this method plans/discovers splits from the table, hence the new method name. I also renamed the class from `FlinkSplitGenerator` to `FlinkSplitPlanner`. It is an internal class, so the rename shouldn't break user code.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r716345243



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/RecyclableArrayIterator.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import javax.annotation.Nullable;
+import org.apache.flink.connector.file.src.util.ArrayResultIterator;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * Similar to the {@link ArrayResultIterator}.
+ * Main difference is the records array can be recycled back to a pool.
+ */
+final class RecyclableArrayIterator<E> implements CloseableIterator<RecordAndPosition<E>> {
+  private final Pool.Recycler<E[]> recycler;
+  private final E[] records;
+  private final int num;
+  private final MutableRecordAndPosition<E> recordAndPosition;
+
+  private int pos;
+
+  RecyclableArrayIterator(Pool.Recycler<E[]> recycler) {
+    this(recycler, null, 0, CheckpointedPosition.NO_OFFSET, 0L);
+  }
+
+  /**
+   * Each record's {@link RecordAndPosition} will have the same fileOffset (for {@link RecordAndPosition#getOffset()}).

Review comment:
       will move






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r716344462



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/FileRecords.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * A batch of records for one split
+ */
+@Internal
+public class FileRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {

Review comment:
       I am going to rename it to `SplitRecords`. The Javadoc explains the purpose of this class. Yes, it is for the "fetch result".






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r666404654



##########
File path: build.gradle
##########
@@ -140,10 +140,11 @@ subprojects {
     def buildLog = new File(logFile)
     addTestOutputListener(new TestOutputListener() {
       def lastDescriptor
+
       @Override
       void onOutput(TestDescriptor testDescriptor, TestOutputEvent testOutputEvent) {
         if (lastDescriptor != testDescriptor) {
-          buildLog << "--------\n- Test log for: "<< testDescriptor << "\n--------\n"
+          buildLog << "--------\n- Test log for: " << testDescriptor << "\n--------\n"

Review comment:
       Yeah, annoying auto-formatting from IntelliJ. Will fix it.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r667701790



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/DataIteratorReaderFactory.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.CloseableIterator;
+
+public abstract class DataIteratorReaderFactory<T> implements ReaderFactory<T> {
+
+  private final Configuration config;
+  private final DataIteratorBatcher<T> batcher;
+
+  public DataIteratorReaderFactory(Configuration config, DataIteratorBatcher<T> batcher) {

Review comment:
       @sundargates I introduced this `Batcher` interface. `RowDataIterator` reuses the `RowData` object, so the reader needs an array pool and has to clone each record. The Avro iterator doesn't reuse objects, so it needs neither the array pool nor the clone. A different `Batcher` can be plugged in for a reader with the Avro record output type.
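
A rough sketch of how such a pluggable batching strategy could look. Everything here is hypothetical and simplified: `SketchBatcher`, `PassThroughBatcher`, `CopyingBatcher`, and `BatcherSketch` are illustration-only names, and a one-element `int[]` stands in for a reused row object. It is not the PR's `DataIteratorBatcher` or Flink's `Pool`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical stand-in for the Batcher idea: turn an iterator into batches. */
interface SketchBatcher<T> {
  List<T> nextBatch(Iterator<T> input, int batchSize);
}

/** Fits iterators that return a fresh object per record: buffer references as-is. */
final class PassThroughBatcher<T> implements SketchBatcher<T> {
  @Override
  public List<T> nextBatch(Iterator<T> input, int batchSize) {
    List<T> batch = new ArrayList<>(batchSize);
    while (input.hasNext() && batch.size() < batchSize) {
      batch.add(input.next());
    }
    return batch;
  }
}

/** Fits iterators that reuse one object: copy each record into a pre-allocated slot. */
final class CopyingBatcher<T> implements SketchBatcher<T> {
  private final List<T> slots = new ArrayList<>();
  private final Supplier<T> slotFactory;
  private final BiConsumer<T, T> copier; // copies (from, to)

  CopyingBatcher(Supplier<T> slotFactory, BiConsumer<T, T> copier) {
    this.slotFactory = slotFactory;
    this.copier = copier;
  }

  @Override
  public List<T> nextBatch(Iterator<T> input, int batchSize) {
    // Slots are allocated once and reused across calls, so a caller must be
    // done with one batch before asking for the next.
    while (slots.size() < batchSize) {
      slots.add(slotFactory.get());
    }
    int num = 0;
    while (input.hasNext() && num < batchSize) {
      copier.accept(input.next(), slots.get(num));
      num++;
    }
    return new ArrayList<>(slots.subList(0, num));
  }
}

final class BatcherSketch {
  /** Iterator that overwrites and returns the same one-element array on every call. */
  static Iterator<int[]> reusingInts(int... values) {
    final int[] reused = new int[1];
    return new Iterator<int[]>() {
      private int index = 0;

      @Override
      public boolean hasNext() {
        return index < values.length;
      }

      @Override
      public int[] next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        reused[0] = values[index++];
        return reused;
      }
    };
  }

  public static void main(String[] args) {
    SketchBatcher<int[]> batcher =
        new CopyingBatcher<int[]>(() -> new int[1], (from, to) -> to[0] = from[0]);
    List<int[]> batch = batcher.nextBatch(reusingInts(7, 8, 9), 2);
    System.out.println(batch.get(0)[0] + ", " + batch.get(1)[0]); // prints "7, 8"
  }
}
```

Keeping the strategy behind one interface means the reader code does not need to know whether the underlying iterator reuses objects; only the choice of batcher changes.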






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739573099



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.Position;
+import org.apache.iceberg.io.CloseableIterator;
+
+class ArrayPoolDataIteratorBatcher<T> implements DataIteratorBatcher<T> {
+  private final Configuration config;
+  private final RecordFactory<T> recordFactory;
+
+  ArrayPoolDataIteratorBatcher(Configuration config, RecordFactory<T> recordFactory) {
+    this.config = config;
+    this.recordFactory = recordFactory;
+  }
+
+  @Override
+  public CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> apply(
+      String splitId, DataIterator<T> inputIterator) {
+    return new ArrayPoolBatchIterator(splitId, inputIterator);
+  }
+
+  private class ArrayPoolBatchIterator implements CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> {
+
+    private final String splitId;
+    private final DataIterator<T> inputIterator;
+    private final int batchSize;
+    private final Pool<T[]> pool;
+
+    ArrayPoolBatchIterator(String splitId, DataIterator<T> inputIterator) {
+      this.splitId = splitId;
+      this.inputIterator = inputIterator;
+      this.batchSize = config.getInteger(FlinkConfigOptions.SOURCE_READER_FETCH_RECORD_BATCH_SIZE);
+      this.pool = createPoolOfBatches(config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
+    }
+
+    @Override
+    public boolean hasNext() {
+      return inputIterator.hasNext();
+    }
+
+    @Override
+    public RecordsWithSplitIds<RecordAndPosition<T>> next() {
+      final T[] batch = getCachedEntry();
+      int num = 0;
+      while (inputIterator.hasNext() && num < batchSize) {
+        T nextRecord = inputIterator.next();
+        recordFactory.clone(nextRecord, batch[num]);

Review comment:
       What is this doing? Is `clone` expensive?
   
   It could be that the record produced by `inputIterator` is reused, and the `clone` call makes a copy because the record is not consumed immediately, so this can't call `inputIterator.next()` again until the copy is made. If that's the case, then I think there should be a comment to point out what's going on.
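
The hazard guessed at above can be illustrated with a small sketch. The names `MutableRecord`, `ReusingIterator`, and `ReuseDemo` are hypothetical and exist only for this example; it assumes an iterator that hands out the same mutable instance on every `next()` call, which is the behavior being asked about, not something confirmed here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical mutable record, standing in for a reused row object. */
final class MutableRecord {
  int value;

  MutableRecord copy() {
    MutableRecord copied = new MutableRecord();
    copied.value = value;
    return copied;
  }
}

/** Hypothetical iterator that returns the SAME instance on every next() call. */
final class ReusingIterator implements Iterator<MutableRecord> {
  private final int[] values;
  private final MutableRecord reused = new MutableRecord();
  private int index = 0;

  ReusingIterator(int... values) {
    this.values = values;
  }

  @Override
  public boolean hasNext() {
    return index < values.length;
  }

  @Override
  public MutableRecord next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    reused.value = values[index++];
    return reused;
  }
}

final class ReuseDemo {
  /** Buffers references without copying: every slot aliases the reused instance. */
  static List<Integer> batchWithoutCopy(Iterator<MutableRecord> input) {
    List<MutableRecord> batch = new ArrayList<>();
    while (input.hasNext()) {
      batch.add(input.next());
    }
    return values(batch);
  }

  /** Copies each record before advancing, so buffered values stay intact. */
  static List<Integer> batchWithCopy(Iterator<MutableRecord> input) {
    List<MutableRecord> batch = new ArrayList<>();
    while (input.hasNext()) {
      batch.add(input.next().copy());
    }
    return values(batch);
  }

  private static List<Integer> values(List<MutableRecord> batch) {
    List<Integer> out = new ArrayList<>();
    for (MutableRecord record : batch) {
      out.add(record.value);
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(batchWithoutCopy(new ReusingIterator(1, 2, 3))); // prints [3, 3, 3]
    System.out.println(batchWithCopy(new ReusingIterator(1, 2, 3)));    // prints [1, 2, 3]
  }
}
```

The cost of the copy is proportional to the record size, which is why a pool of pre-allocated targets (as in the diff's `recordFactory.clone(nextRecord, batch[num])`) can be attractive compared with allocating a fresh object per record.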






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739955288



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.Position;
+import org.apache.iceberg.io.CloseableIterator;
+
+class ArrayPoolDataIteratorBatcher<T> implements DataIteratorBatcher<T> {

Review comment:
       Will add Javadoc to explain.






[GitHub] [iceberg] openinx commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r704914760



##########
File path: build.gradle
##########
@@ -406,6 +409,8 @@ project(':iceberg-flink-runtime') {
     compile(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+
+    compile "org.apache.flink:flink-connector-base"

Review comment:
       This jar is not included in the flink-dist binary package, so do we have to include it in the iceberg-flink-runtime jar explicitly?






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r705904185



##########
File path: build.gradle
##########
@@ -320,6 +321,7 @@ project(':iceberg-flink') {
     compile project(':iceberg-parquet')
     compile project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       I did a little research and saw that Stephan recently added the `@PublicEvolving` annotation to all 3 interfaces that you pointed out: https://issues.apache.org/jira/browse/FLINK-22358






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r716344755



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import javax.annotation.Nullable;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.CloseableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, IcebergSourceSplit> {

Review comment:
       Right now, I don't anticipate any need to extend this class.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r661611900



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+public class IcebergSourceSplit extends FileSourceSplit {

Review comment:
       I was hoping to extend from flink-connector-files to leverage the vectorized readers in Flink, but I have been revisiting that decision. After discussing with @openinx offline, we agreed that iceberg-flink should probably have its own vectorized readers (mainly to support delete filters in the future). The vectorized readers in flink-connector-files won't support delete filters, because deletes are an Iceberg concept rather than a feature of the raw file formats (like Parquet or ORC). I will update this PR.
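   
   To illustrate why a format-level reader cannot apply deletes on its own, here is a minimal sketch, assuming a simplified model in which deletes are a set of row positions within one data file. `DeleteFilterSketch` and `applyPositionDeletes` are hypothetical names, not Iceberg or Flink APIs, and Iceberg's real delete handling also covers equality deletes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not part of this PR.
public class DeleteFilterSketch {

  /**
   * Drops rows whose position in the data file is listed as deleted.
   * The raw rows are what a plain Parquet/ORC reader would return;
   * the deleted positions come from table metadata, which the
   * file-format reader knows nothing about.
   */
  static List<String> applyPositionDeletes(List<String> rawRows, Set<Long> deletedPositions) {
    List<String> liveRows = new ArrayList<>();
    for (int pos = 0; pos < rawRows.size(); pos++) {
      if (!deletedPositions.contains((long) pos)) {
        liveRows.add(rawRows.get(pos));
      }
    }
    return liveRows;
  }

  public static void main(String[] args) {
    List<String> rawRows = Arrays.asList("a", "b", "c", "d");
    Set<Long> deleted = new HashSet<>(Arrays.asList(1L, 3L));
    System.out.println(applyPositionDeletes(rawRows, deleted)); // prints [a, c]
  }
}
```

   The filter has to sit above the file-format reader, which is why a reader that only understands Parquet or ORC cannot provide it.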






[GitHub] [iceberg] openinx commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r704925981



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -27,23 +27,39 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class FlinkSplitGenerator {
+public class FlinkSplitGenerator {
   private FlinkSplitGenerator() {
   }
 
   static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
-    List<CombinedScanTask> tasks = tasks(table, context);
-    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
+    try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
+      List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
+      FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
+      for (int i = 0; i < tasks.size(); i++) {
+        splits[i] = new FlinkInputSplit(i, tasks.get(i));
+      }
+      return splits;
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to process tasks iterable", e);
+    }
+  }
+
+  public static List<IcebergSourceSplit> planIcebergSourceSplits(

Review comment:
       It would be good to align with `createInputSplits` by naming this `createIcebergSourceSplits`.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r740416875



##########
File path: flink/build.gradle
##########
@@ -28,6 +28,7 @@ project(':iceberg-flink') {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       @openinx Maybe we can follow up on the other [comment discussion](https://github.com/apache/iceberg/pull/3354/files#r740053509) here.
   
   With the SplitEnumerator API change, it looks like I need to put the FLIP-27 source in the `v1.13` folder. What should we do with future versions (like 1.14)? Do we copy the FLIP-27 source code from the `v1.13` folder to the `v1.14` folder?






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r593986709



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/FlinkBulkFormatAdaptor.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This adaptor use Flink BulkFormat implementation to read data file.
+ * Note that Flink BulkFormat may not support row deletes like {@link RowDataIteratorBulkFormat}
+ */
+public class FlinkBulkFormatAdaptor<T> implements BulkFormat<T, IcebergSourceSplit> {
+
+  private final Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider;
+  private final TypeInformation<T> producedType;
+
+  public FlinkBulkFormatAdaptor(Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider) {
+    this.bulkFormatProvider = bulkFormatProvider;
+    // validate that all BulkFormat produce the same type
+    List<TypeInformation<T>> uniqueTypes = bulkFormatProvider.values().stream()
+        .map(bulkFormat -> bulkFormat.getProducedType())
+        .distinct()
+        .collect(Collectors.toList());
+    Preconditions.checkArgument(uniqueTypes.size() == 1,
+        "BulkFormats have the different producedType: " + uniqueTypes);
+    producedType = uniqueTypes.get(0);
+  }
+
+  @Override
+  public Reader<T> createReader(Configuration config, IcebergSourceSplit split) throws IOException {
+    return new ReaderAdaptor<T>(bulkFormatProvider, config, split, false);
+  }
+
+  @Override
+  public Reader<T> restoreReader(Configuration config, IcebergSourceSplit split) throws IOException {
+    return new ReaderAdaptor<T>(bulkFormatProvider, config, split, true);
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+
+  @Override
+  public TypeInformation<T> getProducedType() {
+    return producedType;
+  }
+
+  private static final class ReaderAdaptor<T> implements BulkFormat.Reader<T> {
+
+    private final Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider;
+    private final Configuration config;
+    private final Iterator<FileScanTask> fileIterator;
+    private final boolean isRestored;
+
+    // file offset in CombinedScanTask
+    private int fileOffset = -1;
+    private Reader<T> currentReader;
+
+    ReaderAdaptor(
+        Map<FileFormat, BulkFormat<T, FileSourceSplit>> bulkFormatProvider,
+        Configuration config,
+        IcebergSourceSplit icebergSplit,
+        boolean isRestored) throws IOException {
+      this.config = config;
+      this.bulkFormatProvider = bulkFormatProvider;
+      this.fileIterator = icebergSplit.task().files().iterator();
+      this.isRestored = isRestored;
+
+      final CheckpointedPosition position = icebergSplit.checkpointedPosition();
+      if (position != null) {
+        // skip files based on offset in checkpointed position
+        Preconditions.checkArgument(position.getOffset() < icebergSplit.task().files().size(),
+            String.format("Checkpointed file offset is %d, while CombinedScanTask has %d files",
+                position.getOffset(), icebergSplit.task().files().size()));
+        for (int i = 0; i < position.getOffset(); ++i) {
+          fileIterator.next();
+          fileOffset++;
+        }
+        // first file may need to skip records
+        setupReader(position.getRecordsAfterOffset());
+      } else {
+        setupReader(0L);
+      }
+    }
+
+    /**
+     * TODO: we can't return RecordIterator with empty data.
+     * Otherwise, caller may assume it is end of input.
+     * We probably need to add a {@code hasNext()} API to
+     * {@link RecordIterator} to achieve the goal.
+     */
+    @Nullable
+    @Override
+    public RecordIterator<T> readBatch() throws IOException {
+      RecordIterator<T> result = null;
+      while (currentReader != null || fileIterator.hasNext()) {
+        if (currentReader == null) {
+          setupReader(0L);
+        } else {
+          result = currentReader.readBatch();
+          if (result != null) {
+            break;
+          } else {
+            closeCurrentReader();
+          }
+        }
+      }
+      if (result == null) {
+        return null;
+      } else {
+        return new RecordIteratorAdaptor(fileOffset, result);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      closeCurrentReader();
+    }
+
+    private void closeCurrentReader() throws IOException {
+      if (currentReader != null) {
+        currentReader.close();
+        currentReader = null;
+      }
+    }
+
+    private void setupReader(long skipRecordCount) throws IOException {
+      if (fileIterator.hasNext()) {
+        final FileScanTask fileScanTask = fileIterator.next();
+        final FileFormat fileFormat = fileScanTask.file().format();
+        if (!bulkFormatProvider.containsKey(fileFormat)) {
+          throw new IOException("Unsupported file format: " + fileFormat);
+        }
+        final BulkFormat<T, FileSourceSplit> bulkFormat = bulkFormatProvider.get(fileFormat);
+        fileOffset++;
+        final FileSourceSplit fileSourceSplit = new FileSourceSplit(
+            "",
+            new Path(URI.create(fileScanTask.file().path().toString())),

Review comment:
       We extended from `FileSourceSplit` mainly for the `BulkFormat` batch reader interface, so that we can plug in vectorized readers from Flink. I am also debating whether this is the right thing to do, as mentioned in the original description.
   
   But this is not really relevant to FileIO, which deals with the underlying file system (like S3). Here we are mainly talking about file format readers (like Parquet and ORC).






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r742093418



##########
File path: flink/build.gradle
##########
@@ -28,6 +28,7 @@ project(':iceberg-flink') {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       @openinx, the tests run against the iceberg-flink module. They aren't present in the 1.12 or 1.13 modules. If you want them to be run for those modules, you'd need to add the source folder like you do for `src/main/java`. If you choose to do that, let's also remove CI for the common module since we don't need to run the tests outside of 1.12 and 1.13 if they are run in those modules.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r705907404



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -27,23 +27,39 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class FlinkSplitGenerator {
+public class FlinkSplitGenerator {
   private FlinkSplitGenerator() {
   }
 
   static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
-    List<CombinedScanTask> tasks = tasks(table, context);
-    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
+    try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
+      List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
+      FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
+      for (int i = 0; i < tasks.size(); i++) {
+        splits[i] = new FlinkInputSplit(i, tasks.get(i));
+      }
+      return splits;
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to process tasks iterable", e);
+    }
+  }
+
+  public static List<IcebergSourceSplit> planIcebergSourceSplits(

Review comment:
       Actually, I think we should rename `createInputSplits` to `planFlinkInputSplit`. We are not creating splits out of nowhere. Both methods just discover/plan splits from the table by calling the same `planTasks`. I can add some Javadoc on the new `planIcebergSourceSplits` method.
   
   Since `createInputSplits` is non-public, it should be safe to rename.
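   
   As a rough sketch of the point that both methods only plan splits from the same task source, the example below uses hypothetical stand-ins (`PlanSketch`, `PlannedTasks`, `planLegacySplits`, `planSourceSplits`) with strings in place of `CombinedScanTask`. It assumes nothing about the real `FlinkSplitGenerator` beyond what the diff shows: plan once, consume inside try-with-resources, and wrap each task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; strings stand in for scan tasks and splits.
public class PlanSketch {

  /** Stand-in for a closeable iterable of planned tasks. */
  static final class PlannedTasks implements Iterable<String>, AutoCloseable {
    private final List<String> tasks;
    boolean closed = false;

    PlannedTasks(List<String> tasks) {
      this.tasks = tasks;
    }

    @Override
    public Iterator<String> iterator() {
      return tasks.iterator();
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  /** The single planning entry point shared by both split flavors. */
  static PlannedTasks planTasks(List<String> tableFiles) {
    return new PlannedTasks(new ArrayList<>(tableFiles));
  }

  /** Array-based splits, in the style of the older input-format path. */
  static String[] planLegacySplits(List<String> tableFiles) {
    try (PlannedTasks tasks = planTasks(tableFiles)) {
      List<String> splits = new ArrayList<>();
      int index = 0;
      for (String task : tasks) {
        splits.add("input-split-" + index + ":" + task);
        index++;
      }
      return splits.toArray(new String[0]);
    }
  }

  /** List-based splits, in the style of the new source path. */
  static List<String> planSourceSplits(List<String> tableFiles) {
    try (PlannedTasks tasks = planTasks(tableFiles)) {
      List<String> splits = new ArrayList<>();
      for (String task : tasks) {
        splits.add("source-split:" + task);
      }
      return splits;
    }
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList("f1", "f2");
    System.out.println(Arrays.toString(planLegacySplits(files)));
    System.out.println(planSourceSplits(files));
  }
}
```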






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r716343043



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/DataIteratorBatcher.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.io.CloseableIterator;
+
+@FunctionalInterface

Review comment:
       will add






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r667701790



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/DataIteratorReaderFactory.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.CloseableIterator;
+
+public abstract class DataIteratorReaderFactory<T> implements ReaderFactory<T> {
+
+  private final Configuration config;
+  private final DataIteratorBatcher<T> batcher;
+
+  public DataIteratorReaderFactory(Configuration config, DataIteratorBatcher<T> batcher) {

Review comment:
       @sundargates I introduced this Batcher interface. `RowDataIterator` reuses the `RowData` object, so we need to use an array pool for the reader and clone each record into it. The Avro iterator doesn't reuse objects, so there is no need for an array pool or for cloning. A different Batcher can be plugged in for Avro records.
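   
   The object-reuse problem behind this can be shown with a minimal sketch. All names here (`BatcherSketch`, `MutableRow`, `reusingIterator`, and so on) are hypothetical stand-ins, not classes from this PR, and a plain per-record copy is used in place of the pooled arrays to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; not the PR's batcher implementation.
public class BatcherSketch {

  /** A mutable record, standing in for a reused row object. */
  static final class MutableRow {
    int value;

    MutableRow copy() {
      MutableRow row = new MutableRow();
      row.value = this.value;
      return row;
    }
  }

  /** Hands out the same instance on every call, like a reader that reuses objects. */
  static Iterator<MutableRow> reusingIterator(int... values) {
    MutableRow shared = new MutableRow();
    return new Iterator<MutableRow>() {
      private int index = 0;

      @Override
      public boolean hasNext() {
        return index < values.length;
      }

      @Override
      public MutableRow next() {
        shared.value = values[index++];
        return shared;
      }
    };
  }

  /** Batches references as-is: only safe when the iterator does not reuse objects. */
  static List<MutableRow> batchWithoutCopy(Iterator<MutableRow> rows) {
    List<MutableRow> batch = new ArrayList<>();
    rows.forEachRemaining(batch::add);
    return batch;
  }

  /** Copies each record into the batch: needed when the iterator reuses objects. */
  static List<MutableRow> batchWithCopy(Iterator<MutableRow> rows) {
    List<MutableRow> batch = new ArrayList<>();
    rows.forEachRemaining(row -> batch.add(row.copy()));
    return batch;
  }

  static int[] values(List<MutableRow> batch) {
    return batch.stream().mapToInt(row -> row.value).toArray();
  }

  public static void main(String[] args) {
    // Every element points at the same object, so only the last value survives.
    System.out.println(Arrays.toString(values(batchWithoutCopy(reusingIterator(1, 2, 3))))); // [3, 3, 3]
    // Copying preserves each record.
    System.out.println(Arrays.toString(values(batchWithCopy(reusingIterator(1, 2, 3))))); // [1, 2, 3]
  }
}
```

   A batcher for an iterator that returns a fresh object per record could use the cheaper no-copy path.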






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r661609683



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+public class IcebergSourceSplit extends FileSourceSplit {
+
+  public enum Status {

Review comment:
       Makes sense. Will move it out.






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739570507



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,16 +42,45 @@
   private final FileScanTaskReader<T> fileScanTaskReader;
 
   private final InputFilesDecryptor inputFilesDecryptor;
+  private final CombinedScanTask combinedTask;
+
   private Iterator<FileScanTask> tasks;
   private CloseableIterator<T> currentIterator;
+  private Position position;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
 
     this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
+    this.combinedTask = task;
+
     this.tasks = task.files().iterator();
     this.currentIterator = CloseableIterator.empty();
+    // fileOffset starts at -1 because we started
+    // from an empty iterator that is not from the split files.
+    this.position = new Position(-1L, 0L);
+  }
+
+  public void seek(Position startingPosition) {
+    // skip files
+    Preconditions.checkArgument(startingPosition.fileOffset() < combinedTask.files().size(),
+        "Checkpointed file offset is %d, while CombinedScanTask has %d files",
+        startingPosition.fileOffset(), combinedTask.files().size());
+    for (long i = 0L; i < startingPosition.fileOffset(); ++i) {

Review comment:
       Is `fileOffset()` a `long`? That seems odd to me. When would you need to address more than 2 billion files in a single combined scan task?
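
A minimal sketch of what this suggestion could look like, assuming a simplified position holder (`PositionSketch` and its methods are hypothetical names, not the PR's `Position` class). `Collection.size()` returns an `int`, so a file index within one combined task always fits in an `int`, while the per-file record count can stay a `long`:

```java
// Hypothetical stand-in for the PR's Position class. The int file offset
// follows the review suggestion; it is not the code as posted in the PR.
final class PositionSketch {
  private int fileOffset;     // index of the file task inside the combined task
  private long recordOffset;  // number of records already read from that file

  PositionSketch(int fileOffset, long recordOffset) {
    this.fileOffset = fileOffset;
    this.recordOffset = recordOffset;
  }

  int fileOffset() {
    return fileOffset;
  }

  long recordOffset() {
    return recordOffset;
  }

  // Moving to the next file resets the per-file record count.
  void advanceFile() {
    fileOffset += 1;
    recordOffset = 0L;
  }

  void advanceRecord() {
    recordOffset += 1L;
  }
}
```

With an `int` file offset, the loop that skips files can use an `int` counter as well, and the precondition can compare against `files().size()` without widening.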






[GitHub] [iceberg] openinx commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r740147179



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,18 +42,47 @@
   private final FileScanTaskReader<T> fileScanTaskReader;
 
   private final InputFilesDecryptor inputFilesDecryptor;
-  private Iterator<FileScanTask> tasks;
+  private final CombinedScanTask combinedTask;
+  private final Position position;
+
+  private Iterator<FileScanTask> fileTasksIterator;
   private CloseableIterator<T> currentIterator;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
 
     this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
-    this.tasks = task.files().iterator();
+    this.combinedTask = task;
+    // fileOffset starts at -1 because we started
+    // from an empty iterator that is not from the split files.
+    this.position = new Position(-1, 0L);

Review comment:
       The general `DataIterator` doesn't use the position or the `seek` method to skip tasks or records. Putting all the FLIP-27-related logic in the common Flink read path does not make sense to me, because every time I read this class I need to work out which parts are related to FLIP-27 and which are not.
   
   I suggest introducing a separate `SeekableDataIterator` to isolate the two code paths. I made a simple commit for this, which you may want to take a look at: https://github.com/openinx/incubator-iceberg/commit/b08dde86aae0c718d9d72acb347dffb3a836b336
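
To make the idea concrete, here is a rough sketch of such a split. It is not the content of the linked commit; all class and method names are hypothetical, and each inner list stands in for the records of one file task. The plain iterator knows nothing about positions, and only the subclass tracks them and supports `seek`:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical simplification: each inner list stands in for one file task.
class PlainIteratorSketch<T> implements Iterator<T> {
  private final Iterator<List<T>> files;
  private Iterator<T> current = Collections.emptyIterator();

  PlainIteratorSketch(List<List<T>> files) {
    this.files = files.iterator();
  }

  @Override
  public boolean hasNext() {
    // Roll over to the next file until one with remaining records is found.
    while (!current.hasNext() && hasMoreFiles()) {
      openNextFile();
    }
    return current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }

  protected boolean hasMoreFiles() {
    return files.hasNext();
  }

  // Replaces the current file with the next one; callers check hasMoreFiles() first.
  protected void openNextFile() {
    current = files.next().iterator();
  }
}

// Position tracking and seek live only here, so the plain read path stays untouched.
final class SeekableIteratorSketch<T> extends PlainIteratorSketch<T> {
  private int fileOffset = -1;   // -1 until the first file is opened
  private long recordOffset = 0L;

  SeekableIteratorSketch(List<List<T>> files) {
    super(files);
  }

  int fileOffset() {
    return fileOffset;
  }

  long recordOffset() {
    return recordOffset;
  }

  @Override
  protected void openNextFile() {
    super.openNextFile();
    fileOffset += 1;
    recordOffset = 0L;
  }

  @Override
  public T next() {
    T record = super.next();
    recordOffset += 1L;
    return record;
  }

  // Must be called on a fresh iterator, before any record has been read.
  void seek(int startFile, long startRecord) {
    for (int i = 0; i <= startFile; i++) {
      if (!hasMoreFiles()) {
        throw new IllegalArgumentException("File offset out of range: " + startFile);
      }
      openNextFile(); // earlier files are dropped without reading their records
    }
    for (long i = 0L; i < startRecord; i++) {
      if (!hasNext() || fileOffset != startFile) {
        throw new IllegalStateException("Not enough records to skip: " + startRecord);
      }
      next();
    }
  }
}
```

The trade-off in this sketch is two small extension points (`hasMoreFiles` and `openNextFile`) on the plain iterator in exchange for keeping every position-related field out of it.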






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739950191



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,16 +42,45 @@
   private final FileScanTaskReader<T> fileScanTaskReader;
 
   private final InputFilesDecryptor inputFilesDecryptor;
+  private final CombinedScanTask combinedTask;
+
   private Iterator<FileScanTask> tasks;
   private CloseableIterator<T> currentIterator;
+  private Position position;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
 
     this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
+    this.combinedTask = task;
+
     this.tasks = task.files().iterator();
     this.currentIterator = CloseableIterator.empty();
+    // fileOffset starts at -1 because we started
+    // from an empty iterator that is not from the split files.
+    this.position = new Position(-1L, 0L);
+  }
+
+  public void seek(Position startingPosition) {
+    // skip files
+    Preconditions.checkArgument(startingPosition.fileOffset() < combinedTask.files().size(),
+        "Checkpointed file offset is %d, while CombinedScanTask has %d files",
+        startingPosition.fileOffset(), combinedTask.files().size());
+    for (long i = 0L; i < startingPosition.fileOffset(); ++i) {
+      tasks.next();
+    }
+    updateCurrentIterator();
+    // skip records within the file
+    for (long i = 0; i < startingPosition.recordOffset(); ++i) {
+      if (hasNext()) {
+        next();
+      } else {
+        throw new IllegalStateException("Not enough records to skip: " +
+            startingPosition.recordOffset());
+      }
+    }
+    this.position.update(startingPosition.fileOffset(), startingPosition.recordOffset());

Review comment:
       yes, will change `Position` to final






[GitHub] [iceberg] openinx commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r740086825



##########
File path: flink/build.gradle
##########
@@ -28,6 +28,7 @@ project(':iceberg-flink') {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       It's strange that the builds for Flink 1.12 and Flink 1.13 passed, because I don't see the same dependency added to the Flink 1.12 and 1.13 build.gradle files. Maybe I need to check the 1.12 build.gradle again.






[GitHub] [iceberg] rdblue commented on pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#issuecomment-939561484


   @stevenzwu, I'll try to review this in the next week. Thank you!




[GitHub] [iceberg] openinx edited a comment on pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx edited a comment on pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#issuecomment-795362250


   Actually, I did not fully understand the whole PR (https://github.com/apache/iceberg/pull/2105) when reviewing this separate PR. I think I will need more time to understand all of the code first.




[GitHub] [iceberg] openinx commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r704912925



##########
File path: build.gradle
##########
@@ -320,6 +321,7 @@ project(':iceberg-flink') {
     compile project(':iceberg-parquet')
     compile project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       Are the interfaces (such as `SplitReader`, `RecordsWithSplitIds`, `SourceReaderOptions`) from this jar stable enough to expose to downstream projects like Iceberg? I raise this question because I don't see them marked with a `Public` annotation, so they don't seem to have any compatibility guarantee.






[GitHub] [iceberg] stevenzwu edited a comment on pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu edited a comment on pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#issuecomment-868805669


   @holdenk thanks a lot for taking a look. I will rebase it after the decision on the vectorized readers below.
   
   @rdblue @JingsongLi please help take a look and move the new FLIP-27 based Iceberg source forward if you can. It is part 3 of the uber PR #2105.
   
   Right now, there is a pending decision. Currently, this PR is based on the premise of reusing the `BulkFormat` from Flink for vectorized readers (for Parquet, Orc etc.), originally suggested by @JingsongLi. I am rethinking that choice. It is unlikely that Flink's (vectorized) readers will support delete filters the way Iceberg readers do. Maybe the iceberg-flink module needs its own vectorized readers to support deletes. @zhangjun0x01 already submitted PR #2566 for Orc. If so, this PR needs to be adjusted to break away from the Flink file source.
   
   




[GitHub] [iceberg] stevenzwu commented on pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#issuecomment-840902665


   @openinx if we are committed to having the vectorized reader in `iceberg-flink` that @zhangjun0x01 is working on (PR #2566), then I will update this PR and drop the dependency on, and extension of, `FileSourceSplit` and `BulkFormat`. The only reason I did that was to reuse the vectorized readers from Flink.
   
   I am also wondering whether the vectorized readers support delete filtering. I know Flink's vectorized reader implementation won't support it, as deletes are an Iceberg concept.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#issuecomment-872291815


   @openinx @sundargates @holdenk @tweise Now this sub PR is ready for review. 
   
   I have changed the code so that it no longer extends `FileSourceSplit` and `BulkFormat`, as we are aligned that the Iceberg source reader probably can't reuse the vectorized readers from Flink. The main reason is that the upcoming Iceberg V2 format supports deletes, a concept that applies to Iceberg (not to raw file formats like Orc, Parquet etc.). Hence we can't reuse Flink's vectorized readers with delete filters.
   
   I also moved some unrelated refactoring out of this PR. 
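
To illustrate the point about deletes: a reader that only understands the file format returns every row stored in a data file, while row-level deletes are recorded separately by the table format and have to be applied on top. The sketch below is a simplified, hypothetical model of position-based deletes (`PositionDeleteSketch` and `liveRows` are illustrative names; this is not Iceberg's delete-filter code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class PositionDeleteSketch {
  private PositionDeleteSketch() {
  }

  // rows: every row a raw file reader would return, in file order.
  // deletedPositions: 0-based row ordinals that the table marks as deleted.
  static <T> List<T> liveRows(List<T> rows, Set<Long> deletedPositions) {
    List<T> live = new ArrayList<>();
    for (int pos = 0; pos < rows.size(); pos++) {
      if (!deletedPositions.contains((long) pos)) {
        live.add(rows.get(pos));
      }
    }
    return live;
  }
}
```

A reader that has no access to `deletedPositions` would emit the deleted rows as well, which is why the filtering has to live in the Iceberg read path.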




[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r666430823



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFactory.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public interface ReaderFactory<T> extends Serializable {
+
+  /**
+   * An iterator over records with their position in the file. The iterator is closeable to
+   * support clean resource release and recycling.
+   *
+   * @param <T> The type of the record.
+   */
+  interface RecordIterator<T> {
+
+    /**
+     * Gets the next record from the file, together with its position.
+     *
+     * <p>The position information returned with the record point to the record AFTER the
+     * returned record, because it defines the point where the reading should resume once the
+     * current record is emitted. The position information is put in the source's state when the
+     * record is emitted. If a checkpoint is taken directly after the record is emitted, the
+     * checkpoint must to describe where to resume the source reading from after that record.
+     *
+     * <p>Objects returned by this method may be reused by the iterator. By the time that this
+     * method is called again, no object returned from the previous call will be referenced any
+     * more. That makes it possible to have a single {@link MutableRecordAndPosition} object and
+     * return the same instance (with updated record and position) on every call.
+     */
+    @Nullable
+    RecordAndPosition<T> next();
+
+    /**
+     * Releases the batch that this iterator iterated over. This is not supposed to close the
+     * reader and its resources, but is simply a signal that this iterator is no used any more.
+     * This method can be used as a hook to recycle/reuse heavyweight object structures.
+     */
+    void releaseBatch();
+  }
+
+  /**
+   * A batch reader for a {@link IcebergSourceSplit}
+   *
+   * @param <T> output record type
+   */
+  interface Reader<T> {

Review comment:
       I would need `FileRecords`, as it is an implementation of the `RecordsWithSplitIds` interface
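
For context, the `RecordIterator#next` contract quoted in the diff above (the returned position points to the record after the one returned, and the returned object may be reused) can be sketched as follows. This is a simplified illustration with hypothetical stand-in types (`RecordAndPositionSketch`, `ReusingRecordIteratorSketch`), not Flink's `RecordAndPosition` or `FileRecords` classes:

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a reusable record-and-position holder.
final class RecordAndPositionSketch<T> {
  private T record;
  private long recordsToSkip; // records to skip when resuming from a checkpoint

  T record() {
    return record;
  }

  long recordsToSkip() {
    return recordsToSkip;
  }

  void set(T newRecord, long newRecordsToSkip) {
    this.record = newRecord;
    this.recordsToSkip = newRecordsToSkip;
  }
}

final class ReusingRecordIteratorSketch<T> {
  private final Iterator<T> records;
  private final RecordAndPositionSketch<T> reused = new RecordAndPositionSketch<>();
  private long emitted = 0L;

  ReusingRecordIteratorSketch(List<T> batch) {
    this.records = batch.iterator();
  }

  // Returns null once the batch is exhausted.
  RecordAndPositionSketch<T> next() {
    if (!records.hasNext()) {
      return null;
    }
    T record = records.next();
    emitted += 1L;
    // The position describes where to resume AFTER this record has been emitted.
    reused.set(record, emitted);
    return reused;
  }
}
```

Because the same holder instance is handed out on every call, a caller that needs to keep a record beyond the next call has to copy it first.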






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739953087



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
##########
@@ -22,33 +22,58 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class FlinkSplitGenerator {
-  private FlinkSplitGenerator() {
+@Internal
+public class FlinkSplitPlanner {
+  private FlinkSplitPlanner() {
   }
 
-  static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
-    List<CombinedScanTask> tasks = tasks(table, context);
-    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
+  static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {

Review comment:
       `create/generate` implies creating something new. This actually plans/discovers splits from the table, hence the changed method name. I also renamed the class from `FlinkSplitGenerator` to `FlinkSplitPlanner`. This is an internal class, so it shouldn't break user code.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739954541



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
##########
@@ -140,7 +140,7 @@ private void monitorAndForwardSplits() {
         newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
       }
 
-      FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext);
+      FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext);

Review comment:
       `FlinkSplitGenerator`/`FlinkSplitPlanner` is an internal class. It doesn't affect the public API for user code.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739371220



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Integer> SOURCE_READER_FETCH_RECORD_BATCH_SIZE = ConfigOptions
+      .key("source.iceberg.reader.fetch-record-batch-size")

Review comment:
       I didn't use `table.exec`, as this config isn't about table/SQL execution behavior. This is the Iceberg source behavior (DataStream API or SQL).
   
   I checked the two FLIP-27 source impls (Kafka and file) in Flink repo. 
   
   - Kafka source option doesn't contain any prefix, e.g. "partition.discovery.interval.ms"
   - file source does contain a prefix. e.g. "source.file.records.fetch-size"
   
   This is following the file source convention.
   
   @openinx any suggestion?






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739374036



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -37,20 +38,46 @@
  */
 @Internal
 public class DataIterator<T> implements CloseableIterator<T> {
-

Review comment:
       Sure. This was done to match the code style in Iceberg (no empty line between the class definition and the next code line), but I will revert it to avoid conflicts.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739931366



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_SIZE = ConfigOptions
+      .key("source.iceberg.reader.fetch-batch-size")
+      .intType()
+      .defaultValue(2048)

Review comment:
       updated description with `The target number of records ...`






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r716343479



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/DataIteratorReaderFunction.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.CloseableIterator;
+
+public abstract class DataIteratorReaderFunction<T> implements ReaderFunction<T> {

Review comment:
       Will add Javadoc. In this PR, there is only one implementation, `RowDataReaderFunction`, but the class can also be extended by an `AvroGenericRecordReaderFunction` that reads Parquet files directly into Avro GenericRecord. That is why this is left as `public`.






[GitHub] [iceberg] tweise commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r714388247



##########
File path: build.gradle
##########
@@ -320,6 +321,7 @@ project(':iceberg-flink') {
     compile project(':iceberg-parquet')
     compile project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       It would be too restrictive to only depend on `@Public` interfaces. FLIP-27 connectors are becoming the default and these interfaces should mature soon though. The Iceberg project will need a story to support multiple Flink versions at some point.






[GitHub] [iceberg] kbendick commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r736866975



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_SIZE = ConfigOptions
+      .key("source.iceberg.reader.fetch-batch-size")
+      .intType()
+      .defaultValue(2048)

Review comment:
       +1 to specifying the unit in the description, and also to keeping the default value the same as upstream until we get a handle on what is a good value.
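
A minimal sketch of what "specifying the unit in the description" could look like. `IntOptionSketch` and `ReaderOptionsSketch` are hypothetical stand-ins, not Flink's `ConfigOption` API, and the unit ("records") is an assumption based on the default being copied from a column-batch size; only the key and the default value come from the diff.

```java
import java.util.Objects;

/** Hypothetical stand-in for a typed option; this is not Flink's ConfigOption. */
final class IntOptionSketch {
  private final String key;
  private final int defaultValue;
  private final String description;

  IntOptionSketch(String key, int defaultValue, String description) {
    this.key = Objects.requireNonNull(key, "key");
    this.defaultValue = defaultValue;
    this.description = Objects.requireNonNull(description, "description");
  }

  String key() {
    return key;
  }

  int defaultValue() {
    return defaultValue;
  }

  String description() {
    return description;
  }
}

final class ReaderOptionsSketch {
  // Key and default come from the diff under review; the unit wording is an assumption.
  static final IntOptionSketch FETCH_BATCH_SIZE = new IntOptionSketch(
      "source.iceberg.reader.fetch-batch-size",
      2048,
      "Maximum number of records the source reader fetches in one batch.");

  private ReaderOptionsSketch() {
  }
}
```

With the unit in the description, a reader of the option no longer has to guess whether 2048 means records, bytes, or files.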






[GitHub] [iceberg] stevenzwu edited a comment on pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu edited a comment on pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#issuecomment-868805669


   @holdenk thx a lot for taking a look. will rebase it after the decision on the vectorized readers below.
   
   @rdblue @JingsongLi please help take a look and move the new FLIP-27 based Iceberg source forward if you can. It is part 3 of the uber PR #2105.
   
   Right now, there is a pending decision before this PR can be reviewed. Currently, this PR is based on the premise of reusing the BulkFormat from Flink for vectorized readers (for Parquet, ORC, etc.), originally suggested by @JingsongLi. I am rethinking that choice. It is unlikely that Flink's (vectorized) readers will support delete filters like Iceberg readers do. Maybe the iceberg-flink module needs to have its own vectorized readers to support deletes. @zhangjun0x01 already submitted PR #2566 for ORC. If so, this PR needs to be adjusted to break away from the Flink file source.
   
   




[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r659029992



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -23,9 +23,9 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
-public class FlinkTableOptions {
+public class FlinkConfigOptions {

Review comment:
       https://github.com/apache/iceberg/pull/2742






[GitHub] [iceberg] openinx commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r704925686



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -27,23 +27,39 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class FlinkSplitGenerator {
+public class FlinkSplitGenerator {
   private FlinkSplitGenerator() {
   }
 
   static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
-    List<CombinedScanTask> tasks = tasks(table, context);
-    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
+    try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
+      List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
+      FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
+      for (int i = 0; i < tasks.size(); i++) {
+        splits[i] = new FlinkInputSplit(i, tasks.get(i));
+      }
+      return splits;
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to process tasks iterable", e);
+    }
+  }
+
+  public static List<IcebergSourceSplit> planIcebergSourceSplits(

Review comment:
       Should we add a Javadoc (or choose a clearer name) to explain why we need an extra `planIcebergSourceSplits` in addition to `createInputSplits`? It's not easy to tell the difference between the names 'InputSplits' and 'IcebergSourceSplits'. I think it's used for implementing FLIP-27's SourceSplit, right?






[GitHub] [iceberg] sundargates commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
sundargates commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r663150205



##########
File path: build.gradle
##########
@@ -140,10 +140,11 @@ subprojects {
     def buildLog = new File(logFile)
     addTestOutputListener(new TestOutputListener() {
       def lastDescriptor
+
       @Override
       void onOutput(TestDescriptor testDescriptor, TestOutputEvent testOutputEvent) {
         if (lastDescriptor != testDescriptor) {
-          buildLog << "--------\n- Test log for: "<< testDescriptor << "\n--------\n"
+          buildLog << "--------\n- Test log for: " << testDescriptor << "\n--------\n"

Review comment:
       looks like extraneous changes unrelated to the diff?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_SIZE = ConfigOptions
+      .key("source.iceberg.reader.fetch-batch-size")
+      .intType()
+      .defaultValue(2048)

Review comment:
       nit: seems rather large. 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFactory.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public interface ReaderFactory<T> extends Serializable {
+
+  /**
+   * An iterator over records with their position in the file. The iterator is closeable to
+   * support clean resource release and recycling.
+   *
+   * @param <T> The type of the record.
+   */
+  interface RecordIterator<T> {
+
+    /**
+     * Gets the next record from the file, together with its position.
+     *
+     * <p>The position information returned with the record point to the record AFTER the
+     * returned record, because it defines the point where the reading should resume once the
+     * current record is emitted. The position information is put in the source's state when the
+     * record is emitted. If a checkpoint is taken directly after the record is emitted, the
+     * checkpoint must to describe where to resume the source reading from after that record.
+     *
+     * <p>Objects returned by this method may be reused by the iterator. By the time that this
+     * method is called again, no object returned from the previous call will be referenced any
+     * more. That makes it possible to have a single {@link MutableRecordAndPosition} object and
+     * return the same instance (with updated record and position) on every call.
+     */
+    @Nullable
+    RecordAndPosition<T> next();
+
+    /**
+     * Releases the batch that this iterator iterated over. This is not supposed to close the
+     * reader and its resources, but is simply a signal that this iterator is no used any more.
+     * This method can be used as a hook to recycle/reuse heavyweight object structures.
+     */
+    void releaseBatch();
+  }
+
+  /**
+   * A batch reader for a {@link IcebergSourceSplit}
+   *
+   * @param <T> output record type
+   */
+  interface Reader<T> {
+
+    /**
+     * Reads one batch. The method should return null when reaching the end of the input. The
+     * returned batch will be handed over to the processing threads as one.
+     *
+     * <p>The returned iterator object and any contained objects may be held onto by the
+     * source for some time, so it should not be immediately reused by the reader.
+     *
+     * <p>To implement reuse and to save object allocation, consider using a {@link
+     * org.apache.flink.connector.file.src.util.Pool} and recycle objects into the Pool in the
+     * the {@link RecordIterator#releaseBatch()} method.
+     */
+    @Nullable
+    RecordIterator<T> readBatch() throws IOException;
+
+    /**
+     * Closes the reader and release all resources
+     */
+    void close() throws IOException;
+  }
+
+
+  /**
+   * Create a batch reader for the input split
+   *
+   * @param config Flink configuration
+   * @param split  Iceberg source split
+   * @return a batch reader
+   */
+  Reader<T> create(Configuration config, IcebergSourceSplit split);

Review comment:
       Why is the configuration passed per split? Wouldn't it be the same for the full table? If so, should it be passed to the constructor as a property of the implementation rather than per split?
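
A minimal sketch of the constructor-based alternative, assuming the configuration really is the same for every split. All types here (`SplitSketch`, `SplitReaderSketch`, `ConfiguredReaderFactorySketch`) are hypothetical stand-ins, and a single `int` stands in for the Flink `Configuration`.

```java
import java.io.Serializable;

/** Hypothetical stand-in for IcebergSourceSplit. */
final class SplitSketch {
  private final String id;

  SplitSketch(String id) {
    this.id = id;
  }

  String id() {
    return id;
  }
}

/** Hypothetical reader that only records what it was created with. */
final class SplitReaderSketch {
  private final String splitId;
  private final int batchSize;

  SplitReaderSketch(String splitId, int batchSize) {
    this.splitId = splitId;
    this.batchSize = batchSize;
  }

  String splitId() {
    return splitId;
  }

  int batchSize() {
    return batchSize;
  }
}

/** Factory that receives table-wide settings once, so create() only needs the split. */
final class ConfiguredReaderFactorySketch implements Serializable {
  private final int batchSize;

  ConfiguredReaderFactorySketch(int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
    }
    this.batchSize = batchSize;
  }

  SplitReaderSketch create(SplitSketch split) {
    // Every reader sees the same setting; callers cannot pass a different one per split.
    return new SplitReaderSketch(split.id(), batchSize);
  }
}
```

One consequence of this shape is that whatever the factory holds must itself be serializable, since the factory is shipped to the readers.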

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/FileRecords.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Collections;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+public class FileRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {
+
+  @Nullable
+  private final ReaderFactory.RecordIterator recordsForSplit;
+  private final Set<String> finishedSplits;
+
+  @Nullable
+  private String splitId;
+  @Nullable
+  private ReaderFactory.RecordIterator recordsForSplitCurrent;
+
+  private FileRecords(
+      @Nullable String splitId,
+      @Nullable ReaderFactory.RecordIterator recordsForSplit,

Review comment:
       Same as above.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -114,4 +145,43 @@ public void close() throws IOException {
     currentIterator.close();
     tasks = null;
   }
+
+  public Position position() {

Review comment:
       It appears that you are using CheckpointedPosition DS to communicate the position that the iterator has to seek to from outside. However, in order to communicate the current position to the outside world, you are using the internal Position DS. Wondering if we can keep this consistent to be either CheckpointedPosition or the mutable Position?
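
A minimal sketch of the "one position type in both directions" idea, assuming a position is a file index within the split plus a count of records already read from that file. `PositionSketch` and `SeekableIteratorSketch` are hypothetical names, and lists of strings stand in for data files.

```java
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical immutable position: file index in the split and records already read from that file. */
final class PositionSketch {
  final int fileOffset;
  final int recordOffset;

  PositionSketch(int fileOffset, int recordOffset) {
    this.fileOffset = fileOffset;
    this.recordOffset = recordOffset;
  }
}

/** Iterates over "files" (lists of records) and uses PositionSketch for both seek() and position(). */
final class SeekableIteratorSketch {
  private final List<List<String>> files;
  private int fileOffset = 0;
  private int recordOffset = 0;

  SeekableIteratorSketch(List<List<String>> files) {
    this.files = files;
  }

  /** Resumes reading at a position previously returned by position(). */
  void seek(PositionSketch position) {
    this.fileOffset = position.fileOffset;
    this.recordOffset = position.recordOffset;
  }

  boolean hasNext() {
    // Skip files that are exhausted or empty.
    while (fileOffset < files.size() && recordOffset >= files.get(fileOffset).size()) {
      fileOffset++;
      recordOffset = 0;
    }
    return fileOffset < files.size();
  }

  String next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return files.get(fileOffset).get(recordOffset++);
  }

  /** Position AFTER the last returned record, i.e. where reading should resume. */
  PositionSketch position() {
    return new PositionSketch(fileOffset, recordOffset);
  }
}
```

Because the same immutable type is used for reporting and for seeking, a position captured at a checkpoint can be handed straight back to `seek` without conversion.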

##########
File path: build.gradle
##########
@@ -329,8 +330,17 @@ project(':iceberg-flink') {
       exclude group: 'org.apache.avro', module: 'avro'
     }
 
+    // flink-dist doesn't include these two modules (FLINK-20098).
+    // Hence we should add them as compile deps.
+    // This doesn't affect iceberg-flink-runtime shadow jar,
+    // since it excludes all flink jars.
+    compile "org.apache.flink:flink-connector-base"
+    compile "org.apache.flink:flink-connector-files"

Review comment:
       nit: is this still needed?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFactory.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public interface ReaderFactory<T> extends Serializable {
+
+  /**
+   * An iterator over records with their position in the file. The iterator is closeable to
+   * support clean resource release and recycling.
+   *
+   * @param <T> The type of the record.
+   */
+  interface RecordIterator<T> {

Review comment:
       I'm not sure we need this internal interface given RecordsWithSplitIds is pretty much a superset of this DS. 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -79,6 +83,27 @@ InputFile getInputFile(String location) {
     return inputFiles.get(location);
   }
 
+  public void seek(CheckpointedPosition checkpointedPosition)  {

Review comment:
       is there a need to make use of this DS given that BulkFormat is not going to be supported initially?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFactory.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public interface ReaderFactory<T> extends Serializable {

Review comment:
       I think it might be better to avoid this interface and replace it with an existing Java type such as `java.util.function.Function`, as that would reduce the number of new types a reader has to understand. If you want to keep this abstraction, then you can just define the interface as 
   ```
   public interface ReaderFactory<T> extends Function<IcebergSourceSplit, CloseableIterable<RecordsWithSplitIds<T>>> {}
   ```
   

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/FileRecords.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Collections;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+public class FileRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {
+
+  @Nullable
+  private final ReaderFactory.RecordIterator recordsForSplit;

Review comment:
       why is the RecordIterator type here not parameterized? 
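
A minimal sketch of what parameterizing the field buys, using hypothetical stand-ins (`RecordIteratorSketch`, `ListRecordIteratorSketch`, `RecordsHolderSketch`) rather than the classes in this PR.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for ReaderFactory.RecordIterator. */
interface RecordIteratorSketch<T> {
  /** Returns the next record, or null when the iterator is exhausted. */
  T next();
}

/** Simple list-backed implementation used only for illustration. */
final class ListRecordIteratorSketch<T> implements RecordIteratorSketch<T> {
  private final Iterator<T> delegate;

  ListRecordIteratorSketch(List<T> items) {
    this.delegate = items.iterator();
  }

  @Override
  public T next() {
    return delegate.hasNext() ? delegate.next() : null;
  }
}

/** Holder that keeps the type parameter on its field instead of using the raw type. */
final class RecordsHolderSketch<T> {
  private final RecordIteratorSketch<T> records;

  RecordsHolderSketch(RecordIteratorSketch<T> records) {
    this.records = records;
  }

  T nextRecord() {
    // With a raw `RecordIteratorSketch records` field, next() would return Object
    // and this line would need an unchecked cast to T.
    return records.next();
  }
}
```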

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataIteratorReaderFactory.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.data.RowDataUtil;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataIterator;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public class RowDataIteratorReaderFactory implements ReaderFactory<RowData> {

Review comment:
       Can we make this generic by asking the user to provide the factory for generating T (RowData in this specific case) from a given CombinedScanTask or make that abstract and have a default implementation for RowData that uses the DataIterator?
   
   ```
   abstract class BatchDataIteratorFactory<T> implements Function<FileScanTaskSplit, CloseableIterable<RecordsWithSplitIds<T>>> {
     protected abstract DataIterator<T> getIteratorFor(CombinedScanTask task);
   }
   ```

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFactory.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public interface ReaderFactory<T> extends Serializable {
+
+  /**
+   * An iterator over records with their position in the file. The iterator is closeable to
+   * support clean resource release and recycling.
+   *
+   * @param <T> The type of the record.
+   */
+  interface RecordIterator<T> {
+
+    /**
+     * Gets the next record from the file, together with its position.
+     *
+     * <p>The position information returned with the record point to the record AFTER the
+     * returned record, because it defines the point where the reading should resume once the
+     * current record is emitted. The position information is put in the source's state when the
+     * record is emitted. If a checkpoint is taken directly after the record is emitted, the
+     * checkpoint must to describe where to resume the source reading from after that record.
+     *
+     * <p>Objects returned by this method may be reused by the iterator. By the time that this
+     * method is called again, no object returned from the previous call will be referenced any
+     * more. That makes it possible to have a single {@link MutableRecordAndPosition} object and
+     * return the same instance (with updated record and position) on every call.
+     */
+    @Nullable
+    RecordAndPosition<T> next();
+
+    /**
+     * Releases the batch that this iterator iterated over. This is not supposed to close the
+     * reader and its resources, but is simply a signal that this iterator is no used any more.
+     * This method can be used as a hook to recycle/reuse heavyweight object structures.
+     */
+    void releaseBatch();
+  }
+
+  /**
+   * A batch reader for a {@link IcebergSourceSplit}
+   *
+   * @param <T> output record type
+   */
+  interface Reader<T> {

Review comment:
       Would it make sense to have the reader extend CloseableIterable<RecordsWithSplitIds<T>>? This way the intermediate DS called FileRecords can be completely avoided. A lot of the abstraction makes it slightly complex to read otherwise IMO. 
   
   ```
   interface Reader<T> extends CloseableIterable<RecordsWithSplitIds<T>> {
   }
   ```






[GitHub] [iceberg] openinx commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r704913666



##########
File path: build.gradle
##########
@@ -141,6 +141,7 @@ subprojects {
     def buildLog = new File(logFile)
     addTestOutputListener(new TestOutputListener() {
       def lastDescriptor
+

Review comment:
       Nit: please don't introduce unrelated changes, to avoid conflicts when people rebase their own repos.






[GitHub] [iceberg] tweise commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r714389058



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_SIZE = ConfigOptions
+      .key("source.iceberg.reader.fetch-batch-size")
+      .intType()
+      .defaultValue(2048)

Review comment:
       Or specify the unit in the description.






[GitHub] [iceberg] stevenzwu commented on pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#issuecomment-804636106


   @openinx I updated the PR with the DataIterator refactoring: it now uses composition instead of the inheritance used by the old RowDataIterator. Please take a look when you get a chance.
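
A rough sketch of the composition shape, not the code in this PR: the iterator receives a per-file reader function instead of requiring a subclass to override an abstract "open file" method. `ComposedDataIteratorSketch` is a hypothetical name, and strings stand in for file scan tasks.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Iterates all records of a list of "file tasks", delegating per-file reading to a function. */
final class ComposedDataIteratorSketch<T> implements Iterator<T> {
  private final Iterator<String> tasks;
  private final Function<String, Iterator<T>> fileReader;
  private Iterator<T> current = Collections.emptyIterator();

  ComposedDataIteratorSketch(List<String> tasks, Function<String, Iterator<T>> fileReader) {
    this.tasks = tasks.iterator();
    this.fileReader = fileReader;
  }

  @Override
  public boolean hasNext() {
    // Advance to the next task until one yields a record or no tasks remain.
    while (!current.hasNext() && tasks.hasNext()) {
      current = fileReader.apply(tasks.next());
    }
    return current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }
}
```

Supporting another record type then means passing a different reader function rather than adding another subclass.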




[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r666411366



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_SIZE = ConfigOptions
+      .key("source.iceberg.reader.fetch-batch-size")
+      .intType()
+      .defaultValue(2048)

Review comment:
       I copied this default value from Flink. I don't actually know what a good default value would be here. We probably need to test it and update accordingly.
   
   ```
   public class VectorizedColumnBatch implements Serializable {
       /**
        * This number is carefully chosen to minimize overhead and typically allows one
        * VectorizedColumnBatch to fit in cache.
        */
       public static final int DEFAULT_SIZE = 2048;
   ```






[GitHub] [iceberg] holdenk commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
holdenk commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r658931457



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -23,9 +23,9 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
-public class FlinkTableOptions {
+public class FlinkConfigOptions {

Review comment:
       This looks like an existing public API, so renaming it might cause some challenges, or at the very least need a note in the migration guide. Another option would be to keep a deprecated `FlinkTableOptions` that extends `FlinkConfigOptions` so as not to break compatibility.
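   
   A minimal sketch of that second option, assuming the `FlinkConfigOptions` constructor is relaxed from `private` to `protected` (the PR keeps it private). The option key below is made up and stands in for the real `ConfigOption` fields:
   
   ```java
   // Hypothetical sketch: class names mirror the PR, bodies are illustrative only.
   class FlinkConfigOptions {
     // Placeholder constant standing in for the real ConfigOption fields.
     static final String EXAMPLE_OPTION_KEY = "example.option.key";

     // Assumed to be protected so that the deprecated alias below can extend this class.
     protected FlinkConfigOptions() {
     }
   }

   /**
    * Old name kept only for source compatibility.
    *
    * @deprecated use {@link FlinkConfigOptions} instead.
    */
   @Deprecated
   final class FlinkTableOptions extends FlinkConfigOptions {
     private FlinkTableOptions() {
     }
   }
   ```
   
   Because static members are inherited, existing references such as `FlinkTableOptions.EXAMPLE_OPTION_KEY` would keep compiling while new code refers to `FlinkConfigOptions` directly.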

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
##########
@@ -34,7 +34,7 @@
 /**
  * Context object with optional arguments for a Flink Scan.
  */
-class ScanContext implements Serializable {
+public class ScanContext implements Serializable {

Review comment:
       Why do we need to make so much more of this ScanContext public?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.IcebergSourceEvents;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.MutableIcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class IcebergSourceReader<T> extends

Review comment:
       Let's drop it for now and do it in a follow-up PR; this one is already really large.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r740416875



##########
File path: flink/build.gradle
##########
@@ -28,6 +28,7 @@ project(':iceberg-flink') {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       @openinx Maybe we can follow up on the other [comment discussion](https://github.com/apache/iceberg/pull/3354/files#r740053509) here.
   
   With the SplitEnumerator API change, it looks like I need to put the FLIP-27 source in the `v1.13` folder. What should we do with future versions (like 1.14)? Do we copy the FLIP-27 source code from the `v1.13` folder to the `v1.14` folder?






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739936992



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,16 +42,45 @@
   private final FileScanTaskReader<T> fileScanTaskReader;
 
   private final InputFilesDecryptor inputFilesDecryptor;
+  private final CombinedScanTask combinedTask;
+
   private Iterator<FileScanTask> tasks;
   private CloseableIterator<T> currentIterator;
+  private Position position;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
 
     this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
+    this.combinedTask = task;
+
     this.tasks = task.files().iterator();
     this.currentIterator = CloseableIterator.empty();
+    // fileOffset starts at -1 because we started
+    // from an empty iterator that is not from the split files.
+    this.position = new Position(-1L, 0L);
+  }
+
+  public void seek(Position startingPosition) {
+    // skip files
+    Preconditions.checkArgument(startingPosition.fileOffset() < combinedTask.files().size(),
+        "Checkpointed file offset is %d, while CombinedScanTask has %d files",
+        startingPosition.fileOffset(), combinedTask.files().size());
+    for (long i = 0L; i < startingPosition.fileOffset(); ++i) {

Review comment:
       An integer would certainly be sufficient. I was using `long` to match the type in `RecordAndPosition` from the flink-connector-files module. I have already tried to move away from depending on code from flink-connector-files. This looks like a miss due to flink-table-planner-blink_2.12 pulling in flink-connector-files transitively.
   
   The `long offset` in Flink's `RecordAndPosition` actually means the byte offset within a file. I will define our own `RecordAndPosition` and change `fileOffset` to the `int` type.
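   
   A rough sketch of what such an Iceberg-side `RecordAndPosition` might look like. The field and method names here are assumptions for illustration, not the final API of the PR:
   
   ```java
   // Hypothetical sketch; names are assumptions, not the PR's final API.
   final class RecordAndPosition<T> {
     private T record;
     // Index of the file within the CombinedScanTask; an int is enough for a file count.
     private int fileOffset;
     // Number of records already consumed from that file (not a byte offset).
     private long recordOffset;

     // A single mutable instance can be reused to avoid one allocation per record.
     void set(T newRecord, int newFileOffset, long newRecordOffset) {
       this.record = newRecord;
       this.fileOffset = newFileOffset;
       this.recordOffset = newRecordOffset;
     }

     T record() {
       return record;
     }

     int fileOffset() {
       return fileOffset;
     }

     long recordOffset() {
       return recordOffset;
     }
   }
   ```
   
   Keeping the record count as a `long` while narrowing the file index to `int` matches the distinction drawn above between a position inside a file and a position in the list of files.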






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r740413976



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,18 +42,47 @@
   private final FileScanTaskReader<T> fileScanTaskReader;
 
   private final InputFilesDecryptor inputFilesDecryptor;
-  private Iterator<FileScanTask> tasks;
+  private final CombinedScanTask combinedTask;
+  private final Position position;
+
+  private Iterator<FileScanTask> fileTasksIterator;
   private CloseableIterator<T> currentIterator;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
 
     this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
-    this.tasks = task.files().iterator();
+    this.combinedTask = task;
+    // fileOffset starts at -1 because we started
+    // from an empty iterator that is not from the split files.
+    this.position = new Position(-1, 0L);

Review comment:
       I wouldn't say that the `seek` capability is FLIP-27 specific. If we think of `DataIterator` as reading a list of files/splits from a `CombinedScanTask`, it is like a file API, where `seek` is pretty common. It is needed to achieve exactly-once semantics on source reading.
   
   Thanks a lot for the `SeekableDataIterator`. I feel that leaving these two empty abstract methods in the base `DataIterator` is a little weird:
   ```
   protected void advanceRecord()
   protected void advanceTask()
   ```
   
   Overall, I still think adding the `seek` capability to `DataIterator` is natural (for file-like read APIs).
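   
   To make the file-like `seek` idea concrete, here is a self-contained sketch. It is not the PR's `DataIterator`: the class name is invented, and each inner list plays the role of one file in a `CombinedScanTask`:
   
   ```java
   import java.util.Collections;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;

   // Hypothetical stand-in for DataIterator; each inner list represents one file.
   final class SeekableListIterator<T> implements Iterator<T> {
     private final List<List<T>> files;
     private Iterator<List<T>> fileIterator;
     private Iterator<T> currentIterator = Collections.emptyIterator();
     private int fileOffset = -1;    // -1 because no file has been opened yet
     private long recordOffset = 0L; // records already consumed from the current file

     SeekableListIterator(List<List<T>> files) {
       this.files = files;
       this.fileIterator = files.iterator();
     }

     // Repositions the iterator at a checkpointed (file, record) position.
     void seek(int startingFileOffset, long startingRecordOffset) {
       if (startingFileOffset < 0 || startingFileOffset >= files.size()) {
         throw new IllegalArgumentException("Invalid file offset: " + startingFileOffset);
       }

       // Skip whole files before the target file.
       fileIterator = files.iterator();
       for (int i = 0; i < startingFileOffset; i++) {
         fileIterator.next();
       }

       currentIterator = fileIterator.next().iterator();
       fileOffset = startingFileOffset;
       recordOffset = 0L;

       // Skip records inside the target file.
       for (long i = 0; i < startingRecordOffset; i++) {
         if (!currentIterator.hasNext()) {
           throw new IllegalStateException("Not enough records to skip: " + startingRecordOffset);
         }

         currentIterator.next();
         recordOffset++;
       }
     }

     @Override
     public boolean hasNext() {
       // Move on to the next non-empty file when the current one is exhausted.
       while (!currentIterator.hasNext() && fileIterator.hasNext()) {
         currentIterator = fileIterator.next().iterator();
         fileOffset++;
         recordOffset = 0L;
       }

       return currentIterator.hasNext();
     }

     @Override
     public T next() {
       if (!hasNext()) {
         throw new NoSuchElementException();
       }

       recordOffset++;
       return currentIterator.next();
     }

     int fileOffset() {
       return fileOffset;
     }

     long recordOffset() {
       return recordOffset;
     }
   }
   ```
   
   A reader restored from a checkpoint would call `seek` with the stored file and record offsets before resuming, so records emitted before the checkpoint are not read again.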
   






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739569894



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
##########
@@ -22,33 +22,58 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class FlinkSplitGenerator {
-  private FlinkSplitGenerator() {
+@Internal
+public class FlinkSplitPlanner {
+  private FlinkSplitPlanner() {
   }
 
-  static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
-    List<CombinedScanTask> tasks = tasks(table, context);
-    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
+  static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {

Review comment:
       Why change the name of this method?






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739572031



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.Position;
+import org.apache.iceberg.io.CloseableIterator;
+
+class ArrayPoolDataIteratorBatcher<T> implements DataIteratorBatcher<T> {

Review comment:
       There isn't much context for me to go on here. Should there be Javadoc to explain what's going on?






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739572641



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/DataIteratorBatcher.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * Batcher converts iterator of T into iterator of batched {@code RecordsWithSplitIds<RecordAndPosition<T>>},
+ * which is what FLIP-27's {@link SplitReader#fetch()} returns.
+ */
+@FunctionalInterface
+public interface DataIteratorBatcher<T> extends Serializable {
+  CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> apply(String splitId, DataIterator<T> inputIterator);

Review comment:
       Why name this `apply`? Is there a more specific verb you could use here?






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739573788



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.Position;
+import org.apache.iceberg.io.CloseableIterator;
+
+class ArrayPoolDataIteratorBatcher<T> implements DataIteratorBatcher<T> {
+  private final Configuration config;
+  private final RecordFactory<T> recordFactory;
+
+  ArrayPoolDataIteratorBatcher(Configuration config, RecordFactory<T> recordFactory) {
+    this.config = config;
+    this.recordFactory = recordFactory;
+  }
+
+  @Override
+  public CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> apply(
+      String splitId, DataIterator<T> inputIterator) {
+    return new ArrayPoolBatchIterator(splitId, inputIterator);
+  }
+
+  private class ArrayPoolBatchIterator implements CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> {
+
+    private final String splitId;
+    private final DataIterator<T> inputIterator;
+    private final int batchSize;
+    private final Pool<T[]> pool;
+
+    ArrayPoolBatchIterator(String splitId, DataIterator<T> inputIterator) {
+      this.splitId = splitId;
+      this.inputIterator = inputIterator;
+      this.batchSize = config.getInteger(FlinkConfigOptions.SOURCE_READER_FETCH_RECORD_BATCH_SIZE);
+      this.pool = createPoolOfBatches(config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
+    }
+
+    @Override
+    public boolean hasNext() {
+      return inputIterator.hasNext();
+    }
+
+    @Override
+    public RecordsWithSplitIds<RecordAndPosition<T>> next() {
+      final T[] batch = getCachedEntry();
+      int num = 0;
+      while (inputIterator.hasNext() && num < batchSize) {
+        T nextRecord = inputIterator.next();
+        recordFactory.clone(nextRecord, batch[num]);
+        num++;
+        if (inputIterator.isCurrentIteratorDone()) {
+          // break early so that records in the ArrayResultIterator
+          // have the same fileOffset.
+          break;
+        }
+      }
+      if (num == 0) {

Review comment:
       Could you add newlines between control flow blocks?






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r716347826



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.io.IOException;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+/**
+ * TODO: use Java serialization for now.
+ * will switch to more stable serializer from issue-1698.

Review comment:
       Will add a link.
   
   Regarding the future change to a stable serializer, it will be backward compatible: we can bump the serializer version to v2 and continue to deserialize state written by v1.
   
   ```
     @Override
     public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOException {
       switch (version) {
         case 1:
           return deserializeV1(serialized);
         default:
           throw new IOException("Unknown version: " + version);
       }
     }
   ```
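   
   Once a v2 format exists, the versioning scheme described above could look roughly like the following. This is only a sketch: a `String` stands in for `IcebergSourceSplit`, the class name is invented, and the UTF-8 encoding used for v2 is a placeholder rather than the format proposed in issue-1698:
   
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.nio.charset.StandardCharsets;

   // Hypothetical sketch: a String stands in for IcebergSourceSplit.
   final class VersionedSplitSerializer {
     private static final int VERSION = 2;

     int getVersion() {
       return VERSION;
     }

     // New state is always written in the latest format (placeholder: UTF-8 bytes).
     byte[] serialize(String split) {
       return split.getBytes(StandardCharsets.UTF_8);
     }

     // State written before the version bump is still readable.
     String deserialize(int version, byte[] serialized) throws IOException {
       switch (version) {
         case 1:
           return deserializeV1(serialized);
         case 2:
           return new String(serialized, StandardCharsets.UTF_8);
         default:
           throw new IOException("Unknown version: " + version);
       }
     }

     // v1 used plain Java serialization; kept here to produce old-format bytes.
     static byte[] serializeV1(String split) throws IOException {
       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
       try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
         out.writeObject(split);
       }

       return bytes.toByteArray();
     }

     private static String deserializeV1(byte[] serialized) throws IOException {
       try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
         return (String) in.readObject();
       } catch (ClassNotFoundException e) {
         throw new IOException("Failed to deserialize v1 split", e);
       }
     }
   }
   ```
   
   Flink stores the serializer version next to the serialized bytes, which is what lets `deserialize` dispatch on the version that wrote the state.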






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r705907404



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -27,23 +27,39 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class FlinkSplitGenerator {
+public class FlinkSplitGenerator {
   private FlinkSplitGenerator() {
   }
 
   static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
-    List<CombinedScanTask> tasks = tasks(table, context);
-    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
+    try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
+      List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
+      FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
+      for (int i = 0; i < tasks.size(); i++) {
+        splits[i] = new FlinkInputSplit(i, tasks.get(i));
+      }
+      return splits;
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to process tasks iterable", e);
+    }
+  }
+
+  public static List<IcebergSourceSplit> planIcebergSourceSplits(

Review comment:
       Actually, I think we should rename `createInputSplits` to `planFlinkInputSplit`. We are not creating splits out of nowhere. Both methods just discover/plan splits from the table and call the same `planTasks`. I can add some Javadoc on the new `planIcebergSourceSplits` method.
   
   Since `createInputSplits` is non-public, it should be safe to rename.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r705899764



##########
File path: build.gradle
##########
@@ -406,6 +409,8 @@ project(':iceberg-flink-runtime') {
     compile(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+
+    compile "org.apache.flink:flink-connector-base"

Review comment:
       That is correct.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r666413592



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/FileRecords.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Collections;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+public class FileRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {
+
+  @Nullable
+  private final ReaderFactory.RecordIterator recordsForSplit;

Review comment:
       They should be. Will fix.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r594051568



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.iceberg.flink.source.IcebergSourceEvents;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.MutableIcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class IcebergSourceReader<T> extends

Review comment:
       Yes, it is not used in this PR. It was added for completeness of the `split reader` module.






[GitHub] [iceberg] stevenzwu closed pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu closed pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305


   




[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739932152



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,16 +42,45 @@
   private final FileScanTaskReader<T> fileScanTaskReader;
 
   private final InputFilesDecryptor inputFilesDecryptor;
+  private final CombinedScanTask combinedTask;
+
   private Iterator<FileScanTask> tasks;
   private CloseableIterator<T> currentIterator;
+  private Position position;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
 
     this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
+    this.combinedTask = task;
+
     this.tasks = task.files().iterator();

Review comment:
       Agreed. Will rename it to `fileTasksIterator`.






[GitHub] [iceberg] openinx commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r740092461



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Integer> SOURCE_READER_FETCH_RECORD_BATCH_SIZE = ConfigOptions
+      .key("source.iceberg.reader.fetch-record-batch-size")

Review comment:
       Yes, this is unrelated to table/SQL execution. Both DataStream jobs and Table SQL jobs use the same configuration keys, so I'm okay with keeping the current name.
   
   (In fact, if we set aside Flink's configuration naming, I'd prefer to name it `iceberg.source.reader.fetch-record-batch-size`. But Iceberg is also a kind of Flink connector, and all the other FLIP-27 source connectors name their keys `source.<connector>.xxx`, so I think we can follow that naming.)






[GitHub] [iceberg] tweise commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739378847



##########
File path: flink-runtime/build.gradle
##########
@@ -42,6 +42,8 @@ project(':iceberg-flink-runtime') {
     implementation(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+
+    implementation "org.apache.flink:flink-connector-base"

Review comment:
       Please add a comment.
   
   @rdblue `flink-connector-base` needs to be a transitive dependency of the Iceberg connector (or shaded/relocated). It is not part of the Flink runtime.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r740413976



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,18 +42,47 @@
   private final FileScanTaskReader<T> fileScanTaskReader;
 
   private final InputFilesDecryptor inputFilesDecryptor;
-  private Iterator<FileScanTask> tasks;
+  private final CombinedScanTask combinedTask;
+  private final Position position;
+
+  private Iterator<FileScanTask> fileTasksIterator;
   private CloseableIterator<T> currentIterator;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
 
     this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
-    this.tasks = task.files().iterator();
+    this.combinedTask = task;
+    // fileOffset starts at -1 because we started
+    // from an empty iterator that is not from the split files.
+    this.position = new Position(-1, 0L);

Review comment:
       I wouldn't say that the `seek` capability is FLIP-27 specific. If we think of `DataIterator` as reading a list of files/splits from a `CombinedScanTask`, it is like a file API, where `seek` is pretty common. It is needed to achieve exactly-once processing semantics. For example, if we were to implement exactly-once semantics for the current streaming source, I would imagine we would need this as well.
   
   Thanks a lot for the `SeekableDataIterator`. I feel that leaving these two empty abstract methods in the base `DataIterator` is a little weird:
   ```
   protected void advanceRecord()
   protected void advanceTask()
   ```
   
   Overall, I still think adding the `seek` capability to `DataIterator` is natural (for file-like read APIs).
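
   To illustrate the file-like `seek` idea above, here is a minimal sketch. It is not the PR's `DataIterator`: plain in-memory lists stand in for the file tasks of a combined task, and `SeekableFilesIterator`, `seek`, `fileOffset()` and `recordOffset()` are hypothetical names chosen for this example. The only detail borrowed from the diff is that the file offset starts at -1 before any file is opened.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in: iterates over a list of "files" and can resume from a saved position. */
class SeekableFilesIterator<T> implements Iterator<T> {
  private final List<List<T>> files;   // each inner list plays the role of one file task
  private int fileOffset = -1;         // -1 means no file has been opened yet
  private long recordOffset = 0L;      // records already consumed from the current file
  private Iterator<T> current = Collections.emptyIterator();

  SeekableFilesIterator(List<List<T>> files) {
    this.files = files;
  }

  /** Opens the given file and skips the records that were already consumed from it. */
  void seek(int startingFileOffset, long startingRecordOffset) {
    if (startingFileOffset < 0 || startingFileOffset >= files.size()) {
      throw new IllegalArgumentException("Invalid file offset: " + startingFileOffset);
    }
    fileOffset = startingFileOffset;
    current = files.get(fileOffset).iterator();
    recordOffset = 0L;
    for (long i = 0; i < startingRecordOffset; i++) {
      if (!current.hasNext()) {
        throw new IllegalArgumentException("Record offset past end of file: " + startingRecordOffset);
      }
      current.next();
      recordOffset++;
    }
  }

  @Override
  public boolean hasNext() {
    // Move on to the next file whenever the current one is exhausted.
    while (!current.hasNext() && fileOffset + 1 < files.size()) {
      fileOffset++;
      recordOffset = 0L;
      current = files.get(fileOffset).iterator();
    }
    return current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    recordOffset++;
    return current.next();
  }

  int fileOffset() {
    return fileOffset;
  }

  long recordOffset() {
    return recordOffset;
  }
}
```

   In this sketch, a reader would save `(fileOffset(), recordOffset())` at checkpoint time and, after a restart, call `seek` with those two values on a fresh iterator so that records already emitted are not read again.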
   

##########
File path: flink/build.gradle
##########
@@ -28,6 +28,7 @@ project(':iceberg-flink') {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       @openinx Maybe we can follow up on the other [comment discussion](https://github.com/apache/iceberg/pull/3354/files#r740053509) here.
   
   With the `SplitEnumerator` API change, it looks like I need to put the FLIP-27 source in the `v1.13` folder. What should we do for future versions (like 1.14)? Do we copy the FLIP-27 source code from the `v1.13` folder to the `v1.14` folder?






[GitHub] [iceberg] rdblue commented on a change in pull request #2305: Flink: FLIP-27 source split and reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r742093418



##########
File path: flink/build.gradle
##########
@@ -28,6 +28,7 @@ project(':iceberg-flink') {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       @openinx, the tests run against the iceberg-flink module. They aren't present in the 1.12 or 1.13 modules. If you want them to be run for those modules, you'd need to add the source folder like you do for `src/main/java`. If you choose to do that, let's also remove CI for the common module since we don't need to run the tests outside of 1.12 and 1.13 if they are run in those modules.

##########
File path: flink/build.gradle
##########
@@ -28,6 +28,7 @@ project(':iceberg-flink') {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
+    compileOnly "org.apache.flink:flink-connector-base"

Review comment:
       @stevenzwu, I think that copying the parts that change is reasonable. And once we remove support for 1.12, you can move the files back into the common module.





