Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/06 20:43:55 UTC

[GitHub] [iceberg] kbendick commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

kbendick commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779844460



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -225,7 +227,8 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
         int maxInferParallelism = readableConfig.get(FlinkConfigOptions
             .TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
-        Preconditions.checkState(maxInferParallelism >= 1,
+        Preconditions.checkState(
+            maxInferParallelism >= 1,

Review comment:
       Nit: It seems like this change only re-wraps the line; the code itself is unchanged.
   
   Can we avoid touching lines (and creating diffs) where the source code doesn't actually change? This greatly helps people who maintain forks and have to `git cherry-pick` most commits.

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +250,16 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    private boolean localityEnabled() {
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        Boolean localityConfig = readableConfig.get(FlinkConfigOptions
+            .TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+        return localityConfig != null ? localityConfig : true;

Review comment:
       Nit / open question:
   
   On the subject of having a default value, it seems like the default for `HadoopFileIO` is `true`.
   
   Should we also check that the path's scheme makes sense with locality enabled (e.g. that the user isn't running `HadoopFileIO` over s3a or some other object store)? Something like Spark's `FILESYSTEM_SUPPORT_LOCALITY` check?
   
   I'm marking this as an open question because it might be difficult to pass this information through. But I know some people on Azure or even GCS who use `HadoopFileIO` with the shim for their object storage (their equivalent of `s3a`), and they would not benefit from a default value of `true`.
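   A minimal sketch of the scheme check suggested above, in the spirit of the Spark check but not a copy of it. It assumes only `hdfs` locations are worth exposing locality for; the class `LocalityCheck`, its methods, and the `hdfs`-only allow-list are hypothetical, not Iceberg or Flink API. An explicit user setting still wins; only the default changes from a blanket `true` to "true only for an allowed scheme".

```java
import java.net.URI;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Iceberg or Flink.
public final class LocalityCheck {

  // Assumption: only HDFS exposes block locations that map onto worker hosts.
  // Object-store connectors (s3a, abfs, gs, ...) are treated as having no locality.
  private static final Set<String> LOCALITY_SCHEMES = Set.of("hdfs");

  private LocalityCheck() {
  }

  /** Returns true only if the location's URI scheme is in the allow-list. */
  static boolean schemeSupportsLocality(String location) {
    String scheme = URI.create(location).getScheme();
    // A scheme-less path resolves against fs.defaultFS, which this sketch cannot
    // see, so it conservatively reports no locality.
    return scheme != null && LOCALITY_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT));
  }

  /**
   * Resolves the effective setting. Assumption: mirrors the instanceof HadoopFileIO
   * guard in the PR (other FileIOs return false here); an explicit user value wins,
   * otherwise the default depends on the table location's scheme.
   */
  static boolean localityEnabled(Boolean configured, boolean isHadoopFileIO, String tableLocation) {
    if (!isHadoopFileIO) {
      return false;
    }
    if (configured != null) {
      return configured;
    }
    return schemeSupportsLocality(tableLocation);
  }

  public static void main(String[] args) {
    System.out.println(localityEnabled(null, true, "hdfs://nn:8020/warehouse/db/t")); // true
    System.out.println(localityEnabled(null, true, "s3a://bucket/warehouse/db/t")); // false
    System.out.println(localityEnabled(true, true, "gs://bucket/warehouse/db/t")); // true
  }
}
```

   Keeping the explicit option authoritative means users on an HDFS-like store that the allow-list doesn't know about can still opt in, while object-store users get `false` without having to configure anything.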

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
##########
@@ -19,28 +19,19 @@
 
 package org.apache.iceberg.flink.source;
 
-import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 
-/**
- * TODO Implement {@link LocatableInputSplit}.
- */
-public class FlinkInputSplit implements InputSplit {
+public class FlinkInputSplit extends LocatableInputSplit {
 
-  private final int splitNumber;
   private final CombinedScanTask task;
 
-  FlinkInputSplit(int splitNumber, CombinedScanTask task) {
-    this.splitNumber = splitNumber;
+  FlinkInputSplit(int splitNumber, CombinedScanTask task, String[] hosts) {

Review comment:
       What value will `hosts` be if locality is disabled? Simply `null`?
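   If the answer is `null`, one option is to normalize the array before it reaches the split so callers never have to null-check it. The sketch below is standalone (it does not extend Flink's `LocatableInputSplit`, so it runs without Flink on the classpath); `SplitHosts` and `normalize` are hypothetical names. Whether `LocatableInputSplit` already does this normalization in its own constructor should be confirmed against the Flink source for the version in use.

```java
import java.util.Arrays;

// Hypothetical helper: illustrates normalizing the host list handed to a split.
public final class SplitHosts {

  private static final String[] NO_HOSTS = new String[0];

  private SplitHosts() {
  }

  /**
   * Returns an empty array when locality is disabled or no hosts are known,
   * so downstream code can iterate over the result without a null check.
   */
  static String[] normalize(boolean localityEnabled, String[] hosts) {
    if (!localityEnabled || hosts == null) {
      return NO_HOSTS;
    }
    // Defensive copy so later changes to the caller's array don't leak into the split.
    return Arrays.copyOf(hosts, hosts.length);
  }

  public static void main(String[] args) {
    String[] hosts = {"dn1.example.com", "dn2.example.com"};
    System.out.println(normalize(false, hosts).length); // 0
    System.out.println(normalize(true, null).length); // 0
    System.out.println(Arrays.toString(normalize(true, hosts))); // [dn1.example.com, dn2.example.com]
  }
}
```

   Returning an empty array rather than `null` makes "no locality information" an explicit, safe value instead of something every reader of the split has to guard against.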




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org