Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/01/09 18:15:27 UTC

[iceberg] branch master updated: Flink: Add FLIP-27 Iceberg source split (#3501)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d2c26a0  Flink: Add FLIP-27 Iceberg source split (#3501)
d2c26a0 is described below

commit d2c26a02190a16539c8c0621c4d8aac2e9e3ec6c
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Sun Jan 9 10:15:08 2022 -0800

    Flink: Add FLIP-27 Iceberg source split (#3501)
---
 .../iceberg/flink/source/FlinkInputFormat.java     |   2 +-
 ...kSplitGenerator.java => FlinkSplitPlanner.java} |  47 +++++---
 .../apache/iceberg/flink/source/ScanContext.java   |  24 +++-
 .../flink/source/StreamingMonitorFunction.java     |   2 +-
 .../flink/source/split/IcebergSourceSplit.java     | 122 +++++++++++++++++++++
 .../source/split/IcebergSourceSplitSerializer.java |  56 ++++++++++
 .../apache/iceberg/flink/source/SplitHelpers.java  |  90 +++++++++++++++
 .../flink/source/TestStreamingReaderOperator.java  |   2 +-
 .../split/TestIcebergSourceSplitSerializer.java    | 116 ++++++++++++++++++++
 9 files changed, 441 insertions(+), 20 deletions(-)

diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index 8b757ac..a4cbab5 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -77,7 +77,7 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
     tableLoader.open();
     try (TableLoader loader = tableLoader) {
       Table table = loader.loadTable();
-      return FlinkSplitGenerator.createInputSplits(table, context);
+      return FlinkSplitPlanner.planInputSplits(table, context);
     }
   }
 
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
similarity index 63%
rename from flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
rename to flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index f495e09..e000114 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -22,33 +22,56 @@ package org.apache.iceberg.flink.source;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class FlinkSplitGenerator {
-  private FlinkSplitGenerator() {
+@Internal
+public class FlinkSplitPlanner {
+  private FlinkSplitPlanner() {
   }
 
-  static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
-    List<CombinedScanTask> tasks = tasks(table, context);
-    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-    for (int i = 0; i < tasks.size(); i++) {
-      splits[i] = new FlinkInputSplit(i, tasks.get(i));
+  static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {
+    try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
+      List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
+      FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
+      for (int i = 0; i < tasks.size(); i++) {
+        splits[i] = new FlinkInputSplit(i, tasks.get(i));
+      }
+      return splits;
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to process tasks iterable", e);
+    }
+  }
+
+  /**
+   * Returns splits for the FLIP-27 source.
+   */
+  public static List<IcebergSourceSplit> planIcebergSourceSplits(Table table, ScanContext context) {
+    try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
+      return Lists.newArrayList(CloseableIterable.transform(tasksIterable,
+          task -> IcebergSourceSplit.fromCombinedScanTask(task)));
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to process tasks iterable", e);
     }
-    return splits;
   }
 
-  private static List<CombinedScanTask> tasks(Table table, ScanContext context) {
+  static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context) {
     TableScan scan = table
         .newScan()
         .caseSensitive(context.caseSensitive())
         .project(context.project());
 
+    if (context.includeColumnStats()) {
+      scan = scan.includeColumnStats();
+    }
+
     if (context.snapshotId() != null) {
       scan = scan.useSnapshot(context.snapshotId());
     }
@@ -83,10 +106,6 @@ class FlinkSplitGenerator {
       }
     }
 
-    try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
-      return Lists.newArrayList(tasksIterable);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Failed to close table scan: " + scan, e);
-    }
+    return scan.planTasks();
   }
 }
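
For reference, a minimal sketch of driving the new FLIP-27 entry point (the harness class and the way the Table is obtained are hypothetical; ScanContext is package-private, so this would have to live in org.apache.iceberg.flink.source):

    package org.apache.iceberg.flink.source;

    import java.util.List;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

    public class PlannerSketch {
      // Hypothetical driver: plan FLIP-27 splits once for a batch read.
      public static List<IcebergSourceSplit> planOnce(Table table) {
        ScanContext context = ScanContext.builder()
            .caseSensitive(true)
            .build();
        // planIcebergSourceSplits closes the CombinedScanTask iterable internally.
        return FlinkSplitPlanner.planIcebergSourceSplits(table, context);
      }
    }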
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 2896efb..d290a64 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -68,6 +68,9 @@ class ScanContext implements Serializable {
   private static final ConfigOption<Duration> MONITOR_INTERVAL =
       ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));
 
+  private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
+      ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);
+
   private final boolean caseSensitive;
   private final Long snapshotId;
   private final Long startSnapshotId;
@@ -83,11 +86,12 @@ class ScanContext implements Serializable {
   private final Schema schema;
   private final List<Expression> filters;
   private final long limit;
+  private final boolean includeColumnStats;
 
   private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
                       Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
                       boolean isStreaming, Duration monitorInterval, String nameMapping,
-                      Schema schema, List<Expression> filters, long limit) {
+                      Schema schema, List<Expression> filters, long limit, boolean includeColumnStats) {
     this.caseSensitive = caseSensitive;
     this.snapshotId = snapshotId;
     this.startSnapshotId = startSnapshotId;
@@ -103,6 +107,7 @@ class ScanContext implements Serializable {
     this.schema = schema;
     this.filters = filters;
     this.limit = limit;
+    this.includeColumnStats = includeColumnStats;
   }
 
   boolean caseSensitive() {
@@ -161,6 +166,10 @@ class ScanContext implements Serializable {
     return limit;
   }
 
+  boolean includeColumnStats() {
+    return includeColumnStats;
+  }
+
   ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
@@ -177,6 +186,7 @@ class ScanContext implements Serializable {
         .project(schema)
         .filters(filters)
         .limit(limit)
+        .includeColumnStats(includeColumnStats)
         .build();
   }
 
@@ -196,6 +206,7 @@ class ScanContext implements Serializable {
         .project(schema)
         .filters(filters)
         .limit(limit)
+        .includeColumnStats(includeColumnStats)
         .build();
   }
 
@@ -218,6 +229,7 @@ class ScanContext implements Serializable {
     private Schema projectedSchema;
     private List<Expression> filters;
     private long limit = -1L;
+    private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue();
 
     private Builder() {
     }
@@ -292,6 +304,11 @@ class ScanContext implements Serializable {
       return this;
     }
 
+    Builder includeColumnStats(boolean newIncludeColumnStats) {
+      this.includeColumnStats = newIncludeColumnStats;
+      return this;
+    }
+
     Builder fromProperties(Map<String, String> properties) {
       Configuration config = new Configuration();
       properties.forEach(config::setString);
@@ -306,14 +323,15 @@ class ScanContext implements Serializable {
           .splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
           .streaming(config.get(STREAMING))
           .monitorInterval(config.get(MONITOR_INTERVAL))
-          .nameMapping(properties.get(DEFAULT_NAME_MAPPING));
+          .nameMapping(properties.get(DEFAULT_NAME_MAPPING))
+          .includeColumnStats(config.get(INCLUDE_COLUMN_STATS));
     }
 
     public ScanContext build() {
       return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
           endSnapshotId, asOfTimestamp, splitSize, splitLookback,
           splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
-          filters, limit);
+          filters, limit, includeColumnStats);
     }
   }
 }
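
A sketch of the two ways the new flag can be set, assuming a caller in the same package (ScanContext and its builder are package-private); the class name is hypothetical, but the property key is the one the INCLUDE_COLUMN_STATS ConfigOption declares:

    package org.apache.iceberg.flink.source;

    import java.util.Collections;
    import java.util.Map;

    class ScanContextSketch {
      static ScanContext withStats() {
        // Programmatic: set the flag directly on the builder.
        return ScanContext.builder()
            .includeColumnStats(true)
            .build();
      }

      static ScanContext fromProps() {
        // Via properties: the "include-column-stats" key defaults to false.
        Map<String, String> props = Collections.singletonMap("include-column-stats", "true");
        return ScanContext.builder()
            .fromProperties(props)
            .build();
      }
    }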
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
index 9d8e204..8bfad6d 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -140,7 +140,7 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
         newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
       }
 
-      FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext);
+      FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext);
       for (FlinkInputSplit split : splits) {
         sourceContext.collect(split);
       }
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
new file mode 100644
index 0000000..b46096a
--- /dev/null
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+@Internal
+public class IcebergSourceSplit implements SourceSplit, Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final CombinedScanTask task;
+
+  private int fileOffset;
+  private long recordOffset;
+
+  // The splits are frequently serialized into checkpoints.
+  // Caching the byte representation makes repeated serialization cheap.
+  @Nullable
+  private transient byte[] serializedBytesCache;
+
+  private IcebergSourceSplit(CombinedScanTask task, int fileOffset, long recordOffset) {
+    this.task = task;
+    this.fileOffset = fileOffset;
+    this.recordOffset = recordOffset;
+  }
+
+  public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) {
+    return fromCombinedScanTask(combinedScanTask, 0, 0L);
+  }
+
+  public static IcebergSourceSplit fromCombinedScanTask(
+      CombinedScanTask combinedScanTask, int fileOffset, long recordOffset) {
+    return new IcebergSourceSplit(combinedScanTask, fileOffset, recordOffset);
+  }
+
+  public CombinedScanTask task() {
+    return task;
+  }
+
+  public int fileOffset() {
+    return fileOffset;
+  }
+
+  public long recordOffset() {
+    return recordOffset;
+  }
+
+  @Override
+  public String splitId() {
+    return MoreObjects.toStringHelper(this)
+        .add("files", toString(task.files()))
+        .toString();
+  }
+
+  public void updatePosition(int newFileOffset, long newRecordOffset) {
+    // invalidate the cache after position change
+    serializedBytesCache = null;
+    fileOffset = newFileOffset;
+    recordOffset = newRecordOffset;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("files", toString(task.files()))
+        .add("fileOffset", fileOffset)
+        .add("recordOffset", recordOffset)
+        .toString();
+  }
+
+  private String toString(Collection<FileScanTask> files) {
+    return Iterables.toString(files.stream().map(fileScanTask ->
+        MoreObjects.toStringHelper(fileScanTask)
+            .add("file", fileScanTask.file().path().toString())
+            .add("start", fileScanTask.start())
+            .add("length", fileScanTask.length())
+            .toString()).collect(Collectors.toList()));
+  }
+
+  byte[] serializeV1() throws IOException {
+    if (serializedBytesCache == null) {
+      serializedBytesCache = InstantiationUtil.serializeObject(this);
+    }
+    return serializedBytesCache;
+  }
+
+  static IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException {
+    try {
+      return InstantiationUtil.deserializeObject(serialized, IcebergSourceSplit.class.getClassLoader());
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Failed to deserialize the split.", e);
+    }
+  }
+}
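
A minimal sketch of the serializedBytesCache contract above (the class name and the source of the CombinedScanTask are hypothetical; serializeV1 is package-private, so this sits in the split package): serializing an unchanged split repeatedly hands back the same byte array, and updatePosition invalidates the cache.

    package org.apache.iceberg.flink.source.split;

    import java.io.IOException;
    import org.apache.iceberg.CombinedScanTask;

    class SplitCacheSketch {
      static void demonstrate(CombinedScanTask task) throws IOException {
        IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(task);

        // Unchanged split: the cached byte[] is returned on repeat calls.
        byte[] first = split.serializeV1();
        byte[] second = split.serializeV1();  // same array instance as first

        // updatePosition nulls the cache, so the next call re-serializes.
        split.updatePosition(1, 42L);
        byte[] third = split.serializeV1();   // new array with the new offsets

        // Round trip back to a split.
        IcebergSourceSplit restored = IcebergSourceSplit.deserializeV1(third);
      }
    }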
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
new file mode 100644
index 0000000..9e32af5
--- /dev/null
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.io.IOException;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * TODO: Java serialization is used for now.
+ * Will switch to a more stable serializer from
+ * <a href="https://github.com/apache/iceberg/issues/1698">issue-1698</a>.
+ */
+@Internal
+public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<IcebergSourceSplit> {
+  public static final IcebergSourceSplitSerializer INSTANCE = new IcebergSourceSplitSerializer();
+  private static final int VERSION = 1;
+
+  @Override
+  public int getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public byte[] serialize(IcebergSourceSplit split) throws IOException {
+    return split.serializeV1();
+  }
+
+  @Override
+  public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOException {
+    switch (version) {
+      case 1:
+        return IcebergSourceSplit.deserializeV1(serialized);
+      default:
+        throw new IOException(String.format("Failed to deserialize IcebergSourceSplit. " +
+            "Encountered unsupported version: %d. Supported versions are [1]", version));
+    }
+  }
+}
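
The serializer plugs into Flink's SimpleVersionedSerializer contract; a sketch of the round trip a checkpoint performs (the helper class is hypothetical; Flink stores getVersion() next to the payload and replays it into deserialize on restore):

    package org.apache.iceberg.flink.source.split;

    import java.io.IOException;

    class SerializerRoundTripSketch {
      static IcebergSourceSplit roundTrip(IcebergSourceSplit split) throws IOException {
        IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE;
        int version = serializer.getVersion();        // written with the payload
        byte[] payload = serializer.serialize(split); // cached inside the split
        // On restore the stored version is passed back in; an unknown
        // version raises IOException.
        return serializer.deserialize(version, payload);
      }
    }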
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
new file mode 100644
index 0000000..df98808
--- /dev/null
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
+
+public class SplitHelpers {
+
+  private static final AtomicLong splitLengthIncrement = new AtomicLong();
+
+  private SplitHelpers() {
+  }
+
+  /**
+   * This creates a list of IcebergSourceSplit from real files:
+   * <li>Create a new Hadoop table under the {@code temporaryFolder}
+   * <li>Write {@code fileCount} files to the new Iceberg table
+   * <li>Discover the splits from the table and partition the splits by the {@code filesPerSplit} limit
+   * <li>Delete the Hadoop table
+   *
+   * Since the table and data files are deleted before this method returns,
+   * the caller shouldn't attempt to read the data files.
+   */
+  public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
+      TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit) throws Exception {
+    final File warehouseFile = temporaryFolder.newFolder();
+    Assert.assertTrue(warehouseFile.delete());
+    final String warehouse = "file:" + warehouseFile;
+    Configuration hadoopConf = new Configuration();
+    final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse);
+    try {
+      final Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+      final GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+          table, FileFormat.PARQUET, temporaryFolder);
+      for (int i = 0; i < fileCount; ++i) {
+        List<Record> records = RandomGenericData.generate(TestFixtures.SCHEMA, 2, i);
+        dataAppender.appendToTable(records);
+      }
+
+      final ScanContext scanContext = ScanContext.builder().build();
+      final List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext);
+      return splits.stream()
+          .flatMap(split -> {
+            List<List<FileScanTask>> filesList = Lists.partition(
+                Lists.newArrayList(split.task().files()), filesPerSplit);
+            return filesList.stream()
+                .map(files -> new BaseCombinedScanTask(files))
+                .map(combinedScanTask -> IcebergSourceSplit.fromCombinedScanTask(combinedScanTask));
+          })
+          .collect(Collectors.toList());
+    } finally {
+      catalog.dropTable(TestFixtures.TABLE_IDENTIFIER);
+      catalog.close();
+    }
+  }
+}
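
A sketch of how a test would typically consume the helper, mirroring the JUnit usage in the serializer test below (the test class itself is hypothetical): a @ClassRule TemporaryFolder supplies the transient warehouse, and only split metadata is inspected since the backing files are gone by the time the method returns.

    package org.apache.iceberg.flink.source;

    import java.util.List;
    import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
    import org.junit.Assert;
    import org.junit.ClassRule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    public class SplitHelpersUsageSketch {
      @ClassRule
      public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

      @Test
      public void testSplitMetadata() throws Exception {
        // 4 data files, at most 2 files per split.
        List<IcebergSourceSplit> splits =
            SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 2);
        Assert.assertFalse(splits.isEmpty());
        for (IcebergSourceSplit split : splits) {
          // Safe: only metadata; the data files were already deleted.
          Assert.assertTrue(split.task().files().size() <= 2);
        }
      }
    }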
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java
index 19c2b6a..7978af6 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java
@@ -254,7 +254,7 @@ public class TestStreamingReaderOperator extends TableTestBase {
             .build();
       }
 
-      Collections.addAll(inputSplits, FlinkSplitGenerator.createInputSplits(table, scanContext));
+      Collections.addAll(inputSplits, FlinkSplitPlanner.planInputSplits(table, scanContext));
     }
 
     return inputSplits;
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
new file mode 100644
index 0000000..36eea1e
--- /dev/null
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.split;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceSplitSerializer {
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE;
+
+  @Test
+  public void testLatestVersion() throws Exception {
+    serializeAndDeserialize(1, 1);
+    serializeAndDeserialize(10, 2);
+  }
+
+  private void serializeAndDeserialize(int splitCount, int filesPerSplit) throws Exception {
+    final List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, splitCount, filesPerSplit);
+    for (IcebergSourceSplit split : splits) {
+      byte[] result = serializer.serialize(split);
+      IcebergSourceSplit deserialized = serializer.deserialize(serializer.getVersion(), result);
+      assertSplitEquals(split, deserialized);
+
+      byte[] cachedResult = serializer.serialize(split);
+      Assert.assertSame(result, cachedResult);
+      IcebergSourceSplit deserialized2 = serializer.deserialize(serializer.getVersion(), cachedResult);
+      assertSplitEquals(split, deserialized2);
+
+      split.updatePosition(0, 100);
+      byte[] resultAfterUpdatePosition = serializer.serialize(split);
+      // after position change, serialized bytes should have changed
+      Assert.assertNotSame(cachedResult, resultAfterUpdatePosition);
+      IcebergSourceSplit deserialized3 = serializer.deserialize(serializer.getVersion(), resultAfterUpdatePosition);
+      assertSplitEquals(split, deserialized3);
+    }
+  }
+
+  @Test
+  public void testV1() throws Exception {
+    serializeAndDeserializeV1(1, 1);
+    serializeAndDeserializeV1(10, 2);
+  }
+
+  private void serializeAndDeserializeV1(int splitCount, int filesPerSplit) throws Exception {
+    final List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, splitCount, filesPerSplit);
+    for (IcebergSourceSplit split : splits) {
+      byte[] result = split.serializeV1();
+      IcebergSourceSplit deserialized = IcebergSourceSplit.deserializeV1(result);
+      assertSplitEquals(split, deserialized);
+    }
+  }
+
+  @Test
+  public void testCheckpointedPosition() throws Exception {
+    final AtomicInteger index = new AtomicInteger();
+    final List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 10, 2).stream()
+        .map(split -> {
+          IcebergSourceSplit result;
+          if (index.get() % 2 == 0) {
+            result = IcebergSourceSplit.fromCombinedScanTask(split.task(), index.get(), index.get());
+          } else {
+            result = split;
+          }
+          index.incrementAndGet();
+          return result;
+        })
+        .collect(Collectors.toList());
+
+    for (IcebergSourceSplit split : splits) {
+      byte[] result = serializer.serialize(split);
+      IcebergSourceSplit deserialized = serializer.deserialize(serializer.getVersion(), result);
+      assertSplitEquals(split, deserialized);
+
+      byte[] cachedResult = serializer.serialize(split);
+      Assert.assertSame(result, cachedResult);
+      IcebergSourceSplit deserialized2 = serializer.deserialize(serializer.getVersion(), cachedResult);
+      assertSplitEquals(split, deserialized2);
+    }
+  }
+
+  private void assertSplitEquals(IcebergSourceSplit expected, IcebergSourceSplit actual) {
+    Assert.assertEquals(expected.splitId(), actual.splitId());
+    Assert.assertEquals(expected.fileOffset(), actual.fileOffset());
+    Assert.assertEquals(expected.recordOffset(), actual.recordOffset());
+  }
+}