Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/02/14 03:03:00 UTC
[iceberg] branch master updated: Flink: Support HDFS locality with LocatableInputSplit (#3817)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d43cb4c Flink: Support HDFS locality with LocatableInputSplit (#3817)
d43cb4c is described below
commit d43cb4c296f8096300b43fe830fe6c4fb085aaa5
Author: liliwei <hi...@gmail.com>
AuthorDate: Mon Feb 14 11:02:42 2022 +0800
Flink: Support HDFS locality with LocatableInputSplit (#3817)
---
.../apache/iceberg/flink/FlinkConfigOptions.java | 6 +++
.../iceberg/flink/source/FlinkInputFormat.java | 5 ++-
.../iceberg/flink/source/FlinkInputSplit.java | 21 ++++-------
.../apache/iceberg/flink/source/FlinkSource.java | 43 +++++++++++++++++++++-
.../iceberg/flink/source/FlinkSplitPlanner.java | 19 ++++++++--
.../apache/iceberg/flink/source/ScanContext.java | 20 ++++++++--
.../iceberg/flink/source/TestFlinkScanSql.java | 35 ++++++++++++++++++
7 files changed, 127 insertions(+), 22 deletions(-)
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
index 067abe8..49cef32 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
@@ -40,4 +40,10 @@ public class FlinkConfigOptions {
.intType()
.defaultValue(100)
.withDescription("Sets max infer parallelism for source operator.");
+
+ public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
+ ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
+ .booleanType()
+ .noDefaultValue()
+ .withDescription("Expose split host information to use Flink's locality aware split assigner.");
}
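
Editor's note: the new option has no default value, so when it is left unset the source falls back to detecting the filesystem (see localityEnabled() further down in this commit). A minimal sketch, mirroring what the test at the end of this commit does, of enabling it through the Table API; the table name "t" is hypothetical:

    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
    // Force locality exposure on; leaving the option unset defers to filesystem detection.
    tEnv.getConfig().getConfiguration()
        .setBoolean("table.exec.iceberg.expose-split-locality-info", true);
    tEnv.executeSql("SELECT * FROM t");
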
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index a4cbab5..1027b5b 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.flink.source;
import java.io.IOException;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
@@ -83,7 +84,9 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
@Override
public InputSplitAssigner getInputSplitAssigner(FlinkInputSplit[] inputSplits) {
- return new DefaultInputSplitAssigner(inputSplits);
+ return context.exposeLocality() ?
+ new LocatableInputSplitAssigner(inputSplits) :
+ new DefaultInputSplitAssigner(inputSplits);
}
@Override
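
Editor's note: with locality exposed, Flink's LocatableInputSplitAssigner hands each task a split whose blocks live on that task's host when one is available, and falls back to remote splits otherwise. A small sketch of that behavior using hypothetical host names:

    LocatableInputSplit[] splits = new LocatableInputSplit[] {
        new LocatableInputSplit(0, "host-a"),
        new LocatableInputSplit(1, "host-b")
    };
    LocatableInputSplitAssigner assigner = new LocatableInputSplitAssigner(splits);
    // A task running on host-a is preferentially assigned the split stored on host-a.
    InputSplit next = assigner.getNextInputSplit("host-a", 0);
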
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
index b59574f..5bb85fe 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
@@ -19,29 +19,21 @@
package org.apache.iceberg.flink.source;
-import org.apache.flink.core.io.InputSplit;
+import java.util.Arrays;
+import javax.annotation.Nullable;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-/**
- * TODO Implement {@link LocatableInputSplit}.
- */
-public class FlinkInputSplit implements InputSplit {
+public class FlinkInputSplit extends LocatableInputSplit {
- private final int splitNumber;
private final CombinedScanTask task;
- FlinkInputSplit(int splitNumber, CombinedScanTask task) {
- this.splitNumber = splitNumber;
+ FlinkInputSplit(int splitNumber, CombinedScanTask task, @Nullable String[] hostnames) {
+ super(splitNumber, hostnames);
this.task = task;
}
- @Override
- public int getSplitNumber() {
- return splitNumber;
- }
-
CombinedScanTask getTask() {
return task;
}
@@ -49,8 +41,9 @@ public class FlinkInputSplit implements InputSplit {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
- .add("splitNumber", splitNumber)
+ .add("splitNumber", getSplitNumber())
.add("task", task)
+ .add("hosts", Arrays.toString(getHostnames()))
.toString();
}
}
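
Editor's note: LocatableInputSplit already implements InputSplit and carries the host array, so the subclass only needs to hold the scan task; splits are serialized out to task managers, so the payload must be serializable (CombinedScanTask is). A sketch of the general pattern, with a hypothetical payload type "MyTask" in place of the Iceberg class:

    import javax.annotation.Nullable;
    import org.apache.flink.core.io.LocatableInputSplit;

    // "MyTask" is a hypothetical Serializable payload, not an Iceberg class.
    public class MySplit extends LocatableInputSplit {
      private final MyTask task;

      MySplit(int splitNumber, MyTask task, @Nullable String[] hostnames) {
        super(splitNumber, hostnames);  // a null host array means "no locality preference"
        this.task = task;
      }

      MyTask getTask() {
        return task;
      }
    }
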
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index a3263d2..df8009f 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
@@ -31,6 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
@@ -40,10 +42,16 @@ import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FlinkSource {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkSource.class);
+
private FlinkSource() {
}
@@ -70,12 +78,15 @@ public class FlinkSource {
* Source builder to build {@link DataStream}.
*/
public static class Builder {
+ private static final Set<String> FILE_SYSTEM_SUPPORT_LOCALITY = ImmutableSet.of("hdfs");
+
private StreamExecutionEnvironment env;
private Table table;
private TableLoader tableLoader;
private TableSchema projectedSchema;
private ReadableConfig readableConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
+ private Boolean exposeLocality;
public Builder tableLoader(TableLoader newLoader) {
this.tableLoader = newLoader;
@@ -157,6 +168,11 @@ public class FlinkSource {
return this;
}
+ public Builder exposeLocality(boolean newExposeLocality) {
+ this.exposeLocality = newExposeLocality;
+ return this;
+ }
+
public Builder nameMapping(String nameMapping) {
contextBuilder.nameMapping(nameMapping);
return this;
@@ -195,6 +211,7 @@ public class FlinkSource {
} else {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
}
+ contextBuilder.exposeLocality(localityEnabled());
return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
}
@@ -225,7 +242,8 @@ public class FlinkSource {
if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
int maxInferParallelism = readableConfig.get(FlinkConfigOptions
.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
- Preconditions.checkState(maxInferParallelism >= 1,
+ Preconditions.checkState(
+ maxInferParallelism >= 1,
FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
int splitNum;
try {
@@ -247,6 +265,29 @@ public class FlinkSource {
parallelism = Math.max(1, parallelism);
return parallelism;
}
+
+ private boolean localityEnabled() {
+ Boolean localityEnabled =
+ this.exposeLocality != null ? this.exposeLocality :
+ readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+
+ if (localityEnabled != null && !localityEnabled) {
+ return false;
+ }
+
+ FileIO fileIO = table.io();
+ if (fileIO instanceof HadoopFileIO) {
+ HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
+ try {
+ String scheme = new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
+ return FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);
+ } catch (IOException e) {
+ LOG.warn("Failed to determine whether the locality information can be exposed for table: {}", table, e);
+ }
+ }
+
+ return false;
+ }
}
public static boolean isBounded(Map<String, String> properties) {
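
Editor's note: localityEnabled() gives an explicit builder setting precedence over the config option, and even then only returns true when the table actually lives on a filesystem that reports block locations. A standalone sketch of the scheme check, with a hypothetical table location:

    static boolean supportsLocality(String tableLocation, org.apache.hadoop.conf.Configuration conf) {
      try {
        // FileSystem#getScheme() returns e.g. "hdfs", "file" or "s3a".
        String scheme = new Path(tableLocation).getFileSystem(conf).getScheme();
        return "hdfs".equals(scheme);  // only HDFS is in FILE_SYSTEM_SUPPORT_LOCALITY
      } catch (IOException e) {
        return false;  // mirror the commit: log a warning and disable locality on failure
      }
    }
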
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index e000114..213f9c9 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -29,8 +29,11 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
@Internal
public class FlinkSplitPlanner {
@@ -41,9 +44,19 @@ public class FlinkSplitPlanner {
try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
- for (int i = 0; i < tasks.size(); i++) {
- splits[i] = new FlinkInputSplit(i, tasks.get(i));
- }
+ boolean exposeLocality = context.exposeLocality();
+
+ Tasks.range(tasks.size())
+ .stopOnFailure()
+ .executeWith(exposeLocality ? ThreadPools.getWorkerPool() : null)
+ .run(index -> {
+ CombinedScanTask task = tasks.get(index);
+ String[] hostnames = null;
+ if (exposeLocality) {
+ hostnames = Util.blockLocations(table.io(), task);
+ }
+ splits[index] = new FlinkInputSplit(index, task, hostnames);
+ });
return splits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to process tasks iterable", e);
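
Editor's note: fetching block locations issues one namenode call per task, so the commit runs the loop on Iceberg's shared worker pool via Tasks.range; when locality is off, executeWith(null) keeps it on the caller thread. The pattern in isolation, writing to distinct indices so the parallel writes cannot race (the host strings are hypothetical stand-ins for Util.blockLocations results):

    String[] hostsPerSplit = new String[8];
    Tasks.range(hostsPerSplit.length)
        .stopOnFailure()                            // abort remaining work on the first error
        .executeWith(ThreadPools.getWorkerPool())   // or null to stay on the caller thread
        .run(index -> hostsPerSplit[index] = "host-" + (index % 3));
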
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index d290a64..25b3bd0 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -72,6 +72,7 @@ class ScanContext implements Serializable {
ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);
private final boolean caseSensitive;
+ private final boolean exposeLocality;
private final Long snapshotId;
private final Long startSnapshotId;
private final Long endSnapshotId;
@@ -90,8 +91,8 @@ class ScanContext implements Serializable {
private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
- boolean isStreaming, Duration monitorInterval, String nameMapping,
- Schema schema, List<Expression> filters, long limit, boolean includeColumnStats) {
+ boolean isStreaming, Duration monitorInterval, String nameMapping, Schema schema,
+ List<Expression> filters, long limit, boolean includeColumnStats, boolean exposeLocality) {
this.caseSensitive = caseSensitive;
this.snapshotId = snapshotId;
this.startSnapshotId = startSnapshotId;
@@ -108,6 +109,7 @@ class ScanContext implements Serializable {
this.filters = filters;
this.limit = limit;
this.includeColumnStats = includeColumnStats;
+ this.exposeLocality = exposeLocality;
}
boolean caseSensitive() {
@@ -170,6 +172,10 @@ class ScanContext implements Serializable {
return includeColumnStats;
}
+ boolean exposeLocality() {
+ return exposeLocality;
+ }
+
ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -186,6 +192,7 @@ class ScanContext implements Serializable {
.project(schema)
.filters(filters)
.limit(limit)
+ .exposeLocality(exposeLocality)
.includeColumnStats(includeColumnStats)
.build();
}
@@ -207,6 +214,7 @@ class ScanContext implements Serializable {
.filters(filters)
.limit(limit)
.includeColumnStats(includeColumnStats)
+ .exposeLocality(exposeLocality)
.build();
}
@@ -230,6 +238,7 @@ class ScanContext implements Serializable {
private List<Expression> filters;
private long limit = -1L;
private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue();
+ private boolean exposeLocality;
private Builder() {
}
@@ -309,6 +318,11 @@ class ScanContext implements Serializable {
return this;
}
+ Builder exposeLocality(boolean newExposeLocality) {
+ this.exposeLocality = newExposeLocality;
+ return this;
+ }
+
Builder fromProperties(Map<String, String> properties) {
Configuration config = new Configuration();
properties.forEach(config::setString);
@@ -331,7 +345,7 @@ class ScanContext implements Serializable {
return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize, splitLookback,
splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
- filters, limit, includeColumnStats);
+ filters, limit, includeColumnStats, exposeLocality);
}
}
}
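
Editor's note: the flag is threaded through copyWithAppendsBetween() and the snapshot-scoped copy as well, so streaming scans keep locality across incremental plans. Since ScanContext and its builder are package-private, the following sketch only compiles inside org.apache.iceberg.flink.source:

    ScanContext context = ScanContext.builder()
        .caseSensitive(true)
        .exposeLocality(true)
        .build();
    // The copies made for incremental streaming reads preserve the flag.
    boolean locality = context.copyWithAppendsBetween(1L, 2L).exposeLocality();  // true
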
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
index 9af1b7c..b4f4631 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
@@ -42,6 +43,7 @@ import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;
@@ -178,6 +180,39 @@ public class TestFlinkScanSql extends TestFlinkSource {
Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
}
+ @Test
+ public void testExposeLocality() throws Exception {
+ Table table =
+ catalog.createTable(TableIdentifier.of("default", "t"), TestFixtures.SCHEMA, TestFixtures.SPEC);
+
+ TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
+ List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 10, 0L);
+ expectedRecords.forEach(expectedRecord -> expectedRecord.set(2, "2020-03-20"));
+
+ GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+ DataFile dataFile = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
+ helper.appendToTable(dataFile);
+
+ // test sql api
+ Configuration tableConf = getTableEnv().getConfig().getConfiguration();
+ tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), false);
+
+ List<Row> results = sql("select * from t");
+ org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA);
+
+ // test table api
+ tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), true);
+ FlinkSource.Builder builder = FlinkSource.forRowData().tableLoader(tableLoader).table(table);
+
+ Boolean localityEnabled =
+ DynMethods.builder("localityEnabled").hiddenImpl(builder.getClass()).build().invoke(builder);
+ // When running in CI or locally, `localityEnabled` will be false even if this configuration is enabled,
+ Assert.assertFalse("Expose split locality info should be false.", localityEnabled);
+
+ results = run(builder, Maps.newHashMap(), "where dt='2020-03-20'", "*");
+ org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA);
+ }
+
private List<Row> sql(String query, Object... args) {
TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
try (CloseableIterator<Row> iter = tableResult.collect()) {