Posted to issues@iceberg.apache.org by "singhpk234 (via GitHub)" <gi...@apache.org> on 2023/01/24 01:08:20 UTC

[GitHub] [iceberg] singhpk234 opened a new pull request, #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

singhpk234 opened a new pull request, #6655:
URL: https://github.com/apache/iceberg/pull/6655

   ### About the change
   
   Presently, when determining the locality preference, we check whether the FileIO is a HadoopFileIO and only then perform the subsequent checks to determine locality. However, we may also be using ResolvingFileIO, which might wrap a HadoopFileIO; in that case we would still want to proceed with creating the input file and checking the scheme, etc., to determine whether locality is enabled.
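
The gap described above can be illustrated with a minimal, self-contained sketch. All types here are hypothetical stand-ins (they are not the real Iceberg classes), and the scheme handling is deliberately simplified; the point is only that a plain `instanceof` check on the wrapper never sees the delegate.

```java
// Hypothetical stand-ins for illustration only; these are NOT the real Iceberg classes.
interface SketchFileIO {}

class SketchHadoopFileIO implements SketchFileIO {}

class SketchS3FileIO implements SketchFileIO {}

// Mimics a resolving wrapper: it picks a delegate based on the location's scheme.
class SketchResolvingFileIO implements SketchFileIO {
  SketchFileIO delegateFor(String location) {
    return location.startsWith("s3://") ? new SketchS3FileIO() : new SketchHadoopFileIO();
  }
}

final class WrapperCheckSketch {
  // The pre-change style of check: only a direct Hadoop IO passes.
  static boolean directCheck(SketchFileIO io) {
    return io instanceof SketchHadoopFileIO;
  }

  // A check that also looks through the wrapper for the given location.
  static boolean wrapperAwareCheck(SketchFileIO io, String location) {
    if (io instanceof SketchHadoopFileIO) {
      return true;
    } else if (io instanceof SketchResolvingFileIO) {
      return ((SketchResolvingFileIO) io).delegateFor(location) instanceof SketchHadoopFileIO;
    } else {
      return false;
    }
  }

  public static void main(String[] args) {
    SketchFileIO io = new SketchResolvingFileIO();
    System.out.println(directCheck(io)); // false: the wrapper itself is not a Hadoop IO
    System.out.println(wrapperAwareCheck(io, "hdfs://namenode/warehouse/tbl")); // true
    System.out.println(wrapperAwareCheck(io, "s3://bucket/warehouse/tbl")); // false
  }
}
```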
   
   
   ### Testing 
   
   TODO


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1096219556


##########
core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java:
##########
@@ -168,6 +168,10 @@ private static String implFromLocation(String location) {
     return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
   }
 
+  public Class<? extends FileIO> ioClass(String location) {
+    return io(location).getClass();

Review Comment:
   Sorry, I did not fully get what you mean here. If a FileIO extends HadoopFileIO, we can still get its class this way and then compare it later to see whether it is a subclass, right?





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1085895710


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java:
##########
@@ -67,11 +69,15 @@ public boolean caseSensitive() {
   }
 
   public boolean localityEnabled() {
-    if (table.io() instanceof HadoopFileIO) {
-      HadoopInputFile file = (HadoopInputFile) table.io().newInputFile(table.location());
-      String scheme = file.getFileSystem().getScheme();
-      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
-      return PropertyUtil.propertyAsBoolean(readOptions, SparkReadOptions.LOCALITY, defaultValue);
+    if (table.io() instanceof HadoopFileIO || table.io() instanceof ResolvingFileIO) {
+      InputFile file = table.io().newInputFile(table.location());
+      if (file instanceof HadoopInputFile) {
+        String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+        boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+        return PropertyUtil.propertyAsBoolean(readOptions, SparkReadOptions.LOCALITY, defaultValue);
+      } else {
+        return false;

Review Comment:
   I am fine with the current way since `.io()` is quite short. Up to you Prashant.





[GitHub] [iceberg] aokolnychyi commented on pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#issuecomment-1416520953

   I am not sure I understand why we don't simply use `Util.mayHaveBlockLocations` in Spark.




[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1088296400


##########
core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java:
##########
@@ -164,7 +164,7 @@ private FileIO io(String location) {
     return io;
   }
 
-  private static String implFromLocation(String location) {
+  public static String implFromLocation(String location) {

Review Comment:
   @rdblue, do you have any concerns about making this public?





[GitHub] [iceberg] singhpk234 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1094215115


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +84,16 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean isHDFSLocation(FileIO io, String location) {
+    return io instanceof HadoopFileIO
+        || (io instanceof ResolvingFileIO
+            && ResolvingFileIO.implFromLocation(location).equals(HadoopFileIO.class.getName()));
+  }
+
   private static String[] blockLocations(FileIO io, ContentScanTask<?> task) {
-    InputFile inputFile = io.newInputFile(task.file().path().toString());
-    if (inputFile instanceof HadoopInputFile) {
-      HadoopInputFile hadoopInputFile = (HadoopInputFile) inputFile;
+    String location = task.file().path().toString();
+    if (isHDFSLocation(io, location)) {
+      HadoopInputFile hadoopInputFile = (HadoopInputFile) io.newInputFile(location);

Review Comment:
   Ack, made the changes.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1095169075


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +88,38 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean usesHadoopFileIO(FileIO io, String location) {
+    if (io instanceof HadoopFileIO) {
+      return true;
+
+    } else if (io instanceof ResolvingFileIO) {
+      ResolvingFileIO resolvingFileIO = (ResolvingFileIO) io;
+      return resolvingFileIO.ioClass(location).isAssignableFrom(HadoopFileIO.class);
+
+    } else {
+      return false;
+    }
+  }
+
+  public static boolean mayHaveBlockLocations(FileIO io, String location) {

Review Comment:
   I'd add the new methods after the `blockLocations` method below. The new code splits the two `blockLocations` methods, which are related. It would be easier to have those next to each other; otherwise, one would need to jump around the file.



##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +88,38 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean usesHadoopFileIO(FileIO io, String location) {
+    if (io instanceof HadoopFileIO) {
+      return true;
+
+    } else if (io instanceof ResolvingFileIO) {
+      ResolvingFileIO resolvingFileIO = (ResolvingFileIO) io;
+      return resolvingFileIO.ioClass(location).isAssignableFrom(HadoopFileIO.class);
+
+    } else {
+      return false;
+    }
+  }
+
+  public static boolean mayHaveBlockLocations(FileIO io, String location) {
+    if (usesHadoopFileIO(io, location)) {
+      InputFile inputFile = io.newInputFile(location);
+      if (inputFile instanceof HadoopInputFile) {
+        String scheme = ((HadoopInputFile) inputFile).getFileSystem().getScheme();
+        return LOCALITY_WHITELIST_FS.contains(scheme);
+
+      } else {
+        return false;
+      }
+    }
+
+    return false;
+  }
+
   private static String[] blockLocations(FileIO io, ContentScanTask<?> task) {
-    InputFile inputFile = io.newInputFile(task.file().path().toString());
-    if (inputFile instanceof HadoopInputFile) {
-      HadoopInputFile hadoopInputFile = (HadoopInputFile) inputFile;
+    String location = task.file().path().toString();
+    if (usesHadoopFileIO(io, location)) {
+      HadoopInputFile hadoopInputFile = (HadoopInputFile) io.newInputFile(location);

Review Comment:
   Should we keep the `instanceof` check to be consistent with `mayHaveBlockLocations` and the old code?
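
One possible reason to keep the `instanceof` guard, beyond consistency, is that a bare cast fails at runtime if the IO ever returns a different input-file type. The sketch below uses hypothetical stand-in types (not the Iceberg classes) to contrast a guarded cast with an unguarded one.

```java
// Hypothetical stand-ins; the real InputFile/HadoopInputFile types are not used here.
interface InputFileSketch {}

class HadoopInputFileSketch implements InputFileSketch {
  String scheme() {
    return "hdfs";
  }
}

class OtherInputFileSketch implements InputFileSketch {}

final class SafeCastSketch {
  // Guarded: returns null instead of failing when the file is not the Hadoop stand-in.
  static String schemeOrNull(InputFileSketch file) {
    if (file instanceof HadoopInputFileSketch) {
      return ((HadoopInputFileSketch) file).scheme();
    }
    return null;
  }

  // Unguarded: throws ClassCastException for any other implementation.
  static String uncheckedScheme(InputFileSketch file) {
    return ((HadoopInputFileSketch) file).scheme();
  }

  public static void main(String[] args) {
    System.out.println(schemeOrNull(new HadoopInputFileSketch())); // hdfs
    System.out.println(schemeOrNull(new OtherInputFileSketch())); // null
    try {
      uncheckedScheme(new OtherInputFileSketch());
    } catch (ClassCastException e) {
      System.out.println("unguarded cast failed");
    }
  }
}
```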



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java:
##########
@@ -67,14 +62,13 @@ public boolean caseSensitive() {
   }
 
   public boolean localityEnabled() {
-    if (table.io() instanceof HadoopFileIO) {
-      HadoopInputFile file = (HadoopInputFile) table.io().newInputFile(table.location());
-      String scheme = file.getFileSystem().getScheme();
-      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+    if (Util.usesHadoopFileIO(table.io(), table.location())) {

Review Comment:
   Can this be simplified like this?
   
   ```
   boolean defaultValue = Util.mayHaveBlockLocations(table.io(), table.location());
   return PropertyUtil.propertyAsBoolean(readOptions, SparkReadOptions.LOCALITY, defaultValue);
   ```
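
To make the effect of this simplification concrete, here is a small sketch of "storage-derived default, overridable by a read option". The helper, the option key `locality`, and the boolean parameter standing in for `Util.mayHaveBlockLocations(...)` are assumptions made for illustration, not the project's actual code.

```java
import java.util.Map;

final class LocalityOptionSketch {
  // Assumed option key, used here only for illustration.
  static final String LOCALITY = "locality";

  // Stand-in for a "property as boolean, with default" helper.
  static boolean propertyAsBoolean(Map<String, String> options, String key, boolean defaultValue) {
    String value = options.get(key);
    return value != null ? Boolean.parseBoolean(value) : defaultValue;
  }

  // storageMayHaveBlockLocations plays the role of the storage-based default.
  static boolean localityEnabled(
      Map<String, String> readOptions, boolean storageMayHaveBlockLocations) {
    return propertyAsBoolean(readOptions, LOCALITY, storageMayHaveBlockLocations);
  }

  public static void main(String[] args) {
    // No explicit option: the storage-based default wins.
    System.out.println(localityEnabled(Map.of(), true)); // true
    // An explicit option overrides the default in either direction.
    System.out.println(localityEnabled(Map.of(LOCALITY, "false"), true)); // false
    System.out.println(localityEnabled(Map.of(LOCALITY, "true"), false)); // true
  }
}
```

With this shape, an explicit read option always takes effect, including when the storage-based default would have been `false`.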



##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +88,38 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean usesHadoopFileIO(FileIO io, String location) {
+    if (io instanceof HadoopFileIO) {
+      return true;
+
+    } else if (io instanceof ResolvingFileIO) {
+      ResolvingFileIO resolvingFileIO = (ResolvingFileIO) io;
+      return resolvingFileIO.ioClass(location).isAssignableFrom(HadoopFileIO.class);

Review Comment:
   Shouldn't this be the other way around?
   
   ```
   HadoopFileIO.class.isAssignableFrom(resolvingFileIO.ioClass(location));
   ```
   https://stackoverflow.com/a/6983596/4108401
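
The direction of `Class.isAssignableFrom` is easy to get backwards, so a tiny standalone check may help. The class names below are hypothetical stand-ins for "a FileIO that extends HadoopFileIO"; only the standard `java.lang.Class` API is used.

```java
// Stand-in hierarchy (hypothetical names), mirroring a subclass of a Hadoop-style IO.
class BaseIoSketch {}

class CustomIoSketch extends BaseIoSketch {}

final class AssignableDirectionSketch {
  // Suggested direction: true when `candidate` is BaseIoSketch or one of its subclasses.
  static boolean isBaseOrSubclass(Class<?> candidate) {
    return BaseIoSketch.class.isAssignableFrom(candidate);
  }

  // Reversed direction, as in the diff under review: true only when `candidate`
  // is BaseIoSketch itself or one of its supertypes.
  static boolean reversedCheck(Class<?> candidate) {
    return candidate.isAssignableFrom(BaseIoSketch.class);
  }

  public static void main(String[] args) {
    System.out.println(isBaseOrSubclass(CustomIoSketch.class)); // true
    System.out.println(reversedCheck(CustomIoSketch.class)); // false
    System.out.println(reversedCheck(Object.class)); // true
  }
}
```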



##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +88,38 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean usesHadoopFileIO(FileIO io, String location) {

Review Comment:
   I think this can be private and we can simply use `mayHaveBlockLocations` in Spark.





[GitHub] [iceberg] aokolnychyi merged pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi merged PR #6655:
URL: https://github.com/apache/iceberg/pull/6655




[GitHub] [iceberg] singhpk234 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1096228220


##########
core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java:
##########
@@ -168,6 +168,10 @@ private static String implFromLocation(String location) {
     return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
   }
 
+  public Class<? extends FileIO> ioClass(String location) {
+    return io(location).getClass();

Review Comment:
   Apologies, I see now what you meant. Let me make the changes.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1096374242


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java:
##########
@@ -67,14 +62,13 @@ public boolean caseSensitive() {
   }
 
   public boolean localityEnabled() {
-    if (table.io() instanceof HadoopFileIO) {
-      HadoopInputFile file = (HadoopInputFile) table.io().newInputFile(table.location());
-      String scheme = file.getFileSystem().getScheme();
-      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+    if (Util.usesHadoopFileIO(table.io(), table.location())) {

Review Comment:
   @singhpk234, I think `Util.mayHaveBlockLocations` internally calls `usesHadoopFileIO` so I am not sure I got your point [here](https://github.com/apache/iceberg/pull/6655#discussion_r1095306379). If so, I'd simplify this and make `usesHadoopFileIO` private.





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1085900059


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java:
##########
@@ -67,11 +69,15 @@ public boolean caseSensitive() {
   }
 
   public boolean localityEnabled() {
-    if (table.io() instanceof HadoopFileIO) {
-      HadoopInputFile file = (HadoopInputFile) table.io().newInputFile(table.location());
-      String scheme = file.getFileSystem().getScheme();
-      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
-      return PropertyUtil.propertyAsBoolean(readOptions, SparkReadOptions.LOCALITY, defaultValue);
+    if (table.io() instanceof HadoopFileIO || table.io() instanceof ResolvingFileIO) {

Review Comment:
   I am a bit concerned for people using `ResolvingFileIO` with this approach. Previously, we would only create an input file to check locality if the IO was a `HadoopFileIO`, but now, if the user is using `ResolvingFileIO`, this operation will be done for every single file even if the IO for that specific location is not a `HadoopFileIO`.
   
   I am wondering if we should make `ResolvingFileIO.implFromLocation` a public method so we can know the FileIO used for a location without needing to open an input file.
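
The idea of resolving the implementation from the location alone can be sketched as a pure string lookup. The mapping contents, the fallback, and the simplified `scheme` helper below are assumptions for illustration; the actual table lives in `ResolvingFileIO` and may differ.

```java
import java.util.Map;

final class ImplFromLocationSketch {
  // Illustrative mapping and fallback only; not copied from the project.
  static final Map<String, String> SCHEME_TO_FILE_IO =
      Map.of(
          "s3", "org.apache.iceberg.aws.s3.S3FileIO",
          "s3a", "org.apache.iceberg.aws.s3.S3FileIO");
  static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";

  // Simplified scheme extraction: text before the first ':' or "" when there is none.
  static String scheme(String location) {
    int colon = location.indexOf(':');
    return colon > 0 ? location.substring(0, colon) : "";
  }

  // Pure lookup: no file is opened and no file system is contacted.
  static String implFromLocation(String location) {
    return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
  }

  public static void main(String[] args) {
    System.out.println(implFromLocation("s3://bucket/warehouse/tbl"));
    System.out.println(implFromLocation("hdfs://namenode/warehouse/tbl"));
    System.out.println(implFromLocation("/tmp/warehouse/tbl"));
  }
}
```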








[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1093711041


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java:
##########
@@ -67,7 +67,7 @@ public boolean caseSensitive() {
   }
 
   public boolean localityEnabled() {
-    if (table.io() instanceof HadoopFileIO) {
+    if (Util.isHDFSLocation(table.io(), table.location())) {
       HadoopInputFile file = (HadoopInputFile) table.io().newInputFile(table.location());

Review Comment:
   This bit seems repetitive in Spark and Flink. What about adding a new method to `Util`?
   
   ```
   public static boolean mayHaveBlockLocations(FileIO io, String location) {
     if (usesHadoopFileIO(io, location)) {
       ...
       return LOCALITY_WHITELIST_FS.contains(scheme);
     }
   
     return false;
   }
   ```
   
   I feel it would align well with other methods in `Util` like `blockLocations`.
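
A rough, self-contained sketch of the whitelist step follows. It assumes the whitelist contains only `hdfs` and reads the scheme from the location URI; the code in the thread obtains the scheme from the Hadoop file system instead, so treat this as a simplification rather than the proposed implementation.

```java
import java.net.URI;
import java.util.Set;

final class BlockLocationSketch {
  // Assumed contents; the thread only shows that such a whitelist exists.
  static final Set<String> LOCALITY_WHITELIST_FS = Set.of("hdfs");

  // usesHadoopFileIO stands in for the result of the usesHadoopFileIO(io, location) check.
  static boolean mayHaveBlockLocations(boolean usesHadoopFileIO, String location) {
    if (!usesHadoopFileIO) {
      return false;
    }

    // Simplification: take the scheme from the URI (null for scheme-less paths).
    String scheme = URI.create(location).getScheme();
    return scheme != null && LOCALITY_WHITELIST_FS.contains(scheme);
  }

  public static void main(String[] args) {
    System.out.println(mayHaveBlockLocations(true, "hdfs://namenode:8020/warehouse/tbl")); // true
    System.out.println(mayHaveBlockLocations(true, "file:///tmp/warehouse/tbl")); // false
    System.out.println(mayHaveBlockLocations(false, "hdfs://namenode:8020/warehouse/tbl")); // false
  }
}
```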





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1093714980


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +84,16 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean isHDFSLocation(FileIO io, String location) {

Review Comment:
   I like `usesHadoopFileIO`. I'd also have explicit if else statements as the current formatting is hard to read.
   
   ```
   private static boolean usesHadoopFileIO(FileIO io, String location) {
     if (io instanceof HadoopFileIO) {
       return true;
   
     } else if (io instanceof ResolvingFileIO) {
       ResolvingFileIO resolvingFileIO = (ResolvingFileIO) io;
       return ...
   
     } else {
       return false;
     }
   }
   ```





[GitHub] [iceberg] singhpk234 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1095304010


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java:
##########
@@ -67,14 +62,13 @@ public boolean caseSensitive() {
   }
 
   public boolean localityEnabled() {
-    if (table.io() instanceof HadoopFileIO) {
-      HadoopInputFile file = (HadoopInputFile) table.io().newInputFile(table.location());
-      String scheme = file.getFileSystem().getScheme();
-      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+    if (Util.usesHadoopFileIO(table.io(), table.location())) {

Review Comment:
   +1, this would match the Flink behaviour. I initially refrained from making them the same because, say my FileIO is S3FileIO and I pass locality as true from the Spark read options: I would make locality enabled as false and unnecessarily check `mayHaveBlockLocations`, whereas earlier we would have short-circuited and made locality enabled as false.





[GitHub] [iceberg] aokolnychyi commented on pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#issuecomment-1416560469

   @singhpk234, oh, now I got what you meant. The new behavior seems reasonable to me for two reasons:
   - It is only triggered if someone explicitly passes an option to enable locality. In Spark, read options are the most specific level of configs and override all defaults.
   - It allows us to support hybrid use cases where a table stores its metadata in S3 but has some files in HDFS. Right now, the initial check of whether locality is enabled is based on the table root location, which may differ from some data locations.
   
   Do you think that's reasonable, @singhpk234?
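
The hybrid case in the second bullet can be pictured with a short sketch. The locations and the prefix-based `isHdfs` test are made up for illustration; the point is only that a decision taken from the table root gives one answer for every file, while a per-file check can still find the files that live in HDFS.

```java
import java.util.List;
import java.util.stream.Collectors;

final class HybridLocalitySketch {
  // Simplified test, for illustration only.
  static boolean isHdfs(String location) {
    return location.startsWith("hdfs://");
  }

  // Root-based decision: a single answer applied to the whole table.
  static boolean rootBased(String tableRoot) {
    return isHdfs(tableRoot);
  }

  // Per-file decision: keeps only the files that could report block locations.
  static List<String> filesWithPossibleLocality(List<String> dataFiles) {
    return dataFiles.stream().filter(HybridLocalitySketch::isHdfs).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    String root = "s3://bucket/warehouse/tbl";
    List<String> files =
        List.of("s3://bucket/warehouse/tbl/data/a.parquet", "hdfs://namenode/data/b.parquet");

    System.out.println(rootBased(root)); // false
    System.out.println(filesWithPossibleLocality(files)); // [hdfs://namenode/data/b.parquet]
  }
}
```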




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1093713311


##########
core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java:
##########
@@ -164,7 +164,7 @@ private FileIO io(String location) {
     return io;
   }
 
-  private static String implFromLocation(String location) {
+  public static String implFromLocation(String location) {

Review Comment:
   I agree. What about adding a new method called `ioClass` like this?
   
   ```
   public Class<? extends FileIO> ioClass(String location) {
     return io(location).getClass();
   }
   ```
   
   We could also rename `implFromLocation` to be `ioClassName` and open it but it won't be possible to check if a particular file IO is a subtype of `HadoopFileIO`. I believe the existing code can handle that.
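
The trade-off between exposing a class name and exposing a `Class` object can be shown in a few lines. The two classes below are hypothetical stand-ins; the sketch only demonstrates that string equality on names cannot recognise a subclass, while a `Class`-based check can.

```java
// Hypothetical stand-ins for a Hadoop-style IO and a custom subclass of it.
class HadoopLikeIoSketch {}

class ExtendedHadoopLikeIoSketch extends HadoopLikeIoSketch {}

final class NameVersusClassSketch {
  // Name-based: matches only the exact class name.
  static boolean matchesByName(String implName) {
    return implName.equals(HadoopLikeIoSketch.class.getName());
  }

  // Class-based: matches the class itself and any subclass.
  static boolean matchesByClass(Class<?> implClass) {
    return HadoopLikeIoSketch.class.isAssignableFrom(implClass);
  }

  public static void main(String[] args) {
    Class<?> impl = ExtendedHadoopLikeIoSketch.class;
    System.out.println(matchesByName(impl.getName())); // false
    System.out.println(matchesByClass(impl)); // true
  }
}
```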





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1093715696


##########
core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java:
##########
@@ -164,7 +164,7 @@ private FileIO io(String location) {
     return io;
   }
 
-  private static String implFromLocation(String location) {
+  public static String implFromLocation(String location) {

Review Comment:
   If we have `Class<? extends FileIO>`, we can use `isAssignableFrom` to check types.





[GitHub] [iceberg] singhpk234 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1095304010


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java:
##########
@@ -67,14 +62,13 @@ public boolean caseSensitive() {
   }
 
   public boolean localityEnabled() {
-    if (table.io() instanceof HadoopFileIO) {
-      HadoopInputFile file = (HadoopInputFile) table.io().newInputFile(table.location());
-      String scheme = file.getFileSystem().getScheme();
-      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+    if (Util.usesHadoopFileIO(table.io(), table.location())) {

Review Comment:
   +1, this would match the flink behaviour, i refrained for making them same initially because let's say if my FileIO is S3FileIO and from spark read option i pass locality as true, I would make locality enabled as false and unnecessarily check `mayHaveBlockLocations` earlier we would have short circuited it and made locality enabled as false.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1093522365


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +84,16 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean isHDFSLocation(FileIO io, String location) {

Review Comment:
   I am not sure the name makes sense because we don't actually validate that the location is an HDFS location. In some cases, we simply validate that the IO is `HadoopFileIO`.





[GitHub] [iceberg] singhpk234 commented on pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#issuecomment-1416694194

   Thanks @jackye1995 @aokolnychyi @amogh-jahagirdar for the reviews !
   
   > @singhpk234, would you be interested to cherry-pick this change to other query engine versions?
   
   Sure thing, added the backports for the lower Spark and Flink versions:
   
   * https://github.com/apache/iceberg/pull/6743
   * https://github.com/apache/iceberg/pull/6744




[GitHub] [iceberg] singhpk234 commented on pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#issuecomment-1416552184

   > I am not sure I understood why not simply use Util$mayHaveBlockLocations in Spark.
   > boolean defaultValue = Util.mayHaveBlockLocations(table.io(), table.location());
   > return PropertyUtil.propertyAsBoolean(readOptions, SparkReadOptions.LOCALITY, defaultValue);
   
   Previous behaviour:
   
   | SparkReadOptions.LOCALITY | FileIO | SparkReadConf#localityEnabled |
   | ----------- | ----------- | --- |
   | true | S3FileIO | false |
   
   After the change:
   
   | SparkReadOptions.LOCALITY | FileIO | SparkReadConf#localityEnabled |
   | ----------- | ----------- | --- |
   | true | S3FileIO | true |
   
   This means that, since localityEnabled is now true, Spark will call `Util#blockLocations` for each SparkInputPartition even for S3FileIO if the conf is set, which was not the case earlier. (Apologies for my previous correspondence.)
   
   But it makes sense to make the Flink and Spark locality checks identical, so I made the changes suggested above.
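
The two tables above can be reproduced with a small self-contained sketch. Everything in it is a hypothetical stand-in rather than Iceberg code: the `FileIO` types are empty placeholders, `propertyAsBoolean` is assumed to return the option when it is set and the default otherwise, and `fsAllowed` stands for "the file system scheme is in the locality whitelist".

```java
import java.util.Map;

// Hypothetical stand-ins for the real Iceberg types; only the type hierarchy matters here.
interface FileIO {}

final class HadoopFileIO implements FileIO {}

final class S3FileIO implements FileIO {}

final class LocalitySketch {
  private LocalitySketch() {}

  // Assumed helper: returns the option if it is set, otherwise the default.
  static boolean propertyAsBoolean(Map<String, String> options, String key, boolean defaultValue) {
    String value = options.get(key);
    return value != null ? Boolean.parseBoolean(value) : defaultValue;
  }

  // Previous behaviour: the read option is consulted only for HadoopFileIO.
  static boolean localityEnabledBefore(FileIO io, Map<String, String> options, boolean fsAllowed) {
    if (io instanceof HadoopFileIO) {
      return propertyAsBoolean(options, "locality", fsAllowed);
    }
    return false;
  }

  // Behaviour after the change: the FileIO check only supplies the default value.
  static boolean localityEnabledAfter(FileIO io, Map<String, String> options, boolean fsAllowed) {
    boolean defaultValue = io instanceof HadoopFileIO && fsAllowed;
    return propertyAsBoolean(options, "locality", defaultValue);
  }
}
```

Calling both methods with an `S3FileIO` and the option set to `"true"` gives `false` before and `true` after, which is the difference the tables describe. With the option unset, both variants return `false` for `S3FileIO`.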




[GitHub] [iceberg] singhpk234 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1086085951


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java:
##########
@@ -67,11 +69,15 @@ public boolean caseSensitive() {
   }
 
   public boolean localityEnabled() {
-    if (table.io() instanceof HadoopFileIO) {
-      HadoopInputFile file = (HadoopInputFile) table.io().newInputFile(table.location());
-      String scheme = file.getFileSystem().getScheme();
-      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
-      return PropertyUtil.propertyAsBoolean(readOptions, SparkReadOptions.LOCALITY, defaultValue);
+    if (table.io() instanceof HadoopFileIO || table.io() instanceof ResolvingFileIO) {

Review Comment:
   > I am wondering if we should make ResolvingFileIO.implFromLocation a public method so we can know the FileIO used for the location without the need to open input file.
   
   +1 on this, will certainly help.





[GitHub] [iceberg] singhpk234 commented on pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#issuecomment-1402877695

   Planning to fix this here as well:
   https://github.com/apache/iceberg/blob/2671621565cde8adda27b81d1699663f71d9b3d4/core/src/main/java/org/apache/iceberg/hadoop/Util.java#L87-L95
   
   https://github.com/apache/iceberg/blob/2671621565cde8adda27b81d1699663f71d9b3d4/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/SourceUtil.java#L54
   
   Thanks @amogh-jahagirdar for the background :) !!




[GitHub] [iceberg] singhpk234 commented on pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#issuecomment-1416573251

   > Do you think that's reasonable, @singhpk234?
   
   +1, makes sense to me. Thanks @aokolnychyi !




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1093523455


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +84,16 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean isHDFSLocation(FileIO io, String location) {

Review Comment:
   Let me think.





[GitHub] [iceberg] singhpk234 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1096212430


##########
core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java:
##########
@@ -168,6 +168,10 @@ private static String implFromLocation(String location) {
     return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
   }
 
+  public Class<? extends FileIO> ioClass(String location) {
+    return io(location).getClass();

Review Comment:
   We also wanted to cover any FileIO that extends HadoopFileIO, hence we didn't use `implFromLocation`. But loading S3FileIO etc. is definitely going to be expensive. I was thinking we would eventually need the FileIO object anyway, since we check the table location at the moment (unless the metadata and data paths are overridden by table properties). Should we restrict the check to only HadoopFileIO? Your thoughts?
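
The trade-off raised here, covering subclasses of `HadoopFileIO` versus comparing class names, can be illustrated with a minimal sketch. `BaseIO` and `CustomIO` are hypothetical stand-ins, with `BaseIO` playing the role of `HadoopFileIO`. Neither is an Iceberg class.

```java
// Hypothetical stand-ins: BaseIO plays the role of HadoopFileIO, CustomIO a user subclass.
class BaseIO {}

class CustomIO extends BaseIO {}

final class SubclassCheckSketch {
  private SubclassCheckSketch() {}

  // Name equality matches only the exact class, so subclasses are missed.
  static boolean matchesByName(Class<?> candidate) {
    return candidate.getName().equals(BaseIO.class.getName());
  }

  // isAssignableFrom also accepts any subclass of BaseIO.
  static boolean matchesByType(Class<?> candidate) {
    return BaseIO.class.isAssignableFrom(candidate);
  }
}
```

Under these assumptions, `matchesByName(CustomIO.class)` is false while `matchesByType(CustomIO.class)` is true, so a type-based check would pick up custom extensions that a name comparison skips.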





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1093717093


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +84,16 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean isHDFSLocation(FileIO io, String location) {
+    return io instanceof HadoopFileIO
+        || (io instanceof ResolvingFileIO
+            && ResolvingFileIO.implFromLocation(location).equals(HadoopFileIO.class.getName()));
+  }
+
   private static String[] blockLocations(FileIO io, ContentScanTask<?> task) {
-    InputFile inputFile = io.newInputFile(task.file().path().toString());
-    if (inputFile instanceof HadoopInputFile) {
-      HadoopInputFile hadoopInputFile = (HadoopInputFile) inputFile;
+    String location = task.file().path().toString();
+    if (isHDFSLocation(io, location)) {
+      HadoopInputFile hadoopInputFile = (HadoopInputFile) io.newInputFile(location);

Review Comment:
   If we plan to support custom `FileIO` implementations that extend `HadoopFileIO`, this cast may be unsafe.
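
One way to avoid the unchecked cast, sketched under the assumption that a custom `FileIO` may return some other `InputFile` type, is to test the runtime type of the input file itself before casting. The types below are hypothetical stand-ins, and `scheme()` replaces the real `getFileSystem().getScheme()` chain for brevity.

```java
import java.util.Optional;

// Hypothetical stand-ins for the input file types discussed above.
interface InputFile {}

final class HadoopInputFile implements InputFile {
  private final String scheme;

  HadoopInputFile(String scheme) {
    this.scheme = scheme;
  }

  String scheme() {
    return scheme;
  }
}

// Stand-in for an input file produced by a custom FileIO implementation.
final class WrappedInputFile implements InputFile {}

final class SafeCastSketch {
  private SafeCastSketch() {}

  // Check the runtime type of the input file instead of inferring it from the FileIO.
  static Optional<String> hadoopScheme(InputFile file) {
    if (file instanceof HadoopInputFile) {
      return Optional.of(((HadoopInputFile) file).scheme());
    }
    return Optional.empty();
  }
}
```

With this shape, a non-Hadoop input file yields an empty result instead of a `ClassCastException`.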





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1095368618


##########
core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java:
##########
@@ -168,6 +168,10 @@ private static String implFromLocation(String location) {
     return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
   }
 
+  public Class<? extends FileIO> ioClass(String location) {
+    return io(location).getClass();

Review Comment:
   Can we get the class from `implFromLocation` through `Class.forName(className)`? `io()` will do the actual loading, which is more expensive.
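
A minimal sketch of this suggestion follows, assuming a scheme-to-class-name map similar in spirit to `SCHEME_TO_FILE_IO`. The `SketchHadoopIO` and `SketchS3IO` classes, the map contents, and the choice of `IllegalStateException` are all illustrative assumptions, not the Iceberg implementation. The constructor counters exist only to show that no instance is created.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for FileIO implementations; constructing one is assumed to be costly.
class SketchHadoopIO {
  static int instancesCreated = 0;

  SketchHadoopIO() {
    instancesCreated++;
  }
}

class SketchS3IO {
  static int instancesCreated = 0;

  SketchS3IO() {
    instancesCreated++;
  }
}

final class IoClassSketch {
  private static final Map<String, String> SCHEME_TO_IMPL = new HashMap<>();
  private static final String FALLBACK_IMPL = SketchHadoopIO.class.getName();

  static {
    SCHEME_TO_IMPL.put("s3", SketchS3IO.class.getName());
  }

  private IoClassSketch() {}

  // Maps the location's scheme to an implementation class name; no object is created.
  static String implFromLocation(String location) {
    String scheme = URI.create(location).getScheme();
    return SCHEME_TO_IMPL.getOrDefault(scheme, FALLBACK_IMPL);
  }

  // Resolves the class by name, which loads the class but never calls its constructor.
  static Class<?> ioClass(String location) {
    try {
      return Class.forName(implFromLocation(location));
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("Cannot find FileIO class for " + location, e);
    }
  }
}
```

Resolving the class this way leaves both `instancesCreated` counters at zero, whereas a call that goes through an `io()`-style factory would have to construct and initialize the implementation first.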





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1093588435


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +84,16 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean isHDFSLocation(FileIO io, String location) {

Review Comment:
   maybe `usesHadoopFileIO`?








[GitHub] [iceberg] singhpk234 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1095306379


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +88,38 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean usesHadoopFileIO(FileIO io, String location) {

Review Comment:
   +1, this would match the Flink behaviour. I refrained from making them the same because, say my FileIO is S3FileIO and I pass locality as true via the Spark read option: it would make locality enabled as false and unnecessarily check `mayHaveBlockLocations`, whereas earlier we would have short-circuited and made locality enabled as false upfront based on S3FileIO. Your thoughts?





[GitHub] [iceberg] singhpk234 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1095306379


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +88,38 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean usesHadoopFileIO(FileIO io, String location) {

Review Comment:
   +1, this would match the Flink behaviour. I refrained from making them the same because, say my FileIO is S3FileIO and I pass locality as true via the Spark read option: I would make locality enabled as false and unnecessarily check `mayHaveBlockLocations`, whereas earlier we would have short-circuited and made locality enabled as false. Your thoughts?





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1096373389


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,11 +88,44 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean mayHaveBlockLocations(FileIO io, String location) {
+    if (usesHadoopFileIO(io, location)) {
+      InputFile inputFile = io.newInputFile(location);
+      if (inputFile instanceof HadoopInputFile) {
+        String scheme = ((HadoopInputFile) inputFile).getFileSystem().getScheme();
+        return LOCALITY_WHITELIST_FS.contains(scheme);
+
+      } else {
+        return false;
+      }
+    }
+
+    return false;
+  }
+
+  public static boolean usesHadoopFileIO(FileIO io, String location) {
+    if (io instanceof HadoopFileIO) {
+      return true;
+
+    } else if (io instanceof ResolvingFileIO) {
+      ResolvingFileIO resolvingFileIO = (ResolvingFileIO) io;
+      return HadoopFileIO.class.isAssignableFrom(resolvingFileIO.ioClass(location));
+
+    } else {
+      return false;
+    }
+  }
+
   private static String[] blockLocations(FileIO io, ContentScanTask<?> task) {

Review Comment:
   Oops, this is the third overloaded `blockLocations`. Let's keep all three next to each other.





[GitHub] [iceberg] singhpk234 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1095306379


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +88,38 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean usesHadoopFileIO(FileIO io, String location) {

Review Comment:
   +1, this would match the Flink behaviour. I refrained from making them the same because, say my FileIO is S3FileIO and I pass locality as true via the Spark read option: it would make locality enabled as true and unnecessarily check `blockLocations`, whereas earlier we would have short-circuited and made locality enabled as false upfront based on S3FileIO. Your thoughts?





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1085900059


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java:
##########
@@ -67,11 +69,15 @@ public boolean caseSensitive() {
   }
 
   public boolean localityEnabled() {
-    if (table.io() instanceof HadoopFileIO) {
-      HadoopInputFile file = (HadoopInputFile) table.io().newInputFile(table.location());
-      String scheme = file.getFileSystem().getScheme();
-      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
-      return PropertyUtil.propertyAsBoolean(readOptions, SparkReadOptions.LOCALITY, defaultValue);
+    if (table.io() instanceof HadoopFileIO || table.io() instanceof ResolvingFileIO) {

Review Comment:
   I am a bit concerned for people using `ResolvingFileIO` with this approach. Previously we would only create an input file to check locality if the IO was `HadoopFileIO`, but now, if the user is using `ResolvingFileIO`, this operation will be done for every single file even if the IO for the specific location is not a `HadoopFileIO`.
   
   I am wondering if we should make `ResolvingFileIO.implFromLocation` a public method so we can know the FileIO used for the location without the need to open input file.





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1084813006


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java:
##########
@@ -67,11 +69,15 @@ public boolean caseSensitive() {
   }
 
   public boolean localityEnabled() {
-    if (table.io() instanceof HadoopFileIO) {
-      HadoopInputFile file = (HadoopInputFile) table.io().newInputFile(table.location());
-      String scheme = file.getFileSystem().getScheme();
-      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
-      return PropertyUtil.propertyAsBoolean(readOptions, SparkReadOptions.LOCALITY, defaultValue);
+    if (table.io() instanceof HadoopFileIO || table.io() instanceof ResolvingFileIO) {
+      InputFile file = table.io().newInputFile(table.location());
+      if (file instanceof HadoopInputFile) {
+        String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+        boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+        return PropertyUtil.propertyAsBoolean(readOptions, SparkReadOptions.LOCALITY, defaultValue);
+      } else {
+        return false;

Review Comment:
   Nit: could table.io() be assigned to a variable? 





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1096243437


##########
core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java:
##########
@@ -168,6 +168,14 @@ private static String implFromLocation(String location) {
     return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
   }
 
+  public Class<?> ioClass(String location) {
+    try {
+      return Class.forName(implFromLocation(location));
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);

Review Comment:
   I think this can be a validation exception





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1096374242


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java:
##########
@@ -67,14 +62,13 @@ public boolean caseSensitive() {
   }
 
   public boolean localityEnabled() {
-    if (table.io() instanceof HadoopFileIO) {
-      HadoopInputFile file = (HadoopInputFile) table.io().newInputFile(table.location());
-      String scheme = file.getFileSystem().getScheme();
-      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+    if (Util.usesHadoopFileIO(table.io(), table.location())) {

Review Comment:
   @singhpk234, I think `Util.mayHaveBlockLocations` internally calls `usesHadoopFileIO` so I am not sure I got your point [here](https://github.com/apache/iceberg/pull/6655#discussion_r1095306379). 





[GitHub] [iceberg] aokolnychyi commented on pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#issuecomment-1416609858

   Thanks for this change, @singhpk234! Thanks for reviewing, @jackye1995 @amogh-jahagirdar!
   
   @singhpk234, would you be interested in cherry-picking this change to the other query engine versions?




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1093520619


##########
core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java:
##########
@@ -164,7 +164,7 @@ private FileIO io(String location) {
     return io;
   }
 
-  private static String implFromLocation(String location) {
+  public static String implFromLocation(String location) {

Review Comment:
   The name of the method sounds like something we would want to keep private.





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1093586756


##########
core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java:
##########
@@ -164,7 +164,7 @@ private FileIO io(String location) {
     return io;
   }
 
-  private static String implFromLocation(String location) {
+  public static String implFromLocation(String location) {

Review Comment:
   Yes, that's why I am a bit hesitant, but I think it also makes sense for `ResolvingFileIO` to let the caller know which `FileIO` it will resolve to for each location.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6655: Spark: Handle ResolvingFileIO while determining LocalityPreference

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6655:
URL: https://github.com/apache/iceberg/pull/6655#discussion_r1093522365


##########
core/src/main/java/org/apache/iceberg/hadoop/Util.java:
##########
@@ -84,10 +84,16 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
 
+  public static boolean isHDFSLocation(FileIO io, String location) {

Review Comment:
   I am not sure the name makes sense because we don't actually validate that the location is an HDFS location. In some cases, we simply check that the IO is a `HadoopFileIO`, which tells us nothing about the location.


