You are viewing a plain-text version of this content. The canonical link to it is in the original mail archive; the hyperlink is not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by st...@apache.org on 2023/02/06 02:39:51 UTC

[iceberg] branch master updated: Flink: Backport handling ResolvingFileIO in determining locality - PR 6655 (#6743)

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 531a81581a Flink: Backport handling ResolvingFileIO in determining locality - PR 6655 (#6743)
531a81581a is described below

commit 531a81581aed152bdbcfa8801593f26918dc1d02
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Sun Feb 5 18:39:45 2023 -0800

    Flink: Backport handling ResolvingFileIO in determining locality - PR 6655 (#6743)
    
    Co-authored-by: Prashant Singh <ps...@amazon.com>
---
 .../apache/iceberg/flink/source/SourceUtil.java    | 29 ++--------------------
 .../apache/iceberg/flink/source/SourceUtil.java    | 29 ++--------------------
 2 files changed, 4 insertions(+), 54 deletions(-)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/SourceUtil.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/SourceUtil.java
index 8ed897d3f4..7c3a69dbc1 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/SourceUtil.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/SourceUtil.java
@@ -18,27 +18,17 @@
  */
 package org.apache.iceberg.flink.source;
 
-import java.io.IOException;
-import java.util.Set;
 import java.util.function.Supplier;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.FlinkConfigOptions;
-import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 class SourceUtil {
   private SourceUtil() {}
 
-  private static final Logger LOG = LoggerFactory.getLogger(SourceUtil.class);
-  private static final Set<String> FILE_SYSTEM_SUPPORT_LOCALITY = ImmutableSet.of("hdfs");
-
   static boolean isLocalityEnabled(
       Table table, ReadableConfig readableConfig, Boolean exposeLocality) {
     Boolean localityEnabled =
@@ -50,22 +40,7 @@ class SourceUtil {
       return false;
     }
 
-    FileIO fileIO = table.io();
-    if (fileIO instanceof HadoopFileIO) {
-      HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
-      try {
-        String scheme =
-            new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
-        return FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);
-      } catch (IOException e) {
-        LOG.warn(
-            "Failed to determine whether the locality information can be exposed for table: {}",
-            table,
-            e);
-      }
-    }
-
-    return false;
+    return Util.mayHaveBlockLocations(table.io(), table.location());
   }
 
   /**
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/SourceUtil.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/SourceUtil.java
index 8ed897d3f4..7c3a69dbc1 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/SourceUtil.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/SourceUtil.java
@@ -18,27 +18,17 @@
  */
 package org.apache.iceberg.flink.source;
 
-import java.io.IOException;
-import java.util.Set;
 import java.util.function.Supplier;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.FlinkConfigOptions;
-import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 class SourceUtil {
   private SourceUtil() {}
 
-  private static final Logger LOG = LoggerFactory.getLogger(SourceUtil.class);
-  private static final Set<String> FILE_SYSTEM_SUPPORT_LOCALITY = ImmutableSet.of("hdfs");
-
   static boolean isLocalityEnabled(
       Table table, ReadableConfig readableConfig, Boolean exposeLocality) {
     Boolean localityEnabled =
@@ -50,22 +40,7 @@ class SourceUtil {
       return false;
     }
 
-    FileIO fileIO = table.io();
-    if (fileIO instanceof HadoopFileIO) {
-      HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
-      try {
-        String scheme =
-            new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
-        return FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);
-      } catch (IOException e) {
-        LOG.warn(
-            "Failed to determine whether the locality information can be exposed for table: {}",
-            table,
-            e);
-      }
-    }
-
-    return false;
+    return Util.mayHaveBlockLocations(table.io(), table.location());
   }
 
   /**