Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/01/09 18:15:27 UTC
[iceberg] branch master updated: Flink: Add FLIP-27 Iceberg source split (#3501)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d2c26a0 Flink: Add FLIP-27 Iceberg source split (#3501)
d2c26a0 is described below
commit d2c26a02190a16539c8c0621c4d8aac2e9e3ec6c
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Sun Jan 9 10:15:08 2022 -0800
Flink: Add FLIP-27 Iceberg source split (#3501)
---
.../iceberg/flink/source/FlinkInputFormat.java | 2 +-
...kSplitGenerator.java => FlinkSplitPlanner.java} | 47 +++++---
.../apache/iceberg/flink/source/ScanContext.java | 24 +++-
.../flink/source/StreamingMonitorFunction.java | 2 +-
.../flink/source/split/IcebergSourceSplit.java | 122 +++++++++++++++++++++
.../source/split/IcebergSourceSplitSerializer.java | 56 ++++++++++
.../apache/iceberg/flink/source/SplitHelpers.java | 90 +++++++++++++++
.../flink/source/TestStreamingReaderOperator.java | 2 +-
.../split/TestIcebergSourceSplitSerializer.java | 116 ++++++++++++++++++++
9 files changed, 441 insertions(+), 20 deletions(-)
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index 8b757ac..a4cbab5 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -77,7 +77,7 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
tableLoader.open();
try (TableLoader loader = tableLoader) {
Table table = loader.loadTable();
- return FlinkSplitGenerator.createInputSplits(table, context);
+ return FlinkSplitPlanner.planInputSplits(table, context);
}
}
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
similarity index 63%
rename from flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
rename to flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index f495e09..e000114 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -22,33 +22,56 @@ package org.apache.iceberg.flink.source;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
+import org.apache.flink.annotation.Internal;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-class FlinkSplitGenerator {
- private FlinkSplitGenerator() {
+@Internal
+public class FlinkSplitPlanner {
+ private FlinkSplitPlanner() {
}
- static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
- List<CombinedScanTask> tasks = tasks(table, context);
- FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
- for (int i = 0; i < tasks.size(); i++) {
- splits[i] = new FlinkInputSplit(i, tasks.get(i));
+ static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {
+ try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
+ List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
+ FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
+ for (int i = 0; i < tasks.size(); i++) {
+ splits[i] = new FlinkInputSplit(i, tasks.get(i));
+ }
+ return splits;
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to process tasks iterable", e);
+ }
+ }
+
+ /**
+ * Returns the splits for the FLIP-27 Iceberg source.
+ */
+ public static List<IcebergSourceSplit> planIcebergSourceSplits(Table table, ScanContext context) {
+ try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
+ return Lists.newArrayList(CloseableIterable.transform(tasksIterable,
+ task -> IcebergSourceSplit.fromCombinedScanTask(task)));
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to process task iterable: ", e);
}
- return splits;
}
- private static List<CombinedScanTask> tasks(Table table, ScanContext context) {
+ static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context) {
TableScan scan = table
.newScan()
.caseSensitive(context.caseSensitive())
.project(context.project());
+ if (context.includeColumnStats()) {
+ scan = scan.includeColumnStats();
+ }
+
if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
}
@@ -83,10 +106,6 @@ class FlinkSplitGenerator {
}
}
- try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
- return Lists.newArrayList(tasksIterable);
- } catch (IOException e) {
- throw new UncheckedIOException("Failed to close table scan: " + scan, e);
- }
+ return scan.planTasks();
}
}
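
For reference, a minimal sketch of how the new public planIcebergSourceSplits entry point could be invoked. The warehouse path and sketch class below are hypothetical, and since ScanContext is still package-private the sketch has to live in the same package:

    package org.apache.iceberg.flink.source;

    import java.io.IOException;
    import java.util.List;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

    class PlanSplitsSketch {
      static List<IcebergSourceSplit> plan() throws IOException {
        // Hypothetical warehouse location; any TableLoader works the same way.
        TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl");
        tableLoader.open();
        try (TableLoader loader = tableLoader) {
          Table table = loader.loadTable();
          ScanContext context = ScanContext.builder().build();
          // One IcebergSourceSplit per CombinedScanTask planned from the table.
          return FlinkSplitPlanner.planIcebergSourceSplits(table, context);
        }
      }
    }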
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 2896efb..d290a64 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -68,6 +68,9 @@ class ScanContext implements Serializable {
private static final ConfigOption<Duration> MONITOR_INTERVAL =
ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));
+ private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
+ ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);
+
private final boolean caseSensitive;
private final Long snapshotId;
private final Long startSnapshotId;
@@ -83,11 +86,12 @@ class ScanContext implements Serializable {
private final Schema schema;
private final List<Expression> filters;
private final long limit;
+ private final boolean includeColumnStats;
private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
boolean isStreaming, Duration monitorInterval, String nameMapping,
- Schema schema, List<Expression> filters, long limit) {
+ Schema schema, List<Expression> filters, long limit, boolean includeColumnStats) {
this.caseSensitive = caseSensitive;
this.snapshotId = snapshotId;
this.startSnapshotId = startSnapshotId;
@@ -103,6 +107,7 @@ class ScanContext implements Serializable {
this.schema = schema;
this.filters = filters;
this.limit = limit;
+ this.includeColumnStats = includeColumnStats;
}
boolean caseSensitive() {
@@ -161,6 +166,10 @@ class ScanContext implements Serializable {
return limit;
}
+ boolean includeColumnStats() {
+ return includeColumnStats;
+ }
+
ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -177,6 +186,7 @@ class ScanContext implements Serializable {
.project(schema)
.filters(filters)
.limit(limit)
+ .includeColumnStats(includeColumnStats)
.build();
}
@@ -196,6 +206,7 @@ class ScanContext implements Serializable {
.project(schema)
.filters(filters)
.limit(limit)
+ .includeColumnStats(includeColumnStats)
.build();
}
@@ -218,6 +229,7 @@ class ScanContext implements Serializable {
private Schema projectedSchema;
private List<Expression> filters;
private long limit = -1L;
+ private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue();
private Builder() {
}
@@ -292,6 +304,11 @@ class ScanContext implements Serializable {
return this;
}
+ Builder includeColumnStats(boolean newIncludeColumnStats) {
+ this.includeColumnStats = newIncludeColumnStats;
+ return this;
+ }
+
Builder fromProperties(Map<String, String> properties) {
Configuration config = new Configuration();
properties.forEach(config::setString);
@@ -306,14 +323,15 @@ class ScanContext implements Serializable {
.splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
.streaming(config.get(STREAMING))
.monitorInterval(config.get(MONITOR_INTERVAL))
- .nameMapping(properties.get(DEFAULT_NAME_MAPPING));
+ .nameMapping(properties.get(DEFAULT_NAME_MAPPING))
+ .includeColumnStats(config.get(INCLUDE_COLUMN_STATS));
}
public ScanContext build() {
return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize, splitLookback,
splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
- filters, limit);
+ filters, limit, includeColumnStats);
}
}
}
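
The new include-column-stats option can be set in two ways; a minimal sketch under the same package-private constraint as above (the sketch class is hypothetical):

    package org.apache.iceberg.flink.source;

    import java.util.Map;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    class ColumnStatsOptionSketch {
      // Directly through the builder.
      static ScanContext direct() {
        return ScanContext.builder().includeColumnStats(true).build();
      }

      // Parsed from the "include-column-stats" key; defaults to false when absent.
      static ScanContext fromProps() {
        Map<String, String> props = ImmutableMap.of("include-column-stats", "true");
        return ScanContext.builder().fromProperties(props).build();
      }
    }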
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
index 9d8e204..8bfad6d 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -140,7 +140,7 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
}
- FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext);
+ FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext);
for (FlinkInputSplit split : splits) {
sourceContext.collect(split);
}
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
new file mode 100644
index 0000000..b46096a
--- /dev/null
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+@Internal
+public class IcebergSourceSplit implements SourceSplit, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final CombinedScanTask task;
+
+ private int fileOffset;
+ private long recordOffset;
+
+ // The splits are frequently serialized into checkpoints.
+ // Caching the byte representation makes repeated serialization cheap.
+ @Nullable
+ private transient byte[] serializedBytesCache;
+
+ private IcebergSourceSplit(CombinedScanTask task, int fileOffset, long recordOffset) {
+ this.task = task;
+ this.fileOffset = fileOffset;
+ this.recordOffset = recordOffset;
+ }
+
+ public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) {
+ return fromCombinedScanTask(combinedScanTask, 0, 0L);
+ }
+
+ public static IcebergSourceSplit fromCombinedScanTask(
+ CombinedScanTask combinedScanTask, int fileOffset, long recordOffset) {
+ return new IcebergSourceSplit(combinedScanTask, fileOffset, recordOffset);
+ }
+
+ public CombinedScanTask task() {
+ return task;
+ }
+
+ public int fileOffset() {
+ return fileOffset;
+ }
+
+ public long recordOffset() {
+ return recordOffset;
+ }
+
+ @Override
+ public String splitId() {
+ return MoreObjects.toStringHelper(this)
+ .add("files", toString(task.files()))
+ .toString();
+ }
+
+ public void updatePosition(int newFileOffset, long newRecordOffset) {
+ // invalidate the cache after position change
+ serializedBytesCache = null;
+ fileOffset = newFileOffset;
+ recordOffset = newRecordOffset;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("files", toString(task.files()))
+ .add("fileOffset", fileOffset)
+ .add("recordOffset", recordOffset)
+ .toString();
+ }
+
+ private String toString(Collection<FileScanTask> files) {
+ return Iterables.toString(files.stream().map(fileScanTask ->
+ MoreObjects.toStringHelper(fileScanTask)
+ .add("file", fileScanTask.file().path().toString())
+ .add("start", fileScanTask.start())
+ .add("length", fileScanTask.length())
+ .toString()).collect(Collectors.toList()));
+ }
+
+ byte[] serializeV1() throws IOException {
+ if (serializedBytesCache == null) {
+ serializedBytesCache = InstantiationUtil.serializeObject(this);
+ }
+ return serializedBytesCache;
+ }
+
+ static IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException {
+ try {
+ return InstantiationUtil.deserializeObject(serialized, IcebergSourceSplit.class.getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Failed to deserialize the split.", e);
+ }
+ }
+}
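
A minimal sketch of the byte-cache contract above: repeated serialization returns the same cached array until updatePosition invalidates it. The sketch class is hypothetical and sits in the same package because serializeV1 is package-private; the CombinedScanTask is assumed to come from a planned table scan:

    package org.apache.iceberg.flink.source.split;

    import java.io.IOException;
    import org.apache.iceberg.CombinedScanTask;

    class SplitCacheSketch {
      static void demonstrate(CombinedScanTask task) throws IOException {
        IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(task);
        byte[] first = split.serializeV1();
        byte[] second = split.serializeV1();   // cache hit: the same array instance
        assert first == second;
        split.updatePosition(1, 42L);          // position change drops the cache
        byte[] third = split.serializeV1();    // re-serialized with the new offsets
        assert first != third;
      }
    }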
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
new file mode 100644
index 0000000..9e32af5
--- /dev/null
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.io.IOException;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * TODO: Java serialization is used for now.
+ * It will be switched to a more stable serializer as part of
+ * <a href="https://github.com/apache/iceberg/issues/1698">issue-1698</a>.
+ */
+@Internal
+public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<IcebergSourceSplit> {
+ public static final IcebergSourceSplitSerializer INSTANCE = new IcebergSourceSplitSerializer();
+ private static final int VERSION = 1;
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public byte[] serialize(IcebergSourceSplit split) throws IOException {
+ return split.serializeV1();
+ }
+
+ @Override
+ public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOException {
+ switch (version) {
+ case 1:
+ return IcebergSourceSplit.deserializeV1(serialized);
+ default:
+ throw new IOException(String.format("Failed to deserialize IcebergSourceSplit. " +
+ "Encountered unsupported version: %d. Supported versions are [1]", version));
+ }
+ }
+}
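
A minimal round-trip sketch (hypothetical class, same package): Flink calls serialize() when checkpointing split state and deserialize(version, bytes) on restore, and any version other than 1 currently fails with an IOException:

    package org.apache.iceberg.flink.source.split;

    import java.io.IOException;

    class SerializerRoundTripSketch {
      static IcebergSourceSplit roundTrip(IcebergSourceSplit split) throws IOException {
        IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE;
        byte[] bytes = serializer.serialize(split);
        // getVersion() currently returns 1, the only supported version.
        return serializer.deserialize(serializer.getVersion(), bytes);
      }
    }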
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
new file mode 100644
index 0000000..df98808
--- /dev/null
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
+
+public class SplitHelpers {
+
+ private static final AtomicLong splitLengthIncrement = new AtomicLong();
+
+ private SplitHelpers() {
+ }
+
+ /**
+ * This creates a list of IcebergSourceSplit from real files
+ * <li>Create a new Hadoop table under the {@code temporaryFolder}
+ * <li>Write {@code fileCount} number of files to the new Iceberg table
+ * <li>Discover the splits from the table and partition the splits by the {@code filesPerSplit} limit
+ * <li>Delete the Hadoop table
+ *
+ * Since the table and data files are deleted before this method returns,
+ * callers shouldn't attempt to read the data files.
+ */
+ public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
+ TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit) throws Exception {
+ final File warehouseFile = temporaryFolder.newFolder();
+ Assert.assertTrue(warehouseFile.delete());
+ final String warehouse = "file:" + warehouseFile;
+ Configuration hadoopConf = new Configuration();
+ final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse);
+ try {
+ final Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+ final GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+ table, FileFormat.PARQUET, temporaryFolder);
+ for (int i = 0; i < fileCount; ++i) {
+ List<Record> records = RandomGenericData.generate(TestFixtures.SCHEMA, 2, i);
+ dataAppender.appendToTable(records);
+ }
+
+ final ScanContext scanContext = ScanContext.builder().build();
+ final List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext);
+ return splits.stream()
+ .flatMap(split -> {
+ List<List<FileScanTask>> filesList = Lists.partition(
+ Lists.newArrayList(split.task().files()), filesPerSplit);
+ return filesList.stream()
+ .map(files -> new BaseCombinedScanTask(files))
+ .map(combinedScanTask -> IcebergSourceSplit.fromCombinedScanTask(combinedScanTask));
+ })
+ .collect(Collectors.toList());
+ } finally {
+ catalog.dropTable(TestFixtures.TABLE_IDENTIFIER);
+ catalog.close();
+ }
+ }
+}
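
A minimal usage sketch of the helper in a hypothetical JUnit test: ten data files partitioned at most two per split. Because the backing table is dropped before the helper returns, the splits are only suitable for metadata-level tests such as serialization, not for reading data:

    package org.apache.iceberg.flink.source;

    import java.util.List;
    import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
    import org.junit.Assert;
    import org.junit.ClassRule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    public class TestSplitHelpersUsageSketch {
      @ClassRule
      public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

      @Test
      public void testFilesPerSplitLimit() throws Exception {
        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 10, 2);
        // Each split wraps a BaseCombinedScanTask with at most filesPerSplit files.
        for (IcebergSourceSplit split : splits) {
          Assert.assertTrue(split.task().files().size() <= 2);
        }
      }
    }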
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java
index 19c2b6a..7978af6 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java
@@ -254,7 +254,7 @@ public class TestStreamingReaderOperator extends TableTestBase {
.build();
}
- Collections.addAll(inputSplits, FlinkSplitGenerator.createInputSplits(table, scanContext));
+ Collections.addAll(inputSplits, FlinkSplitPlanner.planInputSplits(table, scanContext));
}
return inputSplits;
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
new file mode 100644
index 0000000..36eea1e
--- /dev/null
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceSplitSerializer {
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ private final IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE;
+
+ @Test
+ public void testLatestVersion() throws Exception {
+ serializeAndDeserialize(1, 1);
+ serializeAndDeserialize(10, 2);
+ }
+
+ private void serializeAndDeserialize(int splitCount, int filesPerSplit) throws Exception {
+ final List<IcebergSourceSplit> splits = SplitHelpers
+ .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, splitCount, filesPerSplit);
+ for (IcebergSourceSplit split : splits) {
+ byte[] result = serializer.serialize(split);
+ IcebergSourceSplit deserialized = serializer.deserialize(serializer.getVersion(), result);
+ assertSplitEquals(split, deserialized);
+
+ byte[] cachedResult = serializer.serialize(split);
+ Assert.assertSame(result, cachedResult);
+ IcebergSourceSplit deserialized2 = serializer.deserialize(serializer.getVersion(), cachedResult);
+ assertSplitEquals(split, deserialized2);
+
+ split.updatePosition(0, 100);
+ byte[] resultAfterUpdatePosition = serializer.serialize(split);
+ // after position change, serialized bytes should have changed
+ Assert.assertNotSame(cachedResult, resultAfterUpdatePosition);
+ IcebergSourceSplit deserialized3 = serializer.deserialize(serializer.getVersion(), resultAfterUpdatePosition);
+ assertSplitEquals(split, deserialized3);
+ }
+ }
+
+ @Test
+ public void testV1() throws Exception {
+ serializeAndDeserializeV1(1, 1);
+ serializeAndDeserializeV1(10, 2);
+ }
+
+ private void serializeAndDeserializeV1(int splitCount, int filesPerSplit) throws Exception {
+ final List<IcebergSourceSplit> splits = SplitHelpers
+ .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, splitCount, filesPerSplit);
+ for (IcebergSourceSplit split : splits) {
+ byte[] result = split.serializeV1();
+ IcebergSourceSplit deserialized = IcebergSourceSplit.deserializeV1(result);
+ assertSplitEquals(split, deserialized);
+ }
+ }
+
+ @Test
+ public void testCheckpointedPosition() throws Exception {
+ final AtomicInteger index = new AtomicInteger();
+ final List<IcebergSourceSplit> splits = SplitHelpers
+ .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 10, 2).stream()
+ .map(split -> {
+ IcebergSourceSplit result;
+ if (index.get() % 2 == 0) {
+ result = IcebergSourceSplit.fromCombinedScanTask(split.task(), index.get(), index.get());
+ } else {
+ result = split;
+ }
+ index.incrementAndGet();
+ return result;
+ })
+ .collect(Collectors.toList());
+
+ for (IcebergSourceSplit split : splits) {
+ byte[] result = serializer.serialize(split);
+ IcebergSourceSplit deserialized = serializer.deserialize(serializer.getVersion(), result);
+ assertSplitEquals(split, deserialized);
+
+ byte[] cachedResult = serializer.serialize(split);
+ Assert.assertSame(result, cachedResult);
+ IcebergSourceSplit deserialized2 = serializer.deserialize(serializer.getVersion(), cachedResult);
+ assertSplitEquals(split, deserialized2);
+ }
+ }
+
+ private void assertSplitEquals(IcebergSourceSplit expected, IcebergSourceSplit actual) {
+ Assert.assertEquals(expected.splitId(), actual.splitId());
+ Assert.assertEquals(expected.fileOffset(), actual.fileOffset());
+ Assert.assertEquals(expected.recordOffset(), actual.recordOffset());
+ }
+}