Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/12/29 04:51:18 UTC

[GitHub] [iceberg] hililiwei opened a new pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

hililiwei opened a new pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817


   Closes https://github.com/apache/iceberg/issues/3816
   
   > Make FlinkInputSplit extend LocatableInputSplit instead of implementing InputSplit, collect the locations of all files in each CombinedScanTask, and replace DefaultInputSplitAssigner with LocatableInputSplitAssigner.
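
   A minimal sketch of the idea, assuming nothing about Flink's internals: each split carries the hosts that store its files, and an assigner gives a requesting host a split stored on that host when one is left, otherwise any remaining split. `HostAwareSplit` and `PreferLocalAssigner` are hypothetical names; Flink's real `LocatableInputSplitAssigner` is more elaborate than this first-match loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a split that records its preferred hosts (not Flink's class).
final class HostAwareSplit {
  final int splitNumber;
  final String[] hosts;

  HostAwareSplit(int splitNumber, String[] hosts) {
    this.splitNumber = splitNumber;
    // A missing host list means "no preference".
    this.hosts = hosts == null ? new String[0] : hosts;
  }
}

// Simplified locality-aware assigner: prefer a split stored on the requesting
// host; otherwise hand out the first remaining split (a remote read).
final class PreferLocalAssigner {
  private final List<HostAwareSplit> remaining;

  PreferLocalAssigner(List<HostAwareSplit> splits) {
    this.remaining = new ArrayList<>(splits);
  }

  synchronized HostAwareSplit next(String host) {
    Iterator<HostAwareSplit> it = remaining.iterator();
    while (it.hasNext()) {
      HostAwareSplit split = it.next();
      if (Arrays.asList(split.hosts).contains(host)) {
        it.remove();
        return split;
      }
    }
    // No local split left: fall back to any split, or null when exhausted.
    return remaining.isEmpty() ? null : remaining.remove(0);
  }
}
```

   Without host information every request takes the fallback branch, which is roughly the situation with `DefaultInputSplitAssigner`, since it ignores the requesting host.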


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779608687



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -37,9 +40,20 @@ private FlinkSplitGenerator() {
   static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
     List<CombinedScanTask> tasks = tasks(table, context);
     FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
-    }
+    boolean localityPreferred = context.locality();
+
+    Tasks.range(tasks.size())
+        .stopOnFailure()
+        .executeWith(localityPreferred ? ThreadPools.getWorkerPool() : null)

Review comment:
       Thanks @stevenzwu, I mainly referred to the following code:
   https://github.com/apache/iceberg/blob/1a11038baaca9f4cf03674654a6d9a85f39a1ce1/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L135-L144
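
   The referenced Spark code builds partitions with `Tasks.range(...).executeWith(localityPreferred ? ThreadPools.getWorkerPool() : null)`, so the per-task host lookups run on a worker pool only when locality is wanted, and inline otherwise. Below is a rough plain-JDK analogue, not Iceberg's `Tasks` utility; `SplitPlanner.fill` is a hypothetical helper and the pool size of 4 is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntFunction;

final class SplitPlanner {
  private SplitPlanner() {
  }

  // Fills out[i] = factory.apply(i). Uses a pool only when `parallel` is true,
  // e.g. when each split needs a potentially slow block-location lookup.
  static <T> void fill(T[] out, IntFunction<T> factory, boolean parallel) {
    if (!parallel) {
      for (int i = 0; i < out.length; i++) {
        out[i] = factory.apply(i);
      }
      return;
    }
    ExecutorService pool = Executors.newFixedThreadPool(4); // assumed size
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < out.length; i++) {
        final int index = i;
        futures.add(pool.submit(() -> {
          out[index] = factory.apply(index);
        }));
      }
      // Wait in index order; the first failure found is rethrown.
      for (Future<?> future : futures) {
        future.get();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while planning splits", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Failed to plan a split", e.getCause());
    } finally {
      pool.shutdownNow(); // cancel anything still queued after a failure
    }
  }
}
```

   Writing to distinct array slots from pool threads is safe here because each `Future.get()` establishes a happens-before edge before `fill` returns.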





[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r778400379



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -19,15 +19,11 @@
 
 package org.apache.iceberg.flink;
 
-
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
 public class FlinkConfigOptions {
 
-  private FlinkConfigOptions() {

Review comment:
       Why remove this private constructor?
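
   For context on the question: a class with no declared constructor gets a compiler-generated default constructor with the same access as the class, so removing the private constructor from a `public` holder of static options makes it instantiable from anywhere. A minimal sketch of the pattern, using a hypothetical `ExampleConfigOptions` class and a hypothetical option key:

```java
// Hypothetical constants holder mirroring the FlinkConfigOptions pattern:
// it only exposes static members, so instantiation is blocked explicitly.
final class ExampleConfigOptions {
  // Hypothetical option key, for illustration only.
  static final String EXPOSE_LOCALITY_KEY = "example.expose-split-locality-info";

  // Private constructor: without it, javac would generate a default
  // constructor with the class's own access level.
  private ExampleConfigOptions() {
  }
}
```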





[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779649695



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -19,15 +19,14 @@
 
 package org.apache.iceberg.flink;
 
-
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
 public class FlinkConfigOptions {
 
   private FlinkConfigOptions() {
   }
-
+  

Review comment:
       revert unnecessary whitespace change





[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779972943



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
##########
@@ -19,28 +19,19 @@
 
 package org.apache.iceberg.flink.source;
 
-import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 
-/**
- * TODO Implement {@link LocatableInputSplit}.
- */
-public class FlinkInputSplit implements InputSplit {
+public class FlinkInputSplit extends LocatableInputSplit {
 
-  private final int splitNumber;
   private final CombinedScanTask task;
 
-  FlinkInputSplit(int splitNumber, CombinedScanTask task) {
-    this.splitNumber = splitNumber;
+  FlinkInputSplit(int splitNumber, CombinedScanTask task, String[] hosts) {

Review comment:
       Yes, it's `null`, and `org.apache.flink.core.io.LocatableInputSplit#LocatableInputSplit(int, java.lang.String[])` converts `null` to an empty array.
   
   See:
   https://github.com/apache/flink/blob/137b65c5f23e5ba546ce52bd15fa7aa528ab95de/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java#L61-L64
   
   Maybe add `@Nullable` to it, like this:
   ```
     FlinkInputSplit(int splitNumber, CombinedScanTask task, @Nullable String[] hosts) {
       super(splitNumber, hosts);
       this.task = task;
     }
   
   ```
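
   To make the `null` handling concrete, here is a standalone sketch that mirrors the behavior described above (a `null` host array becomes an empty one in the superclass constructor). `LocatableSplitSketch` and `TaskSplitSketch` are hypothetical stand-ins for Flink's `LocatableInputSplit` and the proposed `FlinkInputSplit`; `Object` replaces `CombinedScanTask` so it compiles without Flink or Iceberg.

```java
// Hypothetical stand-in for a locatable split base class.
class LocatableSplitSketch {
  private static final String[] EMPTY = new String[0];

  private final int splitNumber;
  private final String[] hostnames;

  LocatableSplitSketch(int splitNumber, String[] hostnames) {
    this.splitNumber = splitNumber;
    // Normalize null to an empty array so callers never null-check.
    this.hostnames = hostnames == null ? EMPTY : hostnames;
  }

  int getSplitNumber() {
    return splitNumber;
  }

  String[] getHostnames() {
    return hostnames;
  }
}

// Hypothetical subclass shaped like the proposed FlinkInputSplit constructor.
final class TaskSplitSketch extends LocatableSplitSketch {
  private final Object task; // stands in for CombinedScanTask

  // hosts may be null, e.g. when locality is disabled.
  TaskSplitSketch(int splitNumber, Object task, String[] hosts) {
    super(splitNumber, hosts);
    this.task = task;
  }

  Object task() {
    return task;
  }
}
```

   With this shape, split creation can pass `null` when locality is disabled and downstream code can iterate `getHostnames()` unconditionally.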





[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r778558211



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
##########
@@ -19,20 +19,17 @@
 
 package org.apache.iceberg.flink.source;
 
-import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 
-/**
- * TODO Implement {@link LocatableInputSplit}.
- */
-public class FlinkInputSplit implements InputSplit {
+public class FlinkInputSplit extends LocatableInputSplit {
 
   private final int splitNumber;

Review comment:
       thx. done.





[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r786969209



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -195,6 +211,7 @@ public FlinkInputFormat buildFormat() {
       } else {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
       }
+      contextBuilder.exposeLocality(localityEnabled());

Review comment:
       Where is the `exposeLocality` variable used?





[GitHub] [iceberg] hililiwei commented on pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#issuecomment-1022895317


   @rdblue @kbendick Please take a look. Do I need to make any further changes?



[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r785606407



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -178,6 +179,38 @@ public void testInferedParallelism() throws IOException {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
   }
 
+  @Test
+  public void testExposeLocality() throws Exception {

Review comment:
       Yes, during the test phase, I printed some logs to see if it was working properly, such as this one:
   code:
   ![2022-1-17-2](https://user-images.githubusercontent.com/59213263/149700947-bea570d1-8cb3-4c16-9dbb-e690eb69535d.PNG)
   Logs:
   ![2022-1-17](https://user-images.githubusercontent.com/59213263/149699989-0dfb1440-0896-407f-9c10-775d887e6179.PNG)
   
   
   





[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779593170



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +261,24 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    private boolean localityEnabled() {
+      InputFile file = table.io().newInputFile(table.location());

Review comment:
       When I debug locally, it also appears to be `HadoopFileIO`.





[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780427503



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -195,6 +205,7 @@ public FlinkInputFormat buildFormat() {
       } else {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
       }
+      contextBuilder.exposeLocality(localityEnabled());

Review comment:
       I think it should be possible to override `exposeLocality` in this builder so that you can set it differently for different sources. Keeping a boolean in this builder and passing that as an override for the environment property in `localityEnabled()` should work.
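
   One way to read this suggestion, as a sketch with hypothetical names (`SourceBuilderSketch`, `localityEnabled(Boolean, boolean)`): the builder keeps a nullable `Boolean`; when it is unset, the job-level config value applies, and when that is unset too, a default applies.

```java
// Hypothetical builder illustrating per-source override > job config > default.
final class SourceBuilderSketch {
  // null means "not set on this source".
  private Boolean exposeLocality;

  SourceBuilderSketch exposeLocality(boolean newExposeLocality) {
    this.exposeLocality = newExposeLocality;
    return this;
  }

  // configValue: job-level setting, or null when the user left it unset.
  // fallback: default used when neither the source nor the job sets a value.
  boolean localityEnabled(Boolean configValue, boolean fallback) {
    if (exposeLocality != null) {
      return exposeLocality;
    }
    if (configValue != null) {
      return configValue;
    }
    return fallback;
  }
}
```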





[GitHub] [iceberg] kbendick commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780425675



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +258,24 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        Boolean localityConfig = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+
+        HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
+        try {
+          String scheme = new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
+          boolean defaultValue = FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);

Review comment:
       It would be nice to avoid the dependency on `Path` in this class, so that people who don't need Hadoop don't need the dependency. You might be able to do the following to get the same result.
   
   If you call `((HadoopFileIO) fileIO).newInputFile(table.location()).location()`, I believe you can check the scheme on the location string returned by `InputFile`.
   
   There could very possibly be a cleaner way to do it. But if this class doesn't already pull in `org.apache.hadoop.*`, it would be best to avoid that if possible, so that users who run only in cloud environments, for example, don't need HDFS on their classpath.
   
   In this case, just by having the import, users still need the Hadoop jars on the classpath even if the table uses `S3FileIO` (which is sometimes a pain on Flink).

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +258,24 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        Boolean localityConfig = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+
+        HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
+        try {
+          String scheme = new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
+          boolean defaultValue = FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);

Review comment:
       It would be nice to avoid the dependency on `Path` in this class, so that people who don't have HDFS requirements don't need the dependency. You might be able to do the following to get the same result.
   
   If you call `((HadoopFileIO) fileIO).newInputFile(table.location()).location()`, I believe you can check the scheme on the location string returned by `InputFile`.
   
   There could very possibly be a cleaner way to do it. But if this class doesn't already pull in `org.apache.hadoop.*`, it would be best to avoid that if possible, so that users who run only in cloud environments, for example, don't need the Hadoop jars on their user classpath.
   
   In this case, just by having the import, users still need the Hadoop jars on the classpath even if the table uses `S3FileIO` (which is sometimes a pain on Flink).
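
   A minimal sketch of checking the scheme on the location string with `java.net.URI`, so no `org.apache.hadoop.*` import is needed. `LocationSchemes` is a hypothetical helper, and treating only `hdfs` as locality-capable is an assumption modeled on the allow-list idea used on the Spark side. Note that Hadoop resolves a scheme-less path against the default file system, which this sketch simply treats as having no locality.

```java
import java.net.URI;
import java.util.Collections;
import java.util.Locale;
import java.util.Set;

final class LocationSchemes {
  // Assumption: only HDFS is treated as able to report useful block locations.
  private static final Set<String> LOCALITY_SCHEMES = Collections.singleton("hdfs");

  private LocationSchemes() {
  }

  // Lower-cased scheme of a location string, or null if it has none
  // (for example a bare path such as "/tmp/warehouse/db/t").
  // URI.create throws IllegalArgumentException for malformed locations.
  static String scheme(String location) {
    String scheme = URI.create(location).getScheme();
    return scheme == null ? null : scheme.toLowerCase(Locale.ROOT);
  }

  static boolean supportsLocality(String location) {
    String scheme = scheme(location);
    return scheme != null && LOCALITY_SCHEMES.contains(scheme);
  }
}
```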





[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780678775



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -195,6 +205,7 @@ public FlinkInputFormat buildFormat() {
       } else {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
       }
+      contextBuilder.exposeLocality(localityEnabled());

Review comment:
       Please review whether this meets expectations. Thanks.





[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780865082



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -178,6 +179,38 @@ public void testInferedParallelism() throws IOException {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
   }
 
+  @Test
+  public void testExposeLocality() throws Exception {
+    Table table =
+        catalog.createTable(TableIdentifier.of("default", "t"), TestFixtures.SCHEMA, TestFixtures.SPEC);
+
+    TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 10, 0L);
+    expectedRecords.forEach(expectedRecord -> expectedRecord.set(2, "2020-03-20"));
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    DataFile dataFile = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
+    helper.appendToTable(dataFile);
+
+    // test sql api
+    Configuration tableConf = getTableEnv().getConfig().getConfiguration();
+    tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), false);
+
+    List<Row> results = sql("select * from t");
+    org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA);

Review comment:
       It conflicts with `org.apache.iceberg.TestHelpers`.





[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779657429



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -37,9 +40,20 @@ private FlinkSplitGenerator() {
   static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
     List<CombinedScanTask> tasks = tasks(table, context);
     FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
-    }
+    boolean localityPreferred = context.locality();
+
+    Tasks.range(tasks.size())
+        .stopOnFailure()
+        .executeWith(localityPreferred ? ThreadPools.getWorkerPool() : null)

Review comment:
       @hililiwei Thanks for the Spark code reference. As I said earlier, using a thread pool does make sense to me. I just want to double-check the behavior in the iceberg-mr module, so I'd like to confirm with folks who have more context.
   
   The second part of my comment has not been addressed yet. Please take a look.
   ```
   public static String[] blockLocations(FileIO io, CombinedScanTask task)
   ```
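
   A sketch of what a helper like `blockLocations(FileIO io, CombinedScanTask task)` could compute: the de-duplicated union of hosts over every file range in the task. Everything below is hypothetical (`FileRange`, `BlockLocator`, `BlockLocations`); in a Hadoop-backed implementation the per-range lookup would presumably come from `FileSystem#getFileBlockLocations(Path, long, long)` and `BlockLocation#getHosts()`.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical minimal view of one file range inside a combined task.
final class FileRange {
  final String path;
  final long start;
  final long length;

  FileRange(String path, long start, long length) {
    this.path = path;
    this.start = start;
    this.length = length;
  }
}

// Hypothetical lookup returning the hosts that store a byte range of a file.
interface BlockLocator {
  String[] hosts(String path, long start, long length);
}

final class BlockLocations {
  private BlockLocations() {
  }

  // Union of hosts across all ranges, de-duplicated in first-seen order.
  static String[] blockLocations(BlockLocator locator, List<FileRange> task) {
    Set<String> hosts = new LinkedHashSet<>();
    for (FileRange range : task) {
      hosts.addAll(Arrays.asList(locator.hosts(range.path, range.start, range.length)));
    }
    return hosts.toArray(new String[0]);
  }
}
```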





[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779714028



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -37,9 +40,20 @@ private FlinkSplitGenerator() {
   static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
     List<CombinedScanTask> tasks = tasks(table, context);
     FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
-    }
+    boolean exposeLocality = context.exposeLocality();
+
+    Tasks.range(tasks.size())
+        .stopOnFailure()
+        .executeWith(exposeLocality ? ThreadPools.getWorkerPool() : null)
+        .run(index -> {
+          CombinedScanTask task = tasks.get(index);
+          String[] hosts = new String[0];

Review comment:
       Yes, that is the case.





[GitHub] [iceberg] kbendick commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779844460



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -225,7 +227,8 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
         int maxInferParallelism = readableConfig.get(FlinkConfigOptions
             .TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
-        Preconditions.checkState(maxInferParallelism >= 1,
+        Preconditions.checkState(
+            maxInferParallelism >= 1,

Review comment:
       Nit: this change doesn't appear to change anything.
   
   Can we avoid touching a line (and creating a diff) when the source code doesn't actually change? This greatly helps people who maintain forks and have to `git cherry-pick` most commits.

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +250,16 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    private boolean localityEnabled() {
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        Boolean localityConfig = readableConfig.get(FlinkConfigOptions
+            .TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+        return localityConfig != null ? localityConfig : true;

Review comment:
       Nit / open question:
   
   On the subject of having a default value, it seems like the default for `HadoopFileIO` is `true`.
   
   Should we also check that the path has a scheme that would make sense with locality enabled (e.g. the user isn't using HadoopFileIO with s3a or some other object store underneath)? Sort of like how Spark has the `FILESYSTEM_SUPPORT_LOCALITY` check?
   
   I'm marking this as an open question, as it might be difficult to pass this information through, but I know some people on Azure or even GCS who use `HadoopFileIO` with the shim for their object storage (the equivalent of `s3a` for their object store), and they would not benefit from a default value of `true`.
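
   A sketch of the resolution order this question points toward, with hypothetical names: an explicit config value wins, and otherwise the default depends on whether the location's scheme can report useful locality, rather than being an unconditional `true`. Returning `false` for non-Hadoop `FileIO` implementations, and treating only `hdfs` as locality-capable, are assumptions about the intended behavior.

```java
import java.util.Collections;
import java.util.Set;

final class LocalityDefaults {
  // Assumption: only HDFS is treated as able to report useful block locations.
  private static final Set<String> LOCALITY_SCHEMES = Collections.singleton("hdfs");

  private LocalityDefaults() {
  }

  // isHadoopFileIO: whether the table uses a Hadoop-backed FileIO.
  // configured: the user's explicit setting, or null when unset.
  // scheme: lower-cased scheme of the table location, possibly null.
  static boolean localityEnabled(boolean isHadoopFileIO, Boolean configured, String scheme) {
    if (!isHadoopFileIO) {
      return false; // assumed: no block locations available from other FileIOs
    }
    if (configured != null) {
      return configured; // an explicit setting always wins
    }
    // Scheme-aware default instead of an unconditional true, so s3a/gs/abfs
    // shims under HadoopFileIO do not pay for useless lookups.
    return scheme != null && LOCALITY_SCHEMES.contains(scheme);
  }
}
```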

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
##########
@@ -19,28 +19,19 @@
 
 package org.apache.iceberg.flink.source;
 
-import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 
-/**
- * TODO Implement {@link LocatableInputSplit}.
- */
-public class FlinkInputSplit implements InputSplit {
+public class FlinkInputSplit extends LocatableInputSplit {
 
-  private final int splitNumber;
   private final CombinedScanTask task;
 
-  FlinkInputSplit(int splitNumber, CombinedScanTask task) {
-    this.splitNumber = splitNumber;
+  FlinkInputSplit(int splitNumber, CombinedScanTask task, String[] hosts) {

Review comment:
       What value will `hosts` be if locality is disabled? Simply `null`?





[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r781726733



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -178,6 +179,38 @@ public void testInferedParallelism() throws IOException {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
   }
 
+  @Test
+  public void testExposeLocality() throws Exception {

Review comment:
       @hililiwei How does the Flink code base test this feature?





[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780850848



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -178,6 +179,38 @@ public void testInferedParallelism() throws IOException {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
   }
 
+  @Test
+  public void testExposeLocality() throws Exception {
+    Table table =
+        catalog.createTable(TableIdentifier.of("default", "t"), TestFixtures.SCHEMA, TestFixtures.SPEC);
+
+    TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 10, 0L);
+    expectedRecords.forEach(expectedRecord -> expectedRecord.set(2, "2020-03-20"));
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    DataFile dataFile = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
+    helper.appendToTable(dataFile);
+
+    // test sql api
+    Configuration tableConf = getTableEnv().getConfig().getConfiguration();
+    tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), false);
+
+    List<Row> results = sql("select * from t");
+    org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA);

Review comment:
       nit: import




[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780426329



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +258,24 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        Boolean localityConfig = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);

Review comment:
       I think this should return early if the value of this config is `false`. That way you don't needlessly attempt to create a file system when locality won't be used.
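The suggested ordering (check the cheap, explicit setting before the expensive file system lookup) can be modeled in plain Java by passing the scheme lookup as a `Supplier`, so it runs only when it is actually needed. This is a sketch under assumptions: `LocalityCheck`, `localityEnabled(Boolean, Supplier)` and the `FILE_SYSTEM_SUPPORT_LOCALITY` set are illustrative, not the PR's final code, and the "hdfs only" default is taken from the discussion in this PR.

```java
import java.util.Locale;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical sketch of the locality decision with an early return on an explicit "false". */
final class LocalityCheck {
  // Assumption: only HDFS reports useful block locations ("hdfs=on, everything else=off").
  private static final Set<String> FILE_SYSTEM_SUPPORT_LOCALITY = Set.of("hdfs");

  private LocalityCheck() {
  }

  /**
   * @param configured the expose-locality setting, or null if the user did not set it
   * @param schemeLookup resolves the table's file system scheme; may be expensive
   */
  static boolean localityEnabled(Boolean configured, Supplier<String> schemeLookup) {
    // Early return: an explicit "false" never touches the file system.
    if (Boolean.FALSE.equals(configured)) {
      return false;
    }

    // Unset or "true": expose locality only for file systems that can report block hosts.
    String scheme = schemeLookup.get();
    return scheme != null
        && FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme.toLowerCase(Locale.ENGLISH));
  }
}
```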




[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780428020



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -46,7 +46,7 @@
  */
 public class SparkReadConf {
 
-  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");

Review comment:
       Let's not change Spark here.




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r778993369



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -19,15 +19,11 @@
 
 package org.apache.iceberg.flink;
 
-
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
 public class FlinkConfigOptions {
 
-  private FlinkConfigOptions() {

Review comment:
       I think we should revert this change. Iceberg coding style puts the private constructor at the beginning of the class; see the `TableProperties` class.




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779659099



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
##########
@@ -161,6 +162,10 @@ long limit() {
     return limit;
   }
 
+  boolean locality() {

Review comment:
       This should also be renamed to `exposeLocality`.




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779003457



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +36,14 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_LOCALITY =
+      ConfigOptions.key("table.exec.iceberg.infer-source-locality")

Review comment:
       This is not an `infer` operation, which would mean calculating something from other information. We need a more accurate config name. I am thinking `expose-split-locality-info`.




[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776770808



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -38,7 +48,22 @@ private FlinkSplitGenerator() {
     List<CombinedScanTask> tasks = tasks(table, context);
     FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
     for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
+      Set<String> hosts = Sets.newHashSet();
+      CombinedScanTask combinedScanTask = tasks.get(i);
+      combinedScanTask.files().forEach(fileScanTask -> {
+        try {
+          final FileSystem fs = new HadoopFileSystem(DistributedFileSystem.get(new Configuration()));

Review comment:
       - I added this flag to the FlinkSource builder and determined its value based on the file system and the configuration, but I'm not quite sure it's appropriate to add a flag there.
   - `Util.blockLocations` is indeed a better choice.
   
   PTAL, thx.
   
   
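Conceptually, collecting hosts for a combined task means: for each file range in the task, find the blocks that overlap that byte range and take the union of their replica hosts. The plain-Java model below illustrates only that idea; it is not how `Util.blockLocations` is implemented. `Block`, `FileRange` and `CombinedTaskHosts` are hypothetical stand-ins rather than Iceberg or Hadoop types; in real code the block list would come from something like Hadoop's `FileSystem.getFileBlockLocations`.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of a storage block: a byte range plus the hosts holding replicas. */
final class Block {
  final long offset;
  final long length;
  final List<String> hosts;

  Block(long offset, long length, List<String> hosts) {
    this.offset = offset;
    this.length = length;
    this.hosts = hosts;
  }
}

/** Hypothetical stand-in for one file scan task: a byte range over a file's blocks. */
final class FileRange {
  final List<Block> blocks;
  final long start;
  final long length;

  FileRange(List<Block> blocks, long start, long length) {
    this.blocks = blocks;
    this.start = start;
    this.length = length;
  }
}

final class CombinedTaskHosts {
  private CombinedTaskHosts() {
  }

  /** True when [start, start + length) overlaps the block's byte range. */
  static boolean overlaps(Block block, long start, long length) {
    long end = start + length;
    long blockEnd = block.offset + block.length;
    return block.offset < end && start < blockEnd;
  }

  /** Union of replica hosts for every block touched by any range, in first-seen order. */
  static String[] hosts(List<FileRange> ranges) {
    Set<String> result = new LinkedHashSet<>();
    for (FileRange range : ranges) {
      for (Block block : range.blocks) {
        if (overlaps(block, range.start, range.length)) {
          result.addAll(block.hosts);
        }
      }
    }
    return result.toArray(new String[0]);
  }
}
```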




[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780426329



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -195,6 +205,7 @@ public FlinkInputFormat buildFormat() {
       } else {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
       }
+      contextBuilder.exposeLocality(localityEnabled());

Review comment:
       I think it should be possible to override `exposeLocality` in this builder so that you can set it differently for different sources. Keeping a boolean in this builder and passing that as an override for the environment property in `localityEnabled()` should work.
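The override described here can be sketched as a nullable `Boolean` field on the builder: `null` means "not set on this source", so the environment property applies; otherwise the builder value wins. `LocalityAwareSourceBuilder` and `effectiveExposeLocality` are hypothetical names used only for illustration.

```java
/** Hypothetical builder fragment: a per-source override that wins over the environment option. */
final class LocalityAwareSourceBuilder {
  // null means "not set on this builder"; fall back to the environment configuration.
  private Boolean exposeLocality;

  LocalityAwareSourceBuilder exposeLocality(boolean newExposeLocality) {
    this.exposeLocality = newExposeLocality;
    return this;
  }

  /**
   * Returns the builder value if set, otherwise the environment value, which may itself be
   * null (meaning "use the file-system-based default").
   */
  Boolean effectiveExposeLocality(Boolean environmentValue) {
    return exposeLocality != null ? exposeLocality : environmentValue;
  }
}
```

Keeping the field boxed preserves the three states (unset, true, false) all the way to `localityEnabled()`.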

[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780678570



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -46,7 +46,7 @@
  */
 public class SparkReadConf {
 
-  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");

Review comment:
       done




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780849993



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -157,6 +168,12 @@ public Builder streaming(boolean streaming) {
       return this;
     }
 
+    public Builder exposeLocality(boolean newExposeLocality) {
+      contextBuilder.exposeLocality(newExposeLocality);

Review comment:
       We already set exposeLocality on contextBuilder in the `buildFormat` method, so we probably don't need to do it here.




[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r786967990



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -19,7 +19,6 @@
 
 package org.apache.iceberg.flink;
 
-

Review comment:
       Can you revert this unnecessary whitespace change?




[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776882113



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +36,14 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_LOCALITY =
+      ConfigOptions.key("table.exec.iceberg.infer-source-locality")
+          .booleanType()
+          .defaultValue(true)

Review comment:
       Is it possible to remove the default here? The purpose of this is to override the default (hdfs=on, everything else=off) and disable locality in splits, so I don't think that adding a default here makes sense.




[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776883074



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
##########
@@ -226,7 +226,7 @@ private void writeRecords(List<Record> records) throws IOException {
   }
 
   private StreamingMonitorFunction createFunction(ScanContext scanContext) {
-    return new StreamingMonitorFunction(TestTableLoader.of(tableDir.getAbsolutePath()), scanContext);
+    return new StreamingMonitorFunction(TestTableLoader.of(tableDir.getAbsolutePath()), scanContext, false);

Review comment:
       Where is this called from? Is it okay to do this or should locality be part of the scan context?




[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r799682062



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +265,27 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {

Review comment:
       Should this be private?




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780029735



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +250,16 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    private boolean localityEnabled() {
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        Boolean localityConfig = readableConfig.get(FlinkConfigOptions
+            .TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+        return localityConfig != null ? localityConfig : true;

Review comment:
       I thought the [discussion](https://lists.apache.org/thread/cknhjhvf63vp7xq7gwdbj9mlp0z0wj8b) was to use S3FileIO for Azure or GCS because they implement an S3-like API.
   
   That said, I do agree with checking the `scheme` the way the Spark `Reader` class does. But @hililiwei, we shouldn't need to open an InputFile for that purpose; we can just follow the Spark code.
   ```
   FileSystem fs = new Path(table.location()).getFileSystem(conf);
   fsscheme = fs.getScheme().toLowerCase(Locale.ENGLISH);
   ```
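The Spark snippet relies on Hadoop's `Path.getFileSystem`, which also resolves scheme-less locations against the configured default file system. If only the explicit scheme of a fully qualified table location is needed, a dependency-free sketch using `java.net.URI` might look like the following. `SchemeOf` is a hypothetical helper; unlike Hadoop, it returns `null` for scheme-less paths instead of consulting the default file system, and `URI.create` rejects locations containing illegal URI characters such as unescaped spaces.

```java
import java.net.URI;
import java.util.Locale;

/** Hypothetical helper: extracts a lower-cased scheme without creating a file system. */
final class SchemeOf {
  private SchemeOf() {
  }

  /** Lower-cased scheme of a fully qualified location, or null if the location has none. */
  static String scheme(String location) {
    String scheme = URI.create(location).getScheme();
    return scheme == null ? null : scheme.toLowerCase(Locale.ENGLISH);
  }
}
```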




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779652985



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +39,11 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
+      ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
+          .booleanType()
+          .noDefaultValue()
+          .withDescription("Controls whether to report locality information to Flink while allocating input " +
+              "partitions.");

Review comment:
       I assume this is broken into two lines because the sentence exceeds the checkstyle length limit. I also think we can make the description more concise.
   
   ```
   Expose locality host information to use Flink's locality aware split assigner.
   ```




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779701565



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +39,11 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
+      ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
+          .booleanType()
+          .noDefaultValue()

Review comment:
       Oh, I get it now. We retrieve the config as a boxed `Boolean`, so in that case the default value is not needed.




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779004416



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -70,11 +74,14 @@ public static Builder forRowData() {
    * Source builder to build {@link DataStream}.
    */
   public static class Builder {
+    private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");

Review comment:
       We need a better name for the constant. Maybe `FILE_SYSTEM_SUPPORT_LOCALITY`?




[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r781694155



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +258,24 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        Boolean localityConfig = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);

Review comment:
       done




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780850803



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -178,6 +179,38 @@ public void testInferedParallelism() throws IOException {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
   }
 
+  @Test
+  public void testExposeLocality() throws Exception {

Review comment:
       It only verifies that the data is read; it doesn't really verify locality-aware assignment. Ideally, we would need 2 files stored on 2 hosts with HDFS and a cluster of TMs running on those two hosts. Then we could verify that the assigned files/splits come from the same host. But I am not sure this can be done in a unit test setup.
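Short of a multi-host HDFS cluster, one partial option is to unit-test the assignment decision in isolation: a host that asks for work should receive a split stored on it first, and only fall back to a remote split when no local one is left. The model below is a deliberately simplified, hypothetical illustration of that property (`SplitInfo` and `SimpleLocalityAssigner` are made-up names); it is not Flink's `LocatableInputSplitAssigner`, which has its own bookkeeping. Since Flink's `InputSplitAssigner` exposes `getNextInputSplit(String host, int taskId)`, a similar assertion could likely be written against the real assigner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical split: an id plus the hosts that store its data. */
final class SplitInfo {
  final int id;
  final List<String> hosts;

  SplitInfo(int id, String... hosts) {
    this.id = id;
    this.hosts = Arrays.asList(hosts);
  }
}

/** Simplified model: prefer a split local to the requesting host, else hand out any remaining one. */
final class SimpleLocalityAssigner {
  private final List<SplitInfo> remaining;

  SimpleLocalityAssigner(List<SplitInfo> splits) {
    this.remaining = new ArrayList<>(splits);
  }

  /** Returns the next split for the host, or null when all splits are assigned. */
  SplitInfo next(String host) {
    Iterator<SplitInfo> it = remaining.iterator();
    while (it.hasNext()) {
      SplitInfo split = it.next();
      if (split.hosts.contains(host)) {
        it.remove();
        return split;
      }
    }
    // No local split left: fall back to a remote one, if any.
    return remaining.isEmpty() ? null : remaining.remove(0);
  }
}
```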
   




[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r781689702



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -157,6 +168,12 @@ public Builder streaming(boolean streaming) {
       return this;
     }
 
+    public Builder exposeLocality(boolean newExposeLocality) {
+      contextBuilder.exposeLocality(newExposeLocality);

Review comment:
       removed




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r784540128



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -178,6 +179,38 @@ public void testInferedParallelism() throws IOException {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
   }
 
+  @Test
+  public void testExposeLocality() throws Exception {

Review comment:
       This is for unit testing the assigner, not an end-to-end test of the whole thing.
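An assigner-level unit test of this kind could look roughly like the sketch below. `HostSplit` and `SimpleLocalityAssigner` are hypothetical stand-ins written for illustration; they are not Flink's `LocatableInputSplitAssigner`, whose real matching logic is more involved. The point is only that a test can hand an assigner splits with known hosts and check which split each requesting host receives, without a real HDFS cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical split: an id plus the hosts that store its data. */
final class HostSplit {
  final int id;
  final String[] hosts;

  HostSplit(int id, String... hosts) {
    this.id = id;
    this.hosts = hosts;
  }
}

/**
 * Hypothetical, simplified locality-preferring assigner (NOT Flink's
 * LocatableInputSplitAssigner): it returns a split stored on the requesting
 * host when one remains, otherwise falls back to any remaining split.
 */
final class SimpleLocalityAssigner {
  private final List<HostSplit> remaining;

  SimpleLocalityAssigner(List<HostSplit> splits) {
    this.remaining = new ArrayList<>(splits);
  }

  /** Returns the next split for {@code host}, or null when none are left. */
  synchronized HostSplit next(String host) {
    for (int i = 0; i < remaining.size(); i++) {
      if (Arrays.asList(remaining.get(i).hosts).contains(host)) {
        return remaining.remove(i); // a local split is available
      }
    }

    // No local split left: fall back to a remote one, if any remain.
    return remaining.isEmpty() ? null : remaining.remove(0);
  }
}
```

A test then asserts that `host-b` first receives the split stored on `host-b`, and only afterwards a remote one.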




[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r800285135



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +265,27 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {

Review comment:
       done. Test case was also modified.




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779650842



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +39,11 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
+      ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
+          .booleanType()
+          .noDefaultValue()

Review comment:
       I think we should have a default value of `false` to be compatible with the existing behavior
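The difference matters because, as we understand Flink's `ReadableConfig#get`, an option declared with `noDefaultValue()` yields `null` when unset, so every caller needs a null check (as in the `localityConfig != null ? localityConfig : true` line of an earlier revision). A `false` default keeps the old behavior when the key is absent. `BoolOption` below is a hypothetical, dependency-free stand-in used only to contrast the two cases; it is not Flink's `ConfigOption`.

```java
import java.util.Map;

/**
 * Hypothetical stand-in for a boolean config option, contrasting
 * "no default" (may return null) with "default false".
 */
final class BoolOption {
  final String key;
  final Boolean defaultValue; // null models noDefaultValue()

  BoolOption(String key, Boolean defaultValue) {
    this.key = key;
    this.defaultValue = defaultValue;
  }

  /** Returns the configured value, else the default (which may be null). */
  Boolean get(Map<String, String> conf) {
    String raw = conf.get(key);
    return raw != null ? Boolean.valueOf(raw) : defaultValue;
  }
}
```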




[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776912415



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -195,8 +201,15 @@ public FlinkInputFormat buildFormat() {
       } else {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
       }
-
-      return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
+      localityPreferred = localityEnabled();
+
+      return new FlinkInputFormat(

Review comment:
       got it.




[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776882858



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -22,25 +22,36 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.stream.IntStream;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 class FlinkSplitGenerator {
   private FlinkSplitGenerator() {
   }
 
-  static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
+  static FlinkInputSplit[] createInputSplits(Table table, ScanContext context, boolean localityPreferred) {
     List<CombinedScanTask> tasks = tasks(table, context);
-    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
-    }
-    return splits;
+
+    return IntStream.range(0, tasks.size()).parallel().mapToObj(i -> {

Review comment:
       Please use `Tasks` for parallelism like the Spark code, not Java parallelism. That's much more reliable.
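One reason to prefer an explicit pool: `IntStream.parallel()` runs on the JVM-wide common `ForkJoinPool`, which the caller neither sizes nor owns. A later revision of this PR uses `Tasks.range(...).stopOnFailure().executeWith(...)`. The stdlib-only sketch below only illustrates the same idea (run each index on a caller-supplied pool, fill results by index, cancel the rest on the first failure); `IndexedRunner` is hypothetical and does not reproduce Iceberg's `Tasks` API or its retry handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.IntFunction;

/** Hypothetical helper: runs fn(0..n-1) on an explicit pool, results by index. */
final class IndexedRunner {
  static String[] runAll(int n, ExecutorService pool, IntFunction<String> fn) {
    String[] results = new String[n];
    List<Future<?>> futures = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      final int index = i;
      // Each task writes only its own slot, so no extra locking is needed.
      futures.add(pool.submit(() -> {
        results[index] = fn.apply(index);
      }));
    }

    try {
      for (Future<?> f : futures) {
        f.get(); // waits; a task failure surfaces as ExecutionException
      }
    } catch (ExecutionException e) {
      cancelAll(futures); // stop on failure: cancel tasks that have not finished
      throw new RuntimeException("task failed", e.getCause());
    } catch (InterruptedException e) {
      cancelAll(futures);
      Thread.currentThread().interrupt();
      throw new RuntimeException("interrupted while waiting for tasks", e);
    }

    return results;
  }

  private static void cancelAll(List<Future<?>> futures) {
    for (Future<?> f : futures) {
      f.cancel(true);
    }
  }
}
```

Because the pool is passed in, the caller controls its size and lifecycle, and can skip it entirely when there is no expensive per-task work (for example, when locality is disabled).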




[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776882536



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -70,12 +74,14 @@ public static Builder forRowData() {
    * Source builder to build {@link DataStream}.
    */
   public static class Builder {
+    private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+    private final ScanContext.Builder contextBuilder = ScanContext.builder();

Review comment:
       Can you move this back to where it was?




[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r778557082



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -19,15 +19,11 @@
 
 package org.apache.iceberg.flink;
 
-
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
 public class FlinkConfigOptions {
 
-  private FlinkConfigOptions() {

Review comment:
       It wasn't removed; the auto-formatter moved it below.




[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779974831



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +250,16 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    private boolean localityEnabled() {
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        Boolean localityConfig = readableConfig.get(FlinkConfigOptions
+            .TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+        return localityConfig != null ? localityConfig : true;

Review comment:
       > I know some people who use Azure or even GCS who use the HadoopFileIO with the shim for their object storage
   
   If that is the case, I think we should check the `scheme` like Spark does. @stevenzwu, could you please take a look at this?
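A scheme-based check could be sketched as follows. The PR itself resolves the scheme through Hadoop (`new Path(table.location()).getFileSystem(conf).getScheme()`), which also resolves scheme-less paths against the default file system; `java.net.URI` cannot do that, so this hypothetical `SchemeCheck` simply treats a missing scheme as "no locality". The `hdfs`-only set mirrors the `LOCALITY_WHITELIST_FS` constant seen in an earlier diff and is otherwise an assumption.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical scheme check; stdlib-only, unlike the Hadoop-based lookup in the PR. */
final class SchemeCheck {
  // Assumed whitelist: only HDFS exposes useful block host information here.
  static final Set<String> LOCALITY_SCHEMES =
      Collections.unmodifiableSet(new HashSet<>(Arrays.asList("hdfs")));

  static boolean supportsLocality(String tableLocation) {
    String scheme = URI.create(tableLocation).getScheme();
    // A scheme-less location cannot be resolved without Hadoop's default FS.
    return scheme != null && LOCALITY_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT));
  }
}
```

With this shape, object stores accessed through `HadoopFileIO` (for example an `s3a://` or `gs://` location) are not mistaken for HDFS just because of the `FileIO` type.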
   
   




[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780679067



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +258,24 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        Boolean localityConfig = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+
+        HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
+        try {
+          String scheme = new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
+          boolean defaultValue = FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);

Review comment:
       Since `fileIO` is an instance of `HadoopFileIO`, isn't this still unaddressed when we call the `HadoopFileIO#newInputFile` method? That method still depends on `org.apache.hadoop.`
   
   https://github.com/apache/iceberg/blob/80fd7fd4f0ec365f3d002aaae40329bb7eed77d4/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java#L57-L61
   
   
   




[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r799680233



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +265,27 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {
+      Boolean localityConfig =
+          this.exposeLocality != null ? this.exposeLocality :
+              readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+
+      if (localityConfig != null && !localityConfig) {
+        return false;
+      }
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
+        try {
+          String scheme = new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
+          return FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);
+        } catch (IOException e) {
+          LOG.warn("Failed to determine whether the locality information can be exposed for table: {}", table, e);
+        }
+      }
+      return false;

Review comment:
       Style: this also needs a new line to separate the control flow from this statement.
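The decision made by `localityEnabled()` in this revision can be restated without Flink or Hadoop dependencies, which makes the precedence easy to test. The sketch below is hypothetical: the `scheme` argument stands in for the Hadoop file-system lookup (with `null` modeling the `IOException` branch that logs and falls through to `false`), and the `hdfs`-only check stands in for `FILE_SYSTEM_SUPPORT_LOCALITY`, whose contents are not shown here. Note that an explicit `true` does not force locality; the table must still be on a supported file system.

```java
/** Hypothetical, dependency-free restatement of the localityEnabled() logic above. */
final class LocalityDecision {
  static boolean localityEnabled(Boolean builderSetting, Boolean configSetting,
                                 boolean isHadoopFileIO, String scheme) {
    // An explicit builder setting takes precedence over the config option.
    Boolean localityConfig = builderSetting != null ? builderSetting : configSetting;

    if (localityConfig != null && !localityConfig) {
      return false; // explicitly disabled
    }

    if (isHadoopFileIO && scheme != null) {
      return "hdfs".equals(scheme); // assumed whitelist
    }

    return false;
  }
}
```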




[GitHub] [iceberg] rdblue commented on pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#issuecomment-1038576600


   Thanks, @hililiwei!


[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776912948



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -22,25 +22,36 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.stream.IntStream;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 class FlinkSplitGenerator {
   private FlinkSplitGenerator() {
   }
 
-  static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
+  static FlinkInputSplit[] createInputSplits(Table table, ScanContext context, boolean localityPreferred) {
     List<CombinedScanTask> tasks = tasks(table, context);
-    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
-    }
-    return splits;
+
+    return IntStream.range(0, tasks.size()).parallel().mapToObj(i -> {

Review comment:
       I read through `Tasks`; it is well designed and very impressive. Thanks for the guidance.

[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r781692935



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -178,6 +179,38 @@ public void testInferedParallelism() throws IOException {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
   }
 
+  @Test
+  public void testExposeLocality() throws Exception {

Review comment:
       Yes, +1, ideally it would be, but I haven't found a way to achieve it. So this test only checks that reading works properly when `table.exec.iceberg.expose-split-locality-info` is set to false.




[GitHub] [iceberg] rdblue commented on pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#issuecomment-1003202204


   This looks like it is getting close. I'd like @openinx, @stevenzwu, or @kbendick to comment on how this should be configured, though.


[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776882614



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -195,8 +201,15 @@ public FlinkInputFormat buildFormat() {
       } else {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
       }
-
-      return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
+      localityPreferred = localityEnabled();
+
+      return new FlinkInputFormat(

Review comment:
       We don't typically put each argument on a separate line. Instead, we just put as many as we can on each line.




[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776882676



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -195,8 +201,15 @@ public FlinkInputFormat buildFormat() {
       } else {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
       }
-
-      return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
+      localityPreferred = localityEnabled();

Review comment:
       Should be `this.localityPreferred = ...` because this is assigning to an instance field, not a local variable.




[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779967330



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -225,7 +227,8 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
         int maxInferParallelism = readableConfig.get(FlinkConfigOptions
             .TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
-        Preconditions.checkState(maxInferParallelism >= 1,
+        Preconditions.checkState(
+            maxInferParallelism >= 1,

Review comment:
       thx, will revert.




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779019623



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -37,9 +40,20 @@ private FlinkSplitGenerator() {
   static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
     List<CombinedScanTask> tasks = tasks(table, context);
     FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
-    }
+    boolean localityPreferred = context.locality();
+
+    Tasks.range(tasks.size())
+        .stopOnFailure()
+        .executeWith(localityPreferred ? ThreadPools.getWorkerPool() : null)

Review comment:
       This avoids the thread pool when locality is disabled, which is better. But I want to double-check whether we need a thread pool here at all. I saw that `Util.blockLocations` eventually calls `getFileStatus`, which is probably a network call to the Hadoop NameNode, so a thread pool does make sense to me.
   
   The current caller of `Util.blockLocations` in the iceberg-mr module (`IcebergSplit.getLocations()`) doesn't use a thread pool. Looping in @rdsr and @rdblue, who might have more context here.
   
   @hililiwei, it might be simpler to call this API from the `Util` class:
   ```
     public static String[] blockLocations(FileIO io, CombinedScanTask task)
   ```
   
   This is how the Spark code calls it:
   ```
         if (localityPreferred) {
           Table table = tableBroadcast.value();
           this.preferredLocations = Util.blockLocations(table.io(), task);
         } else {
           this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE;
         }
   ```
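The Spark pattern above boils down to: when locality is preferred, compute the hosts for each combined task; otherwise report no preference. The sketch below models that per-task step under stated assumptions: `FileSlice` and `PreferredLocations` are hypothetical, block host lookups are assumed to be already done (in Iceberg, `Util.blockLocations` performs them through Hadoop, likely one NameNode round trip per file, which is why a pool can help), and the result is taken to be the union of hosts across the task's files.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical file slice: the hosts holding the blocks it covers. */
final class FileSlice {
  final List<String> hosts;

  FileSlice(String... hosts) {
    this.hosts = Arrays.asList(hosts);
  }
}

/** Hypothetical per-task host computation mirroring the Spark snippet's shape. */
final class PreferredLocations {
  // Stand-in for HadoopInputFile.NO_LOCATION_PREFERENCE used in the Spark code.
  static final String[] NO_PREFERENCE = new String[0];

  /** Union of hosts across all slices of one combined task, in first-seen order. */
  static String[] forTask(List<FileSlice> task, boolean localityPreferred) {
    if (!localityPreferred) {
      return NO_PREFERENCE;
    }

    Set<String> hosts = new LinkedHashSet<>();
    for (FileSlice slice : task) {
      hosts.addAll(slice.hosts);
    }

    return hosts.toArray(new String[0]);
  }
}
```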




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779654287



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +261,24 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    private boolean localityEnabled() {
+      InputFile file = table.io().newInputFile(table.location());

Review comment:
       That is expected. A local table uses HadoopFileIO, and local file splits carry host information.




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779702761



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -37,9 +40,20 @@ private FlinkSplitGenerator() {
   static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
     List<CombinedScanTask> tasks = tasks(table, context);
     FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
-    }
+    boolean exposeLocality = context.exposeLocality();
+
+    Tasks.range(tasks.size())
+        .stopOnFailure()
+        .executeWith(exposeLocality ? ThreadPools.getWorkerPool() : null)
+        .run(index -> {
+          CombinedScanTask task = tasks.get(index);
+          String[] hosts = new String[0];

Review comment:
       Should we just initialize it to `null` and remove the `? :` part later?
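   The following sketch shows why `null` could serve as the initial value. It assumes the split's base class maps a `null` host array to "no preference". My understanding is that Flink's `LocatableInputSplit` constructor performs this normalization, but it is worth confirming against the Flink version in use. `NullableHostsSplit`, `SplitFactorySketch`, and `lookupHosts` are hypothetical.

   ```java
   /** Hypothetical split whose constructor treats a null host array as "no preference". */
   class NullableHostsSplit {
     private static final String[] NO_HOSTS = new String[0];

     private final int splitNumber;
     private final String[] hostnames;

     NullableHostsSplit(int splitNumber, String[] hostnames) {
       this.splitNumber = splitNumber;
       // Normalize null so callers can simply pass null when locality is off.
       this.hostnames = hostnames == null ? NO_HOSTS : hostnames;
     }

     int getSplitNumber() {
       return splitNumber;
     }

     String[] getHostnames() {
       return hostnames;
     }
   }

   /** Hypothetical split factory showing the null-initialized variable. */
   final class SplitFactorySketch {
     private SplitFactorySketch() {}

     /** Stand-in for a real block-location lookup. */
     static String[] lookupHosts(int index) {
       return new String[] {"datanode-" + index};
     }

     static NullableHostsSplit createSplit(int index, boolean exposeLocality) {
       String[] hosts = null;
       if (exposeLocality) {
         hosts = lookupHosts(index);
       }
       // No "? :" needed here: the constructor handles null.
       return new NullableHostsSplit(index, hosts);
     }
   }
   ```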






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779698824



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -37,9 +40,20 @@ private FlinkSplitGenerator() {
   static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
     List<CombinedScanTask> tasks = tasks(table, context);
     FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
-    }
+    boolean localityPreferred = context.locality();
+
+    Tasks.range(tasks.size())
+        .stopOnFailure()
+        .executeWith(localityPreferred ? ThreadPools.getWorkerPool() : null)

Review comment:
       @hililiwei, sorry for the confusion. I misread the code, so please ignore the second part of my comment.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779652985



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +39,11 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
+      ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
+          .booleanType()
+          .noDefaultValue()
+          .withDescription("Controls whether to report locality information to Flink while allocating input " +
+              "partitions.");

Review comment:
       I assume this is broken into two lines because the sentence exceeds the checkstyle length limit. I also think we can make the description more concise.
   
   ```
   Expose split host information to use Flink's locality aware split assigner.
   ```






[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779694429



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +39,11 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
+      ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
+          .booleanType()
+          .noDefaultValue()

Review comment:
       There was a [comment](https://github.com/apache/iceberg/pull/3817#discussion_r776882113) by @rdblue about the default value:
   > Is it possible to remove the default here? The purpose of this is to override the default (hdfs=on, everything else=off) and disable locality in splits, so I don't think that adding a default here makes sense.
   
   In addition, if the option had a `false` default value, we would be unable to tell whether `false` was set by the user.
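   The following sketch shows why an unset option (`null`) needs to be distinguishable from an explicit `false`. It follows the precedence discussed in this thread:

   * A builder value overrides the Flink option.
   * An explicit `false` disables locality.
   * Otherwise, the file system decides (hdfs on, everything else off).

   The sketch replaces the Hadoop file system lookup with a plain scheme string. `LocalityResolverSketch` and its parameters are hypothetical.

   ```java
   import java.util.Collections;
   import java.util.Locale;
   import java.util.Set;

   /** Hypothetical resolver for whether splits should expose locality. */
   final class LocalityResolverSketch {
     // Assumed default from the quoted comment: hdfs on, everything else off.
     private static final Set<String> LOCALITY_SCHEMES = Collections.singleton("hdfs");

     private LocalityResolverSketch() {}

     /**
      * @param builderValue value set on the source builder, or null if unset
      * @param configValue value of the Flink option, or null if unset (no default)
      * @param scheme file system scheme of the table location, or null if unknown
      */
     static boolean localityEnabled(Boolean builderValue, Boolean configValue, String scheme) {
       Boolean localityEnabled = builderValue != null ? builderValue : configValue;

       if (localityEnabled != null && !localityEnabled) {
         return false; // an explicit false always disables locality
       }

       // Unset (or true): fall back to the file-system default.
       return scheme != null && LOCALITY_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT));
     }
   }
   ```

   With a `false` default, an unset option on hdfs would behave exactly like an explicit `false`. In that case, the file-system default could never apply.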
   
   






[GitHub] [iceberg] kbendick commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780425675



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +258,24 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        Boolean localityConfig = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+
+        HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
+        try {
+          String scheme = new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
+          boolean defaultValue = FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);

Review comment:
       It would be nice to avoid the dependency on `Path` in this class, so that people who don't have HDFS requirements don't need the dependency. You might be able to do the following to get the same result.
   
   If you call `((HadoopFileIO) fileIO).newInputFile(table.location()).location()`, I believe you can check the scheme on the location string returned by `InputFile`.
   
   There could very well be a cleaner way to do it. But if this class doesn't already pull in `org.apache.hadoop.*`, it would be best to avoid that if possible. That way, users who run only in cloud environments, for example, don't need Hadoop jars on their user classpath.
   
   In this case, the import alone means users still need the Hadoop jars on the classpath, even if they use `S3FileIO` (which is sometimes a pain on Flink).
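   The following sketch implements the string-based check using only `java.net.URI` on the location string, for example the value returned by `InputFile.location()`. `LocationSchemeSketch` and the hdfs-only rule are assumptions for illustration.

   There are two caveats:

   * A scheme-less location such as `/warehouse/db/t` yields `null` here. Hadoop's `getFileSystem(conf)` would instead resolve it against the configured default file system, so that case may need separate handling.
   * `URI.create` throws `IllegalArgumentException` on strings that are not valid URIs.

   ```java
   import java.net.URI;

   /** Hypothetical helper that inspects a location string without Hadoop classes. */
   final class LocationSchemeSketch {
     private LocationSchemeSketch() {}

     /** Returns the URI scheme of the location, or null for a scheme-less path. */
     static String schemeOf(String location) {
       return URI.create(location).getScheme();
     }

     /** Assumed rule for illustration: only hdfs locations expose locality. */
     static boolean supportsLocality(String location) {
       return "hdfs".equalsIgnoreCase(schemeOf(location));
     }
   }
   ```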






[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779638481



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -19,15 +19,11 @@
 
 package org.apache.iceberg.flink;
 
-
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
 public class FlinkConfigOptions {
 
-  private FlinkConfigOptions() {

Review comment:
       updated






[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776916430



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
##########
@@ -226,7 +226,7 @@ private void writeRecords(List<Record> records) throws IOException {
   }
 
   private StreamingMonitorFunction createFunction(ScanContext scanContext) {
-    return new StreamingMonitorFunction(TestTableLoader.of(tableDir.getAbsolutePath()), scanContext);
+    return new StreamingMonitorFunction(TestTableLoader.of(tableDir.getAbsolutePath()), scanContext, false);

Review comment:
       I think it's a good idea to make it part of the scan context.
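   The following trimmed-down sketch carries the flag in the scan context rather than as an extra constructor argument. `ScanContextSketch` and `MonitorFunctionSketch` are hypothetical stand-ins for `ScanContext` and `StreamingMonitorFunction`. Only the `exposeLocality` name is borrowed from this thread.

   ```java
   /** Hypothetical, trimmed-down scan context that carries the locality flag. */
   final class ScanContextSketch {
     private final boolean exposeLocality;

     private ScanContextSketch(boolean exposeLocality) {
       this.exposeLocality = exposeLocality;
     }

     boolean exposeLocality() {
       return exposeLocality;
     }

     static Builder builder() {
       return new Builder();
     }

     static final class Builder {
       private boolean exposeLocality = false;

       Builder exposeLocality(boolean newExposeLocality) {
         this.exposeLocality = newExposeLocality;
         return this;
       }

       ScanContextSketch build() {
         return new ScanContextSketch(exposeLocality);
       }
     }
   }

   /** Hypothetical consumer: reads the flag from the context, so no extra constructor argument. */
   final class MonitorFunctionSketch {
     private final ScanContextSketch context;

     MonitorFunctionSketch(ScanContextSketch context) {
       this.context = context;
     }

     boolean exposesLocality() {
       return context.exposeLocality();
     }
   }
   ```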






[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776943708



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +36,14 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_LOCALITY =
+      ConfigOptions.key("table.exec.iceberg.infer-source-locality")
+          .booleanType()
+          .defaultValue(true)

Review comment:
       done






[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r800285161



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +265,27 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {
+      Boolean localityConfig =
+          this.exposeLocality != null ? this.exposeLocality :
+              readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+
+      if (localityConfig != null && !localityConfig) {
+        return false;
+      }
+      FileIO fileIO = table.io();

Review comment:
       done






[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776943648



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
##########
@@ -226,7 +226,7 @@ private void writeRecords(List<Record> records) throws IOException {
   }
 
   private StreamingMonitorFunction createFunction(ScanContext scanContext) {
-    return new StreamingMonitorFunction(TestTableLoader.of(tableDir.getAbsolutePath()), scanContext);
+    return new StreamingMonitorFunction(TestTableLoader.of(tableDir.getAbsolutePath()), scanContext, false);

Review comment:
       I've tried this in the second commit. Please help review. Thanks.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r778410134



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
##########
@@ -19,20 +19,17 @@
 
 package org.apache.iceberg.flink.source;
 
-import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 
-/**
- * TODO Implement {@link LocatableInputSplit}.
- */
-public class FlinkInputSplit implements InputSplit {
+public class FlinkInputSplit extends LocatableInputSplit {
 
   private final int splitNumber;

Review comment:
       we can remove `splitNumber` since it is in the base class now.
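   The following sketch shows the suggested cleanup. It assumes the base class already stores the split number and exposes it through `getSplitNumber()`; Flink's `LocatableInputSplit` does provide `getSplitNumber()` and `getHostnames()`. `LocatableSplitBase` and `TaskSplitSketch` are hypothetical, and a `String` stands in for `CombinedScanTask`.

   ```java
   /** Hypothetical base class that already owns the split number and hosts. */
   class LocatableSplitBase {
     private final int splitNumber;
     private final String[] hostnames;

     LocatableSplitBase(int splitNumber, String[] hostnames) {
       this.splitNumber = splitNumber;
       this.hostnames = hostnames;
     }

     int getSplitNumber() {
       return splitNumber;
     }

     String[] getHostnames() {
       return hostnames;
     }
   }

   /** Hypothetical subclass: no duplicate splitNumber field, just its own payload. */
   final class TaskSplitSketch extends LocatableSplitBase {
     private final String task; // stand-in for a CombinedScanTask

     TaskSplitSketch(int splitNumber, String task, String[] hostnames) {
       super(splitNumber, hostnames);
       this.task = task;
     }

     String getTask() {
       return task;
     }

     @Override
     public String toString() {
       // Reads the inherited split number through the base-class getter.
       return "TaskSplitSketch{splitNumber=" + getSplitNumber() + ", task=" + task + "}";
     }
   }
   ```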






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779007158



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -107,6 +114,11 @@ public Builder limit(long newLimit) {
       return this;
     }
 
+    public Builder locality(boolean locality) {

Review comment:
       Similarly, we can improve the naming once we have settled on the config name.






[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779641321



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -107,6 +114,11 @@ public Builder limit(long newLimit) {
       return this;
     }
 
+    public Builder locality(boolean locality) {

Review comment:
       How about `exposeLocality`?






[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779674905



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +261,24 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    private boolean localityEnabled() {
+      InputFile file = table.io().newInputFile(table.location());

Review comment:
       OK, got it. Thanks.






[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r785606407



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -178,6 +179,38 @@ public void testInferedParallelism() throws IOException {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
   }
 
+  @Test
+  public void testExposeLocality() throws Exception {

Review comment:
       
   Yes, during the test phase, I printed some logs to see if it was working properly, such as this one:
   
   ![2022-1-17-2](https://user-images.githubusercontent.com/59213263/149700947-bea570d1-8cb3-4c16-9dbb-e690eb69535d.PNG)
   
   ![2022-1-17](https://user-images.githubusercontent.com/59213263/149699989-0dfb1440-0896-407f-9c10-775d887e6179.PNG)
   
   
   






[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r781818068



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -178,6 +179,38 @@ public void testInferedParallelism() throws IOException {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
   }
 
+  @Test
+  public void testExposeLocality() throws Exception {

Review comment:
       It seems this can be tested by manually specifying the hostnames. For more, refer to: https://github.com/apache/flink/blob/master/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
   Alternatively, we could test it by introducing miniDFS, but the project doesn't seem willing to add it.
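   The following sketch illustrates the manual-hostname approach. It uses a deliberately simplified locality-preferring assigner, not Flink's `LocatableInputSplitAssigner`. The test creates splits with explicit hostnames, then checks which split each host receives and how many assignments were local versus remote. `SimpleLocalityAssigner` and its methods are hypothetical.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;

   /** Hypothetical, simplified locality-preferring assigner (not Flink's implementation). */
   final class SimpleLocalityAssigner {
     static final class Split {
       final int id;
       final String[] hosts;

       Split(int id, String... hosts) {
         this.id = id;
         this.hosts = hosts;
       }
     }

     private final List<Split> remaining;
     private int localAssignments;
     private int remoteAssignments;

     SimpleLocalityAssigner(List<Split> splits) {
       this.remaining = new ArrayList<>(splits);
     }

     /** Prefers a split hosted on {@code host}; otherwise returns any remaining split, or null when done. */
     Split next(String host) {
       for (Iterator<Split> it = remaining.iterator(); it.hasNext(); ) {
         Split split = it.next();
         if (Arrays.asList(split.hosts).contains(host)) {
           it.remove();
           localAssignments++;
           return split;
         }
       }
       if (remaining.isEmpty()) {
         return null;
       }
       remoteAssignments++;
       return remaining.remove(0);
     }

     int localAssignments() {
       return localAssignments;
     }

     int remoteAssignments() {
       return remoteAssignments;
     }
   }
   ```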






[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r787356981



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -195,6 +211,7 @@ public FlinkInputFormat buildFormat() {
       } else {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
       }
+      contextBuilder.exposeLocality(localityEnabled());

Review comment:
       https://github.com/apache/iceberg/blob/52808f5740bfac4943da5be63ed3380aa865d981/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java#L269-L272
   It is used here to determine whether to use the Flink config.
   






[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r799680024



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +265,27 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {
+      Boolean localityConfig =
+          this.exposeLocality != null ? this.exposeLocality :
+              readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+
+      if (localityConfig != null && !localityConfig) {
+        return false;
+      }
+      FileIO fileIO = table.io();

Review comment:
       Style: this needs a newline above to separate the control flow from the following statement.






[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r799680962



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +265,27 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {
+      Boolean localityConfig =
+          this.exposeLocality != null ? this.exposeLocality :
+              readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+
+      if (localityConfig != null && !localityConfig) {

Review comment:
       It would be clearer when checking a variable if it had a name like `localityEnabled`. That is still an appropriate name, and it makes this logic easier to read.






[GitHub] [iceberg] rdblue merged pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817


   




[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779639912



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +36,14 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_LOCALITY =
+      ConfigOptions.key("table.exec.iceberg.infer-source-locality")

Review comment:
       Better. Updated.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779006260



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +261,24 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    private boolean localityEnabled() {
+      InputFile file = table.io().newInputFile(table.location());

Review comment:
       This probably creates an orphaned `InputFile`. We can probably just check whether the `FileIO` is an instance of `HadoopFileIO`; then we also don't need the `LOCALITY_WHITELIST_FS` set above.
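   A minimal sketch of the suggested type check, written in plain Java so it runs without Iceberg on the classpath. `StorageIO`, `HdfsStyleIO`, and `ObjectStoreIO` are hypothetical stand-ins for Iceberg's `FileIO`, `HadoopFileIO`, and a non-Hadoop implementation. The point is that an `instanceof` check creates no `InputFile` at all. Note that `HadoopFileIO` can also wrap file systems other than HDFS, which is why later revisions in this thread also look at the location's scheme.

```java
// Hypothetical stand-in for Iceberg's FileIO interface.
interface StorageIO {}

// Hypothetical stand-in for HadoopFileIO: the only kind of IO whose files
// may live on a file system that reports block locations.
final class HdfsStyleIO implements StorageIO {}

// Hypothetical stand-in for an object-store FileIO (for example, S3-backed).
final class ObjectStoreIO implements StorageIO {}

final class LocalityCandidate {
  private LocalityCandidate() {}

  // Pure type check: no InputFile (or any other resource) is created,
  // so nothing can be left orphaned.
  static boolean mayExposeLocality(StorageIO io) {
    return io instanceof HdfsStyleIO;
  }
}
```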






[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779687806



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -37,9 +40,20 @@ private FlinkSplitGenerator() {
   static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
     List<CombinedScanTask> tasks = tasks(table, context);
     FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
-    }
+    boolean localityPreferred = context.locality();
+
+    Tasks.range(tasks.size())
+        .stopOnFailure()
+        .executeWith(localityPreferred ? ThreadPools.getWorkerPool() : null)

Review comment:
       @stevenzwu Sorry I didn't reply to the second part sooner; I was trying to understand what you meant. I'm still confused, though: the method I currently use is `blockLocations(FileIO io, CombinedScanTask task)`. What should I change?






[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779689262



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +39,11 @@ private FlinkConfigOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
+      ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
+          .booleanType()
+          .noDefaultValue()
+          .withDescription("Controls whether to report locality information to Flink while allocating input " +
+              "partitions.");

Review comment:
       ok.






[GitHub] [iceberg] kbendick commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r780425675



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -247,6 +258,24 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    boolean localityEnabled() {
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        Boolean localityConfig = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+
+        HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
+        try {
+          String scheme = new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
+          boolean defaultValue = FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);

Review comment:
       It would be nice to avoid the dependency on `Path` in this class, so that people who don't need Hadoop don't need the dependency. You might be able to get the same result as follows.
   
   If you call `((HadoopFileIO) fileIO).newInputFile(table.location()).location()`, I believe you can check the scheme on the location string returned by `InputFile`.
   
   There could very possibly be a cleaner way to do it. But if this class doesn't already pull in `org.apache.hadoop.*`, it would be best to avoid that if possible, so that users who run only in cloud environments, for example, don't need HDFS on their classpath.
   
   In this case, just by having the import, users still need the Hadoop jars on the classpath even when they use `S3FileIO` (which is sometimes a pain on Flink).
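   One way to read the scheme from a location string without `org.apache.hadoop.fs.Path` is `java.net.URI`. This is a sketch under assumptions: `LocationScheme` and its allow-list are hypothetical (the set mirrors the single `"hdfs"` entry used in the PR), and unlike `Path.getFileSystem(conf).getScheme()` it does not resolve a scheme-less location against `fs.defaultFS`, so a bare path that actually lives on HDFS would be reported as having no scheme.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Set;

final class LocationScheme {
  // Hypothetical allow-list of schemes whose file systems report block hosts.
  static final Set<String> SCHEMES_WITH_LOCALITY = Set.of("hdfs");

  private LocationScheme() {}

  // Returns the lower-cased scheme of a location, or null when the location
  // has no scheme (for example, a bare local path such as "/tmp/warehouse").
  // URI.create throws IllegalArgumentException for strings that are not valid URIs.
  static String schemeOf(String location) {
    String scheme = URI.create(location).getScheme();
    return scheme == null ? null : scheme.toLowerCase(Locale.ROOT);
  }

  static boolean schemeSupportsLocality(String location) {
    String scheme = schemeOf(location);
    return scheme != null && SCHEMES_WITH_LOCALITY.contains(scheme);
  }
}
```

   Because only the JDK is used, this keeps the Hadoop jars off the compile-time dependency list of the class that makes the decision.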






[GitHub] [iceberg] hililiwei commented on pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#issuecomment-1007072575


   > Thanks for taking this on @hililiwei!
   > 
   > I left a few nits / open questions.
   > 
   > Can we also add a test that disables locality and then performs a read to ensure that the new `FlinkInputSplit` works correctly when locality is disabled?
   
   @kbendick Thanks for your review. I'll try to add a unit test and will report back when I make progress.




[GitHub] [iceberg] hililiwei commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r779640655



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -70,11 +74,14 @@ public static Builder forRowData() {
    * Source builder to build {@link DataStream}.
    */
   public static class Builder {
+    private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");

Review comment:
       Renamed to match the naming used in Spark.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r784540128



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -178,6 +179,38 @@ public void testInferedParallelism() throws IOException {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
   }
 
+  @Test
+  public void testExposeLocality() throws Exception {

Review comment:
       This unit-tests the assigner; it is not an end-to-end test of the whole feature. Apart from the missing e2e test, the PR overall looks good to me. Have you tested this manually in a Hadoop cluster setup?






[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r786969209



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -195,6 +211,7 @@ public FlinkInputFormat buildFormat() {
       } else {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
       }
+      contextBuilder.exposeLocality(localityEnabled());

Review comment:
       Where is the `exposeLocality` variable used? If an explicit value is passed to this builder, it should be passed into `localityEnabled()` so that method can use the setting, but only if the underlying file system is `hdfs`.
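   A sketch of one reading of this precedence, assuming a tri-state setting where `null` means "not set" (matching the `noDefaultValue()` option in the earlier `FlinkConfigOptions` diff). `LocalityResolution` and its parameters are hypothetical; in the PR the scheme would come from the table location and the explicit value from the builder or the Flink config.

```java
import java.util.Set;

final class LocalityResolution {
  // Hypothetical allow-list of schemes whose file systems report block hosts.
  private static final Set<String> SCHEMES_WITH_LOCALITY = Set.of("hdfs");

  private LocalityResolution() {}

  /**
   * @param explicitSetting value passed to the builder or set in the config;
   *     null when the user did not set anything
   * @param scheme scheme of the table location, possibly null
   */
  static boolean localityEnabled(Boolean explicitSetting, String scheme) {
    boolean fsSupportsLocality = scheme != null && SCHEMES_WITH_LOCALITY.contains(scheme);
    if (!fsSupportsLocality) {
      // Stores without block locations never expose locality, even if asked to.
      return false;
    }
    // On a locality-capable file system an explicit setting wins; unset means on.
    return explicitSetting == null || explicitSetting;
  }
}
```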






[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776513086



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -38,7 +48,22 @@ private FlinkSplitGenerator() {
     List<CombinedScanTask> tasks = tasks(table, context);
     FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
     for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
+      Set<String> hosts = Sets.newHashSet();
+      CombinedScanTask combinedScanTask = tasks.get(i);
+      combinedScanTask.files().forEach(fileScanTask -> {
+        try {
+          final FileSystem fs = new HadoopFileSystem(DistributedFileSystem.get(new Configuration()));

Review comment:
       This should do the same thing that Spark does. Specifically:
   * There should be a flag to enable locality
   * The flag should be defaulted based on whether the scheme identifies a FS that has locality
   * `new Configuration()` should not be called
   * Use `Util.blockLocations` to get block locations
   * Ideally, parallelize the block lookup per split because it can get expensive
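   The last point (parallelizing the block lookup per split) can be sketched with a plain `ExecutorService`; in-tree, Iceberg's `Tasks` and `ThreadPools` utilities, as used in a later revision of this diff, play the same role. Everything here is hypothetical: each split is modeled as a list of file paths instead of a `CombinedScanTask`, and `hostsForFile` stands in for the real block-location helper, whose exact signature is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class SplitHosts {
  private SplitHosts() {}

  // Computes the distinct hosts of every split, one task per split, so the
  // expensive lookups for different splits overlap. Output order matches input.
  static List<String[]> hostsPerSplit(
      List<List<String>> splits, Function<String, List<String>> hostsForFile, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<String[]>> futures = new ArrayList<>();
      for (List<String> files : splits) {
        futures.add(pool.submit(() -> {
          Set<String> hosts = new TreeSet<>(); // distinct and sorted for stable output
          for (String file : files) {
            hosts.addAll(hostsForFile.apply(file));
          }
          return hosts.toArray(new String[0]);
        }));
      }
      List<String[]> result = new ArrayList<>(futures.size());
      for (Future<String[]> future : futures) {
        result.add(future.get()); // waits for each split in order
      }
      return result;
    } catch (ExecutionException e) {
      // Surface the first lookup failure, similar to stopping on failure.
      throw new IllegalStateException("Block location lookup failed", e.getCause());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while looking up block locations", e);
    } finally {
      pool.shutdown();
    }
  }
}
```

   A dedicated pool is created per call only to keep the sketch self-contained; a shared worker pool avoids paying thread start-up cost on every planning call.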






[GitHub] [iceberg] rdblue commented on a change in pull request #3817: Flink: FlinkInputSplit extends LocatableInputSplit instread of InputSplit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3817:
URL: https://github.com/apache/iceberg/pull/3817#discussion_r776882400



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
##########
@@ -77,13 +79,13 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
     tableLoader.open();
     try (TableLoader loader = tableLoader) {
       Table table = loader.loadTable();
-      return FlinkSplitGenerator.createInputSplits(table, context);
+      return FlinkSplitGenerator.createInputSplits(table, context, localityPreferred);
     }
   }
 
   @Override
   public InputSplitAssigner getInputSplitAssigner(FlinkInputSplit[] inputSplits) {
-    return new DefaultInputSplitAssigner(inputSplits);
+    return new LocatableInputSplitAssigner(inputSplits);

Review comment:
       Shouldn't this check `localityPreferred` and return a `DefaultInputSplitAssigner` if it is false? That way we get the same behavior for all file stores other than HDFS.
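   A simplified sketch of the suggested switch between a plain in-order assigner and a locality-preferring one. `HostedSplit`, `HostAwareAssigner`, and `Assigners` are hypothetical stand-ins, not Flink's `InputSplitAssigner`, `DefaultInputSplitAssigner`, or `LocatableInputSplitAssigner`; the sketch is single-threaded, is not thread-safe, and omits the bookkeeping Flink's locatable assigner performs.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a FlinkInputSplit that carries preferred hosts.
final class HostedSplit {
  final int id;
  final List<String> hosts;

  HostedSplit(int id, String... hosts) {
    this.id = id;
    this.hosts = Arrays.asList(hosts);
  }
}

// Hypothetical assigner contract: next split for a host, or null when none are left.
interface HostAwareAssigner {
  HostedSplit next(String requestingHost);
}

final class Assigners {
  private Assigners() {}

  static HostAwareAssigner create(List<HostedSplit> splits, boolean localityPreferred) {
    Deque<HostedSplit> pending = new ArrayDeque<>(splits);
    if (!localityPreferred) {
      // Default behavior: hand out splits in order and ignore the host.
      return host -> pending.pollFirst();
    }
    return host -> {
      // Prefer a split whose blocks live on the requesting host.
      for (Iterator<HostedSplit> it = pending.iterator(); it.hasNext(); ) {
        HostedSplit split = it.next();
        if (split.hosts.contains(host)) {
          it.remove();
          return split;
        }
      }
      // No local split left: fall back to any remaining split (a remote read).
      return pending.pollFirst();
    };
  }
}
```

   With `localityPreferred` false, every store other than HDFS keeps the same in-order behavior it had before the change.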



