Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/05/12 12:24:28 UTC

[GitHub] [iceberg] openinx commented on a change in pull request #2105: Flink: Initial implementation of Flink source with the new FLIP-27 source interface

openinx commented on a change in pull request #2105:
URL: https://github.com/apache/iceberg/pull/2105#discussion_r630840840



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -79,6 +83,27 @@ InputFile getInputFile(String location) {
     return inputFiles.get(location);
   }
 
+  public void seek(CheckpointedPosition checkpointedPosition)  {
+    // skip files
+    Preconditions.checkArgument(checkpointedPosition.getOffset() < combinedTask.files().size(),

Review comment:
       Nit: `checkArgument` can __format__ the error message directly, so we don't need the extra `String.format` here.
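
       For example, a minimal sketch of the suggested form (assuming the relocated Guava `Preconditions`, whose lightweight formatter only understands `%s` placeholders):

       ```java
       // checkArgument formats the message itself, so the eager String.format goes away.
       Preconditions.checkArgument(checkpointedPosition.getOffset() < combinedTask.files().size(),
           "Checkpointed file offset is %s, while CombinedScanTask has %s files",
           checkpointedPosition.getOffset(), combinedTask.files().size());
       ```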

##########
File path: build.gradle
##########
@@ -318,14 +318,24 @@ project(':iceberg-flink') {
     compileOnly "org.apache.flink:flink-table-api-java-bridge_2.12"
     compileOnly "org.apache.flink:flink-table-planner-blink_2.12"
     compileOnly "org.apache.flink:flink-table-planner_2.12"
+    compileOnly "org.apache.flink:flink-parquet_2.12"
     compileOnly "org.apache.hadoop:hadoop-hdfs"
     compileOnly "org.apache.hadoop:hadoop-common"
     compileOnly("org.apache.hadoop:hadoop-minicluster") {
       exclude group: 'org.apache.avro', module: 'avro'
     }
 
+    // flink-dist doesn't include these two modules (FLINK-20098).
+    // Hence we should add them as compile deps.
+    // This doesn't affect iceberg-flink-runtime shadow jar,
+    // since it excludes all flink jars.
+    compile "org.apache.flink:flink-connector-base"
+    compile "org.apache.flink:flink-connector-files"

Review comment:
       Personally, I think those two packages could be marked as `compileOnly`, because people won't need to import those classes from the iceberg-flink-xx.jar. (The difference between `compile` and `compileOnly` is [here](https://liferay.dev/en/b/gradle-compile-vs-compileonly-vs-compileinclude)).

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -79,6 +83,27 @@ InputFile getInputFile(String location) {
     return inputFiles.get(location);
   }
 
+  public void seek(CheckpointedPosition checkpointedPosition)  {
+    // skip files
+    Preconditions.checkArgument(checkpointedPosition.getOffset() < combinedTask.files().size(),
+        String.format("Checkpointed file offset is %d, while CombinedScanTask has %d files",
+            checkpointedPosition.getOffset(), combinedTask.files().size()));
+    for (long i = 0L; i < checkpointedPosition.getOffset(); ++i) {
+      tasks.next();
+    }
+    updateCurrentIterator();
+    // skip records within the file
+    for (long i = 0; i < checkpointedPosition.getRecordsAfterOffset(); ++i) {
+      if (hasNext()) {
+        next();
+      } else {
+        throw new IllegalStateException("Not enough records to skip: " +

Review comment:
       Nit: It would be good to include the `filePath` in this error message.
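
       For illustration, a rough sketch (the `currentFileTask` handle is hypothetical; whatever reference the iterator keeps to the in-progress `FileScanTask` would do):

       ```java
       // Hypothetical: surface the data file path when we run out of records to skip.
       throw new IllegalStateException(String.format(
           "Not enough records in file %s to reach checkpointed position: expected %d",
           currentFileTask.file().path(), checkpointedPosition.getRecordsAfterOffset()));
       ```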

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -114,4 +145,43 @@ public void close() throws IOException {
     currentIterator.close();
     tasks = null;
   }
+
+  public Position position() {
+    return position;
+  }
+
+  public static class Position {

Review comment:
       Why do we need to introduce an extra `Position` class rather than reusing Flink's `CheckpointedPosition` (if we plan to keep the two-layer iterator in the same DataIterator class)?
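
       For illustration, the reuse could look roughly like this (a sketch; the two counter fields are mine, not from the PR):

       ```java
       // Hypothetical: expose the position as Flink's CheckpointedPosition directly.
       private long fileOffset;          // index of the current file within the CombinedScanTask
       private long recordsAfterOffset;  // records already returned from the current file

       public CheckpointedPosition position() {
         return new CheckpointedPosition(fileOffset, recordsAfterOffset);
       }
       ```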

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/IcebergSourceOptions.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public interface IcebergSourceOptions {

Review comment:
       I'm considering moving all these Flink options into a single `FlinkOptions` class, because I don't want options defined all over the place (we already have a [FlinkTableOptions](https://github.com/apache/iceberg/blob/90225d6c9413016d611e2ce5eff37db1bc1b4fc5/flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java#L26)).
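
       For example, a consolidated class could look roughly like this (a sketch; the option name and default below are illustrative, not from the PR):

       ```java
       package org.apache.iceberg.flink;

       import org.apache.flink.configuration.ConfigOption;
       import org.apache.flink.configuration.ConfigOptions;

       // Hypothetical single home for all Flink-related Iceberg options.
       public class FlinkOptions {

         private FlinkOptions() {
         }

         public static final ConfigOption<Long> SPLIT_DISCOVERY_INTERVAL_MS =
             ConfigOptions.key("iceberg.source.split-discovery-interval-ms")
                 .longType()
                 .defaultValue(60_000L)
                 .withDescription("Interval for discovering new splits when reading in streaming mode.");
       }
       ```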

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -62,11 +64,13 @@
     // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
     Iterable<InputFile> decryptedFiles = encryption.decrypt(encrypted::iterator);
 
-    Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
+    Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(combinedTask.files().size());

Review comment:
       The expected size of the hash map should be `keyMetadata.size()`, because the `combinedTask` may contain two `FileScanTask`s that come from the same big data file (which also means `combinedTask.files().size() >= keyMetadata.size()`).

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+public class IcebergSourceSplit extends FileSourceSplit {
+
+  public enum Status {
+    UNASSIGNED,
+    ASSIGNED,
+    COMPLETED
+  }
+
+  private final CombinedScanTask task;
+  @Nullable
+  private final CheckpointedPosition checkpointedPosition;
+
+  /**
+   * The splits are frequently serialized into checkpoints.
+   * Caching the byte representation makes repeated serialization cheap.
+   */
+  @Nullable private transient byte[] serializedFormCache;
+
+  public IcebergSourceSplit(CombinedScanTask task, CheckpointedPosition checkpointedPosition) {
+    // Supply dummy values so that IcebergSourceSplit extend from FileSourceSplit,
+    // as required by using BulkFormat interface in IcebergSource.
+    // We are hoping to clean this up after FLINK-20174 is resolved.
+    super("", new Path("file:///dummy"), 0L, 0L);

Review comment:
       I'm a little worried about how to ensure compatibility with Iceberg's FileIO if we plan to rely on Flink's filesystem interfaces (such as `Path`) in some places.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.Serializable;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+
+/**
+ * Enumerator state for checkpointing
+ */
+public class IcebergEnumeratorState implements Serializable {
+
+  @Nullable private final IcebergEnumeratorPosition enumeratorPosition;
+  private final Map<IcebergSourceSplit, IcebergSourceSplit.Status> pendingSplits;

Review comment:
       Nit: This map looks quite strange: if we have the `IcebergSourceSplit`, then we can easily get its `Status`, so maintaining those splits in a `Set` should be enough, I think.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssignerFactory.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+public interface SplitAssignerFactory extends Serializable {

Review comment:
       Looks like we only provide the `simple` assigner in this PR.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -79,6 +83,27 @@ InputFile getInputFile(String location) {
     return inputFiles.get(location);
   }
 
+  public void seek(CheckpointedPosition checkpointedPosition)  {

Review comment:
       I think it's worth following the original [comment](https://github.com/apache/iceberg/pull/2105/files#r568303231), because it cleanly decouples the file and offset iterator code.
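
       To make that concrete, the decoupled shape could look roughly like this (a sketch against the same fields; the helper names are mine):

       ```java
       // Hypothetical split of seek() into a file-level step and a record-level step.
       public void seek(CheckpointedPosition checkpointedPosition) {
         seekFile(checkpointedPosition.getOffset());
         skipRecords(checkpointedPosition.getRecordsAfterOffset());
       }

       private void seekFile(long fileOffset) {
         // advance the file iterator to the checkpointed file
         for (long i = 0; i < fileOffset; ++i) {
           tasks.next();
         }
         updateCurrentIterator();
       }

       private void skipRecords(long count) {
         // re-skip records that were already emitted from the current file
         for (long i = 0; i < count; ++i) {
           Preconditions.checkState(hasNext(), "Not enough records to skip: %s", count);
           next();
         }
       }
       ```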

##########
File path: build.gradle
##########
@@ -318,14 +318,24 @@ project(':iceberg-flink') {
     compileOnly "org.apache.flink:flink-table-api-java-bridge_2.12"
     compileOnly "org.apache.flink:flink-table-planner-blink_2.12"
     compileOnly "org.apache.flink:flink-table-planner_2.12"
+    compileOnly "org.apache.flink:flink-parquet_2.12"
     compileOnly "org.apache.hadoop:hadoop-hdfs"
     compileOnly "org.apache.hadoop:hadoop-common"
     compileOnly("org.apache.hadoop:hadoop-minicluster") {
       exclude group: 'org.apache.avro', module: 'avro'
     }
 
+    // flink-dist doesn't include these two modules (FLINK-20098).
+    // Hence we should add them as compile deps.
+    // This doesn't affect iceberg-flink-runtime shadow jar,
+    // since it excludes all flink jars.
+    compile "org.apache.flink:flink-connector-base"
+    compile "org.apache.flink:flink-connector-files"

Review comment:
       We also wouldn't need the explanatory comment for the two jars then, because they would have the same meaning as the Flink jars above.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+public class IcebergSourceSplit extends FileSourceSplit {
+
+  public enum Status {
+    UNASSIGNED,
+    ASSIGNED,
+    COMPLETED
+  }
+
+  private final CombinedScanTask task;
+  @Nullable
+  private final CheckpointedPosition checkpointedPosition;
+
+  /**
+   * The splits are frequently serialized into checkpoints.
+   * Caching the byte representation makes repeated serialization cheap.
+   */
+  @Nullable private transient byte[] serializedFormCache;
+
+  public IcebergSourceSplit(CombinedScanTask task, CheckpointedPosition checkpointedPosition) {
+    // Supply dummy values so that IcebergSourceSplit extend from FileSourceSplit,
+    // as required by using BulkFormat interface in IcebergSource.
+    // We are hoping to clean this up after FLINK-20174 is resolved.
+    super("", new Path("file:///dummy"), 0L, 0L);

Review comment:
       Maybe we could just implement our own Iceberg `SourceSplit`.
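
       For illustration, an Iceberg-owned split could be as small as this (a sketch; the class name and split-id scheme are mine):

       ```java
       package org.apache.iceberg.flink.source.split;

       import java.io.Serializable;
       import javax.annotation.Nullable;
       import org.apache.flink.api.connector.source.SourceSplit;
       import org.apache.flink.connector.file.src.util.CheckpointedPosition;
       import org.apache.iceberg.CombinedScanTask;

       // Hypothetical: implement SourceSplit directly so no dummy
       // FileSourceSplit path/offset/length values are needed.
       public class IcebergScanTaskSplit implements SourceSplit, Serializable {

         private final CombinedScanTask task;
         @Nullable
         private final CheckpointedPosition checkpointedPosition;

         public IcebergScanTaskSplit(CombinedScanTask task, @Nullable CheckpointedPosition position) {
           this.task = task;
           this.checkpointedPosition = position;
         }

         public CombinedScanTask task() {
           return task;
         }

         @Nullable
         public CheckpointedPosition checkpointedPosition() {
           return checkpointedPosition;
         }

         @Override
         public String splitId() {
           // Derive a stable id from the first data file in the combined task;
           // a production version would need to handle multi-file collisions.
           return task.files().iterator().next().file().path().toString();
         }
       }
       ```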

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
+import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorConfig;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
+import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
+import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+
+@Experimental
+public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
+
+  private final TableLoader tableLoader;
+  private final ScanContext scanContext;
+  private final BulkFormat<T, IcebergSourceSplit> bulkFormat;
+  private final SplitAssignerFactory assignerFactory;
+  private final IcebergEnumeratorConfig enumeratorConfig;
+
+  IcebergSource(
+      TableLoader tableLoader,
+      ScanContext scanContext,
+      BulkFormat<T, IcebergSourceSplit> bulkFormat,
+      SplitAssignerFactory assignerFactory,
+      IcebergEnumeratorConfig enumeratorConfig) {
+
+    this.tableLoader = tableLoader;
+    this.enumeratorConfig = enumeratorConfig;
+    this.scanContext = scanContext;
+    this.bulkFormat = bulkFormat;
+    this.assignerFactory = assignerFactory;
+  }
+
+  private static Table loadTable(TableLoader tableLoader) {
+    tableLoader.open();
+    try (TableLoader loader = tableLoader) {
+      return loader.loadTable();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to close table loader", e);
+    }
+  }
+
+  @Override
+  public Boundedness getBoundedness() {
+    return enumeratorConfig.splitDiscoveryInterval() == null ?
+        Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
+  }
+
+  @Override
+  public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
+    return new IcebergSourceReader<>(
+        readerContext,
+        bulkFormat);
+  }
+
+  @Override
+  public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumContext) {
+    return createEnumerator(enumContext, null);
+  }
+
+  @Override
+  public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> restoreEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumContext, IcebergEnumeratorState enumState) {
+    return createEnumerator(enumContext, enumState);
+  }
+
+  @Override
+  public SimpleVersionedSerializer<IcebergSourceSplit> getSplitSerializer() {
+    return IcebergSourceSplitSerializer.INSTANCE;
+  }
+
+  @Override
+  public SimpleVersionedSerializer<IcebergEnumeratorState> getEnumeratorCheckpointSerializer() {
+    return IcebergEnumeratorStateSerializer.INSTANCE;
+  }
+
+  private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumContext,
+      @Nullable IcebergEnumeratorState enumState) {
+
+    final Table table = loadTable(tableLoader);
+
+    final SplitAssigner assigner;
+    if (enumState == null) {
+      assigner = assignerFactory.createAssigner();
+      // for batch jobs, discover splits eagerly during job initialization.
+      // As FLINK-16866 supports non-blocking job submission since 1.12,
+      // heavy job initialization won't lead to request timeout for job submission.
+      if (enumeratorConfig.splitDiscoveryInterval() == null) {

Review comment:
       Looks like `splitDiscoveryInterval` is the option that distinguishes whether it's a batch job or a streaming job? Taken literally, that's a bit hard to understand.
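
       For readability, an explicit flag might be clearer (a sketch; the `streaming` field is hypothetical, not in the PR):

       ```java
       // Hypothetical: make the batch/streaming decision explicit instead of
       // inferring it from whether a discovery interval was configured.
       private final boolean streaming;

       @Override
       public Boundedness getBoundedness() {
         return streaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
       }
       ```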

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
+import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorConfig;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
+import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
+import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+
+@Experimental
+public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
+
+  private final TableLoader tableLoader;
+  private final ScanContext scanContext;
+  private final BulkFormat<T, IcebergSourceSplit> bulkFormat;
+  private final SplitAssignerFactory assignerFactory;
+  private final IcebergEnumeratorConfig enumeratorConfig;
+
+  IcebergSource(
+      TableLoader tableLoader,
+      ScanContext scanContext,
+      BulkFormat<T, IcebergSourceSplit> bulkFormat,
+      SplitAssignerFactory assignerFactory,
+      IcebergEnumeratorConfig enumeratorConfig) {
+
+    this.tableLoader = tableLoader;
+    this.enumeratorConfig = enumeratorConfig;
+    this.scanContext = scanContext;
+    this.bulkFormat = bulkFormat;
+    this.assignerFactory = assignerFactory;
+  }
+
+  private static Table loadTable(TableLoader tableLoader) {
+    tableLoader.open();
+    try (TableLoader loader = tableLoader) {
+      return loader.loadTable();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to close table loader", e);
+    }
+  }
+
+  @Override
+  public Boundedness getBoundedness() {
+    return enumeratorConfig.splitDiscoveryInterval() == null ?
+        Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
+  }
+
+  @Override
+  public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
+    return new IcebergSourceReader<>(
+        readerContext,
+        bulkFormat);
+  }
+
+  @Override
+  public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumContext) {
+    return createEnumerator(enumContext, null);
+  }
+
+  @Override
+  public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> restoreEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumContext, IcebergEnumeratorState enumState) {
+    return createEnumerator(enumContext, enumState);
+  }
+
+  @Override
+  public SimpleVersionedSerializer<IcebergSourceSplit> getSplitSerializer() {
+    return IcebergSourceSplitSerializer.INSTANCE;
+  }
+
+  @Override
+  public SimpleVersionedSerializer<IcebergEnumeratorState> getEnumeratorCheckpointSerializer() {
+    return IcebergEnumeratorStateSerializer.INSTANCE;
+  }
+
+  private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumContext,
+      @Nullable IcebergEnumeratorState enumState) {
+
+    final Table table = loadTable(tableLoader);
+
+    final SplitAssigner assigner;
+    if (enumState == null) {
+      assigner = assignerFactory.createAssigner();
+      // for batch jobs, discover splits eagerly during job initialization.
+      // As FLINK-16866 supports non-blocking job submission since 1.12,
+      // heavy job initialization won't lead to request timeout for job submission.
+      if (enumeratorConfig.splitDiscoveryInterval() == null) {
+        assigner.onDiscoveredSplits(FlinkSplitGenerator.planIcebergSourceSplits(table, scanContext));

Review comment:
       We cannot access this `table` instance (especially for planning tasks using this table) again once we've closed the `tableLoader`: closing the `tableLoader` means we've closed the connection to the catalog (such as the AWS Glue catalog or Hive catalog), and re-accessing resources over a closed RPC connection will lead to an exception.
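
       To sketch the concern (a hedged illustration, not the PR's code): the loader, and the catalog connection behind it, has to outlive every use of the loaded `Table`:

       ```java
       import java.io.IOException;
       import org.apache.iceberg.Table;
       import org.apache.iceberg.flink.TableLoader;

       // Hypothetical lifecycle: keep the loader open while the Table may still
       // be used for planning; close it only when the owning component shuts down.
       class CatalogScopedTable implements AutoCloseable {

         private final TableLoader tableLoader;

         CatalogScopedTable(TableLoader tableLoader) {
           this.tableLoader = tableLoader;
         }

         Table load() {
           tableLoader.open();
           // Do NOT close the loader here: planning against the returned Table
           // may still go through the catalog connection the loader holds.
           return tableLoader.loadTable();
         }

         @Override
         public void close() throws IOException {
           tableLoader.close();
         }
       }
       ```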

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.iceberg.flink.source.IcebergSourceEvents;
+import org.apache.iceberg.flink.source.assigner.GetSplitResult;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO: publish enumerator monitor metrics like number of pending metrics
+ * after FLINK-21000 is resolved
+ */
+abstract class AbstractIcebergEnumerator implements
+    SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);
+
+  private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
+  private final SplitAssigner assigner;
+  private final IcebergEnumeratorConfig enumeratorConfig;
+  private final Map<Integer, String> readersAwaitingSplit;
+  private final AtomicReference<CompletableFuture<Void>> availableFuture;
+
+  AbstractIcebergEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+      SplitAssigner assigner,
+      IcebergEnumeratorConfig enumeratorConfig) {
+    this.enumeratorContext = enumeratorContext;
+    this.assigner = assigner;
+    this.enumeratorConfig = enumeratorConfig;
+    this.readersAwaitingSplit = new LinkedHashMap<>();
+    this.availableFuture = new AtomicReference<>();
+  }
+
+  @Override
+  public void start() {
+    assigner.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    assigner.close();
+  }
+
+  /**
+   * Iceberg source uses a custom event inside handleSourceEvent
+   * so that we can piggyback the finishedSplitIds in SplitRequestEvent.
+   * We can move the logic back into this method once FLINK-21364 is resolved.
+   */
+  @Override
+  public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+    throw new UnsupportedOperationException("Iceberg source uses its own SplitRequestEvent");
+  }
+
+  @Override
+  public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+    if (sourceEvent instanceof IcebergSourceEvents.SplitRequestEvent) {
+      final IcebergSourceEvents.SplitRequestEvent splitRequestEvent =
+          (IcebergSourceEvents.SplitRequestEvent) sourceEvent;
+      LOG.info("Received request split event from subtask {}", subtaskId);
+      assigner.onCompletedSplits(splitRequestEvent.finishedSplitIds());
+      readersAwaitingSplit.put(subtaskId, splitRequestEvent.requesterHostname());
+      assignSplits();
+    } else {
+      throw new IllegalArgumentException(String.format(
+          "Received unknown event %s from subtask %d", sourceEvent, subtaskId));
+    }
+  }
+
+  @Override
+  public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
+    LOG.info("Add {} splits back to the pool for failed subtask {}: {}",
+        splits.size(), subtaskId, splits);
+    assigner.onUnassignedSplits(splits);
+    assignSplits();
+  }
+
+  @Override
+  public void addReader(int subtaskId) {
+    LOG.info("Added reader: {}", subtaskId);
+  }
+
+  private void assignSplits() {
+    LOG.info("Assigning splits for {} awaiting readers", readersAwaitingSplit.size());
+    final Iterator<Map.Entry<Integer, String>> awaitingReader =
+        readersAwaitingSplit.entrySet().iterator();
+    while (awaitingReader.hasNext()) {
+      final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
+      // if the reader that requested another split has failed in the meantime, remove
+      // it from the list of waiting readers
+      if (!enumeratorContext.registeredReaders().containsKey(nextAwaiting.getKey())) {
+        awaitingReader.remove();
+        continue;
+      }
+
+      final int awaitingSubtask = nextAwaiting.getKey();
+      final String hostname = nextAwaiting.getValue();
+      final GetSplitResult getResult = assigner.getNext(hostname);
+      if (getResult.status() == GetSplitResult.Status.AVAILABLE) {
+        LOG.info("Assign split to subtask {}: {}", awaitingSubtask, getResult.split());
+        enumeratorContext.assignSplit(getResult.split(), awaitingSubtask);
+        awaitingReader.remove();
+      } else if (getResult.status() == GetSplitResult.Status.CONSTRAINED) {
+        getAvailableFutureIfNeeded();
+        break;
+      } else if (getResult.status() == GetSplitResult.Status.UNAVAILABLE) {
+        if (shouldWaitForMoreSplits()) {
+          getAvailableFutureIfNeeded();
+          break;
+        } else {
+          LOG.info("No more splits available for subtask {}", awaitingSubtask);
+          enumeratorContext.signalNoMoreSplits(awaitingSubtask);
+          awaitingReader.remove();
+        }
+      } else {
+        throw new IllegalArgumentException("Unsupported status: " + getResult.status());
+      }
+    }
+  }
+
+  /**
+   * @return true if enumerator should wait for splits
+   * like in the continuous enumerator case
+   */
+  protected abstract boolean shouldWaitForMoreSplits();
+
+  private synchronized void getAvailableFutureIfNeeded() {
+    if (availableFuture.get() != null) {
+      return;
+    }
+    CompletableFuture<Void> future = assigner.isAvailable();
+    future.thenAccept(ignore ->
+        // Must run assignSplits in coordinator thread
+        // because the future may be completed from other threads.
+        // E.g., in event time alignment assigner,
+        // watermark advancement from another source may
+        // cause the available future to be completed
+        enumeratorContext.runInCoordinatorThread(() -> {
+          LOG.debug("Executing callback of assignSplits");
+          availableFuture.set(null);
+          assignSplits();
+        }));
+    availableFuture.set(future);
+    LOG.debug("Registered callback for future available splits");
+  }

Review comment:
       This function is really hard to follow; I guess it's addressing the issue from the [google doc](https://docs.google.com/document/d/1q6xaBxUPFwYsW9aXWxYUh7die6O7rDeAPFQcTAMQ0GM/edit#heading=h.a8rlfx363xjc).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org