You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/02/14 03:03:00 UTC

[iceberg] branch master updated: Flink: Support HDFS locality with LocatableInputSplit (#3817)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d43cb4c  Flink: Support HDFS locality with LocatableInputSplit (#3817)
d43cb4c is described below

commit d43cb4c296f8096300b43fe830fe6c4fb085aaa5
Author: liliwei <hi...@gmail.com>
AuthorDate: Mon Feb 14 11:02:42 2022 +0800

    Flink: Support HDFS locality with LocatableInputSplit (#3817)
---
 .../apache/iceberg/flink/FlinkConfigOptions.java   |  6 +++
 .../iceberg/flink/source/FlinkInputFormat.java     |  5 ++-
 .../iceberg/flink/source/FlinkInputSplit.java      | 21 ++++-------
 .../apache/iceberg/flink/source/FlinkSource.java   | 43 +++++++++++++++++++++-
 .../iceberg/flink/source/FlinkSplitPlanner.java    | 19 ++++++++--
 .../apache/iceberg/flink/source/ScanContext.java   | 20 ++++++++--
 .../iceberg/flink/source/TestFlinkScanSql.java     | 35 ++++++++++++++++++
 7 files changed, 127 insertions(+), 22 deletions(-)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
index 067abe8..49cef32 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
@@ -40,4 +40,10 @@ public class FlinkConfigOptions {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
+      ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
+          .booleanType()
+          .noDefaultValue()
+          .withDescription("Expose split host information to use Flink's locality aware split assigner.");
 }
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index a4cbab5..1027b5b 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.flink.source;
 import java.io.IOException;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
@@ -83,7 +84,9 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
 
   @Override
   public InputSplitAssigner getInputSplitAssigner(FlinkInputSplit[] inputSplits) {
-    return new DefaultInputSplitAssigner(inputSplits);
+    return context.exposeLocality() ?
+        new LocatableInputSplitAssigner(inputSplits) :
+        new DefaultInputSplitAssigner(inputSplits);
   }
 
   @Override
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
index b59574f..5bb85fe 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
@@ -19,29 +19,21 @@
 
 package org.apache.iceberg.flink.source;
 
-import org.apache.flink.core.io.InputSplit;
+import java.util.Arrays;
+import javax.annotation.Nullable;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 
-/**
- * TODO Implement {@link LocatableInputSplit}.
- */
-public class FlinkInputSplit implements InputSplit {
+public class FlinkInputSplit extends LocatableInputSplit {
 
-  private final int splitNumber;
   private final CombinedScanTask task;
 
-  FlinkInputSplit(int splitNumber, CombinedScanTask task) {
-    this.splitNumber = splitNumber;
+  FlinkInputSplit(int splitNumber, CombinedScanTask task, @Nullable String[] hostnames) {
+    super(splitNumber, hostnames);
     this.task = task;
   }
 
-  @Override
-  public int getSplitNumber() {
-    return splitNumber;
-  }
-
   CombinedScanTask getTask() {
     return task;
   }
@@ -49,8 +41,9 @@ public class FlinkInputSplit implements InputSplit {
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
-        .add("splitNumber", splitNumber)
+        .add("splitNumber", getSplitNumber())
         .add("task", task)
+        .add("hosts", Arrays.toString(getHostnames()))
         .toString();
   }
 }
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index a3263d2..df8009f 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
@@ -31,6 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
@@ -40,10 +42,16 @@ import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class FlinkSource {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkSource.class);
+
   private FlinkSource() {
   }
 
@@ -70,12 +78,15 @@ public class FlinkSource {
    * Source builder to build {@link DataStream}.
    */
   public static class Builder {
+    private static final Set<String> FILE_SYSTEM_SUPPORT_LOCALITY = ImmutableSet.of("hdfs");
+
     private StreamExecutionEnvironment env;
     private Table table;
     private TableLoader tableLoader;
     private TableSchema projectedSchema;
     private ReadableConfig readableConfig = new Configuration();
     private final ScanContext.Builder contextBuilder = ScanContext.builder();
+    private Boolean exposeLocality;
 
     public Builder tableLoader(TableLoader newLoader) {
       this.tableLoader = newLoader;
@@ -157,6 +168,11 @@ public class FlinkSource {
       return this;
     }
 
+    public Builder exposeLocality(boolean newExposeLocality) {
+      this.exposeLocality = newExposeLocality;
+      return this;
+    }
+
     public Builder nameMapping(String nameMapping) {
       contextBuilder.nameMapping(nameMapping);
       return this;
@@ -195,6 +211,7 @@ public class FlinkSource {
       } else {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
       }
+      contextBuilder.exposeLocality(localityEnabled());
 
       return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
     }
@@ -225,7 +242,8 @@ public class FlinkSource {
       if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
         int maxInferParallelism = readableConfig.get(FlinkConfigOptions
             .TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
-        Preconditions.checkState(maxInferParallelism >= 1,
+        Preconditions.checkState(
+            maxInferParallelism >= 1,
             FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
         int splitNum;
         try {
@@ -247,6 +265,29 @@ public class FlinkSource {
       parallelism = Math.max(1, parallelism);
       return parallelism;
     }
+
+    private boolean localityEnabled() {
+      Boolean localityEnabled =
+          this.exposeLocality != null ? this.exposeLocality :
+              readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+
+      if (localityEnabled != null && !localityEnabled) {
+        return false;
+      }
+
+      FileIO fileIO = table.io();
+      if (fileIO instanceof HadoopFileIO) {
+        HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
+        try {
+          String scheme = new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
+          return FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);
+        } catch (IOException e) {
+          LOG.warn("Failed to determine whether the locality information can be exposed for table: {}", table, e);
+        }
+      }
+
+      return false;
+    }
   }
 
   public static boolean isBounded(Map<String, String> properties) {
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index e000114..213f9c9 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -29,8 +29,11 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
 
 @Internal
 public class FlinkSplitPlanner {
@@ -41,9 +44,19 @@ public class FlinkSplitPlanner {
     try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
       List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
       FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-      for (int i = 0; i < tasks.size(); i++) {
-        splits[i] = new FlinkInputSplit(i, tasks.get(i));
-      }
+      boolean exposeLocality = context.exposeLocality();
+
+      Tasks.range(tasks.size())
+          .stopOnFailure()
+          .executeWith(exposeLocality ? ThreadPools.getWorkerPool() : null)
+          .run(index -> {
+            CombinedScanTask task = tasks.get(index);
+            String[] hostnames = null;
+            if (exposeLocality) {
+              hostnames = Util.blockLocations(table.io(), task);
+            }
+            splits[index] = new FlinkInputSplit(index, task, hostnames);
+          });
       return splits;
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to process tasks iterable", e);
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index d290a64..25b3bd0 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -72,6 +72,7 @@ class ScanContext implements Serializable {
       ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);
 
   private final boolean caseSensitive;
+  private final boolean exposeLocality;
   private final Long snapshotId;
   private final Long startSnapshotId;
   private final Long endSnapshotId;
@@ -90,8 +91,8 @@ class ScanContext implements Serializable {
 
   private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
                       Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
-                      boolean isStreaming, Duration monitorInterval, String nameMapping,
-                      Schema schema, List<Expression> filters, long limit, boolean includeColumnStats) {
+                      boolean isStreaming, Duration monitorInterval, String nameMapping, Schema schema,
+                      List<Expression> filters, long limit, boolean includeColumnStats, boolean exposeLocality) {
     this.caseSensitive = caseSensitive;
     this.snapshotId = snapshotId;
     this.startSnapshotId = startSnapshotId;
@@ -108,6 +109,7 @@ class ScanContext implements Serializable {
     this.filters = filters;
     this.limit = limit;
     this.includeColumnStats = includeColumnStats;
+    this.exposeLocality = exposeLocality;
   }
 
   boolean caseSensitive() {
@@ -170,6 +172,10 @@ class ScanContext implements Serializable {
     return includeColumnStats;
   }
 
+  boolean exposeLocality() {
+    return exposeLocality;
+  }
+
   ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
@@ -186,6 +192,7 @@ class ScanContext implements Serializable {
         .project(schema)
         .filters(filters)
         .limit(limit)
+        .exposeLocality(exposeLocality)
         .includeColumnStats(includeColumnStats)
         .build();
   }
@@ -207,6 +214,7 @@ class ScanContext implements Serializable {
         .filters(filters)
         .limit(limit)
         .includeColumnStats(includeColumnStats)
+        .exposeLocality(exposeLocality)
         .build();
   }
 
@@ -230,6 +238,7 @@ class ScanContext implements Serializable {
     private List<Expression> filters;
     private long limit = -1L;
     private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue();
+    private boolean exposeLocality;
 
     private Builder() {
     }
@@ -309,6 +318,11 @@ class ScanContext implements Serializable {
       return this;
     }
 
+    Builder exposeLocality(boolean newExposeLocality) {
+      this.exposeLocality = newExposeLocality;
+      return this;
+    }
+
     Builder fromProperties(Map<String, String> properties) {
       Configuration config = new Configuration();
       properties.forEach(config::setString);
@@ -331,7 +345,7 @@ class ScanContext implements Serializable {
       return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
           endSnapshotId, asOfTimestamp, splitSize, splitLookback,
           splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
-          filters, limit, includeColumnStats);
+          filters, limit, includeColumnStats, exposeLocality);
     }
   }
 }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
index 9af1b7c..b4f4631 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
@@ -42,6 +43,7 @@ import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -178,6 +180,39 @@ public class TestFlinkScanSql extends TestFlinkSource {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
   }
 
+  @Test
+  public void testExposeLocality() throws Exception {
+    Table table =
+        catalog.createTable(TableIdentifier.of("default", "t"), TestFixtures.SCHEMA, TestFixtures.SPEC);
+
+    TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 10, 0L);
+    expectedRecords.forEach(expectedRecord -> expectedRecord.set(2, "2020-03-20"));
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    DataFile dataFile = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
+    helper.appendToTable(dataFile);
+
+    // test sql api
+    Configuration tableConf = getTableEnv().getConfig().getConfiguration();
+    tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), false);
+
+    List<Row> results = sql("select * from t");
+    org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA);
+
+    // test the FlinkSource builder (DataStream API)
+    tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), true);
+    FlinkSource.Builder builder = FlinkSource.forRowData().tableLoader(tableLoader).table(table);
+
+    Boolean localityEnabled =
+        DynMethods.builder("localityEnabled").hiddenImpl(builder.getClass()).build().invoke(builder);
+    // Even with this option enabled, `localityEnabled` is false in CI/local runs: it requires HadoopFileIO on hdfs
+    Assert.assertFalse("Expose split locality info should be false.", localityEnabled);
+
+    results = run(builder, Maps.newHashMap(), "where dt='2020-03-20'", "*");
+    org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA);
+  }
+
   private List<Row> sql(String query, Object... args) {
     TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
     try (CloseableIterator<Row> iter = tableResult.collect()) {