Posted to commits@iceberg.apache.org by op...@apache.org on 2020/10/28 04:07:03 UTC

[iceberg] branch master updated: Flink: maintain the complete data files into manifest before checkpoint finished. (#1477)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b9634c9  Flink: maintain the complete data files into manifest before checkpoint finished. (#1477)
b9634c9 is described below

commit b9634c9511c8a028074e8e5bdc54f0db47058668
Author: openinx <op...@gmail.com>
AuthorDate: Wed Oct 28 12:06:53 2020 +0800

    Flink: maintain the complete data files into manifest before checkpoint finished. (#1477)
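
    Roughly, the committer now writes the completed data files of each checkpoint into a
    temporary Iceberg manifest and keeps only the manifest's serialized bytes in the Flink
    checkpoint state; once the checkpoint completes, the manifests are read back, their data
    files are committed to the table, and the temporary manifests are deleted. A minimal
    sketch of that flow, using the helpers introduced below (manifestOutputFileFactory, table,
    dataFiles and checkpointId are assumed to be in scope; error handling omitted):

        // snapshotState(): spill the buffered data files into a temporary manifest file.
        OutputFile outputFile = manifestOutputFileFactory.create(checkpointId);
        ManifestFile manifest = FlinkManifestUtil.writeDataFiles(outputFile, table.spec(), dataFiles);
        byte[] state = SimpleVersionedSerialization
            .writeVersionAndSerialize(FlinkManifestSerializer.INSTANCE, manifest);

        // notifyCheckpointComplete(): restore the manifest and commit its data files.
        ManifestFile restored = SimpleVersionedSerialization
            .readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, state);
        List<DataFile> pendingFiles = FlinkManifestUtil.readDataFiles(restored, table.io());
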
---
 .../java/org/apache/iceberg/ManifestFiles.java     |  32 ++++
 .../org/apache/iceberg/avro/AvroEncoderUtil.java   |  88 +++++++++
 .../apache/iceberg/avro/TestAvroEncoderUtil.java   |  54 ++++++
 .../flink/sink/FlinkManifestSerializer.java        |  48 +++++
 .../iceberg/flink/sink/FlinkManifestUtil.java      |  66 +++++++
 .../iceberg/flink/sink/IcebergFilesCommitter.java  |  94 +++++++---
 .../flink/sink/ManifestOutputFileFactory.java      |  78 ++++++++
 .../iceberg/flink/sink/TestFlinkManifest.java      | 199 +++++++++++++++++++++
 .../flink/sink/TestIcebergFilesCommitter.java      | 119 +++++++++++-
 9 files changed, 753 insertions(+), 25 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index c629156..9dbf50e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -22,6 +22,8 @@ package org.apache.iceberg;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.iceberg.ManifestReader.FileType;
+import org.apache.iceberg.avro.AvroEncoderUtil;
+import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
@@ -29,11 +31,18 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 
 public class ManifestFiles {
   private ManifestFiles() {
   }
 
+  private static final org.apache.avro.Schema MANIFEST_AVRO_SCHEMA = AvroSchemaUtil.convert(ManifestFile.schema(),
+      ImmutableMap.of(
+          ManifestFile.schema().asStruct(), GenericManifestFile.class.getName(),
+          ManifestFile.PARTITION_SUMMARY_TYPE, GenericPartitionFieldSummary.class.getName()
+      ));
+
   /**
    * Returns a {@link CloseableIterable} of file paths in the {@link ManifestFile}.
    *
@@ -149,6 +158,29 @@ public class ManifestFiles {
     throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
   }
 
+  /**
+   * Encodes a {@link ManifestFile} into a byte array using the Avro encoder.
+   *
+   * @param manifestFile a {@link ManifestFile}, which should always be a {@link GenericManifestFile}.
+   * @return the binary data.
+   * @throws IOException if an I/O error occurs while encoding.
+   */
+  public static byte[] encode(ManifestFile manifestFile) throws IOException {
+    GenericManifestFile genericManifestFile = (GenericManifestFile) manifestFile;
+    return AvroEncoderUtil.encode(genericManifestFile, MANIFEST_AVRO_SCHEMA);
+  }
+
+  /**
+   * Decodes the binary data into a {@link ManifestFile}.
+   *
+   * @param manifestData the binary data.
+   * @return a {@link ManifestFile}. To be precise, it is a {@link GenericManifestFile}, which is not exposed to the public API.
+   * @throws IOException if an I/O error occurs while decoding.
+   */
+  public static ManifestFile decode(byte[] manifestData) throws IOException {
+    return AvroEncoderUtil.decode(manifestData);
+  }
+
   static ManifestReader<?> open(ManifestFile manifest, FileIO io) {
     return open(manifest, io, null);
   }
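
The new encode()/decode() pair round-trips a manifest through opaque bytes, which is what the Flink
committer below keeps in its checkpoint state. A minimal sketch, assuming manifest is a ManifestFile
produced by a ManifestWriter (i.e. a GenericManifestFile under the hood):

    byte[] data = ManifestFiles.encode(manifest);    // Avro-serialized GenericManifestFile
    ManifestFile copy = ManifestFiles.decode(data);  // reads the embedded schema, then the datum
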
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java
new file mode 100644
index 0000000..74e8ede
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class AvroEncoderUtil {
+
+  private AvroEncoderUtil() {
+  }
+
+  static {
+    LogicalTypes.register(LogicalMap.NAME, schema -> LogicalMap.get());
+  }
+
+  private static final byte[] MAGIC_BYTES = new byte[] {(byte) 0xC2, (byte) 0x01};
+
+  public static <T> byte[] encode(T datum, Schema avroSchema) throws IOException {
+    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      DataOutputStream dataOut = new DataOutputStream(out);
+
+      // Write the magic bytes
+      dataOut.write(MAGIC_BYTES);
+
+      // Write avro schema
+      dataOut.writeUTF(avroSchema.toString());
+
+      // Encode the datum with avro schema.
+      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+      DatumWriter<T> writer = new GenericAvroWriter<>(avroSchema);
+      writer.write(datum, encoder);
+      encoder.flush();
+
+      return out.toByteArray();
+    }
+  }
+
+  public static <T> T decode(byte[] data) throws IOException {
+    try (ByteArrayInputStream in = new ByteArrayInputStream(data, 0, data.length)) {
+      DataInputStream dataInput = new DataInputStream(in);
+
+      // Read the magic bytes
+      byte header0 = dataInput.readByte();
+      byte header1 = dataInput.readByte();
+      Preconditions.checkState(header0 == MAGIC_BYTES[0] && header1 == MAGIC_BYTES[1],
+          "Unrecognized header bytes: 0x%02X 0x%02X", header0, header1);
+
+      // Read avro schema
+      Schema avroSchema = new Schema.Parser().parse(dataInput.readUTF());
+
+      // Decode the datum with the parsed avro schema.
+      BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(in, null);
+      DatumReader<T> reader = new GenericAvroReader<>(avroSchema);
+      reader.setSchema(avroSchema);
+      return reader.read(null, binaryDecoder);
+    }
+  }
+}
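
The byte layout written by encode() is: the two magic bytes 0xC2 0x01, the writer's Avro schema as a
writeUTF() string, and then the Avro-binary-encoded datum. Because the writer schema travels with the
payload, decode() needs nothing besides the byte array itself. A small round-trip sketch (record and
avroSchema are stand-ins for a GenericData.Record and its converted Avro schema, as in the test below):

    byte[] bytes = AvroEncoderUtil.encode(record, avroSchema);
    GenericData.Record roundTripped = AvroEncoderUtil.decode(bytes);
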
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroEncoderUtil.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroEncoderUtil.java
new file mode 100644
index 0000000..483493f
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroEncoderUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.types.Type;
+import org.junit.Assert;
+
+public class TestAvroEncoderUtil extends AvroDataTest {
+
+  @Override
+  protected void writeAndValidate(org.apache.iceberg.Schema schema) throws IOException {
+    List<GenericData.Record> expected = RandomAvroData.generate(schema, 100, 1990L);
+    Map<Type, Schema> typeToSchema = AvroSchemaUtil.convertTypes(schema.asStruct(), "test");
+    Schema avroSchema = typeToSchema.get(schema.asStruct());
+
+    for (GenericData.Record record : expected) {
+      byte[] serializedData = AvroEncoderUtil.encode(record, avroSchema);
+      GenericData.Record expectedRecord = AvroEncoderUtil.decode(serializedData);
+
+      // Fall back to comparing the records' string representations, because the equals implementation depends on
+      // the Avro schema: for non-string keys the Avro schema represents the 'map' type as a list of key/value
+      // pairs, so the decoded record would fail an equals comparison that tries to read that 'array' as a 'map'.
+      Assert.assertEquals(expectedRecord.toString(), record.toString());
+
+      byte[] serializedData2 = AvroEncoderUtil.encode(expectedRecord, avroSchema);
+      Assert.assertArrayEquals(serializedData2, serializedData);
+
+      expectedRecord = AvroEncoderUtil.decode(serializedData2);
+      Assert.assertEquals(expectedRecord.toString(), record.toString());
+    }
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java
new file mode 100644
index 0000000..bec4e65
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+class FlinkManifestSerializer implements SimpleVersionedSerializer<ManifestFile> {
+  private static final int VERSION_NUM = 1;
+  static final FlinkManifestSerializer INSTANCE = new FlinkManifestSerializer();
+
+  @Override
+  public int getVersion() {
+    return VERSION_NUM;
+  }
+
+  @Override
+  public byte[] serialize(ManifestFile manifestFile) throws IOException {
+    Preconditions.checkNotNull(manifestFile, "ManifestFile to be serialized should not be null");
+
+    return ManifestFiles.encode(manifestFile);
+  }
+
+  @Override
+  public ManifestFile deserialize(int version, byte[] serialized) throws IOException {
+    return ManifestFiles.decode(serialized);
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
new file mode 100644
index 0000000..aa36c73
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+class FlinkManifestUtil {
+  private static final int ICEBERG_FORMAT_VERSION = 2;
+  private static final Long DUMMY_SNAPSHOT_ID = 0L;
+
+  private FlinkManifestUtil() {
+  }
+
+  static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles)
+      throws IOException {
+    ManifestWriter<DataFile> writer = ManifestFiles.write(ICEBERG_FORMAT_VERSION, spec, outputFile, DUMMY_SNAPSHOT_ID);
+
+    try (ManifestWriter<DataFile> closeableWriter = writer) {
+      closeableWriter.addAll(dataFiles);
+    }
+
+    return writer.toManifestFile();
+  }
+
+  static List<DataFile> readDataFiles(ManifestFile manifestFile, FileIO io) throws IOException {
+    try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifestFile, io)) {
+      return Lists.newArrayList(dataFiles);
+    }
+  }
+
+  static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
+                                                           long attemptNumber) {
+    TableOperations ops = ((HasTableOperations) table).operations();
+    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 1734a0d..ad89061 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.flink.sink;
 
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -27,8 +29,8 @@ import java.util.SortedMap;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -38,14 +40,16 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Comparators;
@@ -58,6 +62,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
 
   private static final long serialVersionUID = 1L;
   private static final long INITIAL_CHECKPOINT_ID = -1L;
+  private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
 
   private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
   private static final String FLINK_JOB_ID = "flink.job-id";
@@ -78,7 +83,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   // <2, <file3>>. Snapshot for checkpoint#1 interrupted because of network/disk failure etc, while we don't expect
   // any data loss in iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry to commit
   // iceberg table when the next checkpoint happen.
-  private final NavigableMap<Long, List<DataFile>> dataFilesPerCheckpoint = Maps.newTreeMap();
+  private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
 
   // The data files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
   // 'dataFilesPerCheckpoint'.
@@ -87,6 +92,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   // It will have an unique identifier for one job.
   private transient String flinkJobId;
   private transient Table table;
+  private transient ManifestOutputFileFactory manifestOutputFileFactory;
   private transient long maxCommittedCheckpointId;
 
   // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by the
@@ -97,8 +103,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
       "iceberg-flink-job-id", BasicTypeInfo.STRING_TYPE_INFO);
   private transient ListState<String> jobIdState;
   // All pending checkpoints states for this function.
-  private static final ListStateDescriptor<SortedMap<Long, List<DataFile>>> STATE_DESCRIPTOR = buildStateDescriptor();
-  private transient ListState<SortedMap<Long, List<DataFile>>> checkpointsState;
+  private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
+  private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
 
   IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions) {
     this.tableLoader = tableLoader;
@@ -113,6 +119,10 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     // Open the table loader and load the table.
     this.tableLoader.open();
     this.table = tableLoader.loadTable();
+
+    int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+    int attemptId = getRuntimeContext().getAttemptNumber();
+    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
@@ -127,7 +137,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
       // checkpoint id from restored flink job to the current flink job.
       this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
 
-      NavigableMap<Long, List<DataFile>> uncommittedDataFiles = Maps
+      NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
           .newTreeMap(checkpointsState.get().iterator().next())
           .tailMap(maxCommittedCheckpointId, false);
       if (!uncommittedDataFiles.isEmpty()) {
@@ -145,7 +155,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);
 
     // Update the checkpoint state.
-    dataFilesPerCheckpoint.put(checkpointId, ImmutableList.copyOf(dataFilesOfCurrentCheckpoint));
+    dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
 
     // Reset the snapshot state to the latest state.
     checkpointsState.clear();
@@ -174,14 +184,24 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     }
   }
 
-  private void commitUpToCheckpoint(NavigableMap<Long, List<DataFile>> dataFilesMap,
+  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> manifestsMap,
                                     String newFlinkJobId,
-                                    long checkpointId) {
-    NavigableMap<Long, List<DataFile>> pendingFileMap = dataFilesMap.headMap(checkpointId, true);
+                                    long checkpointId) throws IOException {
+    NavigableMap<Long, byte[]> pendingManifestMap = manifestsMap.headMap(checkpointId, true);
 
+    List<ManifestFile> manifestFiles = Lists.newArrayList();
     List<DataFile> pendingDataFiles = Lists.newArrayList();
-    for (List<DataFile> dataFiles : pendingFileMap.values()) {
-      pendingDataFiles.addAll(dataFiles);
+    for (byte[] manifestData : pendingManifestMap.values()) {
+      if (Arrays.equals(EMPTY_MANIFEST_DATA, manifestData)) {
+        // Skip the empty flink manifest.
+        continue;
+      }
+
+      ManifestFile manifestFile =
+          SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, manifestData);
+
+      manifestFiles.add(manifestFile);
+      pendingDataFiles.addAll(FlinkManifestUtil.readDataFiles(manifestFile, table.io()));
     }
 
     if (replacePartitions) {
@@ -190,8 +210,23 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
       append(pendingDataFiles, newFlinkJobId, checkpointId);
     }
 
-    // Clear the committed data files from dataFilesPerCheckpoint.
-    pendingFileMap.clear();
+    pendingManifestMap.clear();
+
+    // Delete the committed manifests and clear the committed data files from dataFilesPerCheckpoint.
+    for (ManifestFile manifestFile : manifestFiles) {
+      try {
+        table.io().deleteFile(manifestFile.path());
+      } catch (Exception e) {
+        // A failure while cleaning the flink manifests shouldn't abort the completed checkpoint.
+        String details = MoreObjects.toStringHelper(this)
+            .add("flinkJobId", newFlinkJobId)
+            .add("checkpointId", checkpointId)
+            .add("manifestPath", manifestFile.path())
+            .toString();
+        LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}",
+            details, e);
+      }
+    }
   }
 
   private void replacePartitions(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
@@ -236,12 +271,27 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   }
 
   @Override
-  public void endInput() {
+  public void endInput() throws IOException {
     // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
-    dataFilesPerCheckpoint.put(Long.MAX_VALUE, ImmutableList.copyOf(dataFilesOfCurrentCheckpoint));
+    long currentCheckpointId = Long.MAX_VALUE;
+    dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
     dataFilesOfCurrentCheckpoint.clear();
 
-    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, Long.MAX_VALUE);
+    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
+  }
+
+  /**
+   * Writes all the completed data files to a newly created manifest file and returns the manifest's Avro-serialized bytes.
+   */
+  private byte[] writeToManifest(long checkpointId) throws IOException {
+    if (dataFilesOfCurrentCheckpoint.isEmpty()) {
+      return EMPTY_MANIFEST_DATA;
+    }
+
+    OutputFile manifestOutputFile = manifestOutputFileFactory.create(checkpointId);
+    ManifestFile manifestFile =
+        FlinkManifestUtil.writeDataFiles(manifestOutputFile, table.spec(), dataFilesOfCurrentCheckpoint);
+    return SimpleVersionedSerialization.writeVersionAndSerialize(FlinkManifestSerializer.INSTANCE, manifestFile);
   }
 
   @Override
@@ -251,13 +301,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     }
   }
 
-  private static ListStateDescriptor<SortedMap<Long, List<DataFile>>> buildStateDescriptor() {
+  private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor() {
     Comparator<Long> longComparator = Comparators.forType(Types.LongType.get());
-    // Construct a ListTypeInfo.
-    ListTypeInfo<DataFile> dataFileListTypeInfo = new ListTypeInfo<>(TypeInformation.of(DataFile.class));
     // Construct a SortedMapTypeInfo.
-    SortedMapTypeInfo<Long, List<DataFile>> sortedMapTypeInfo = new SortedMapTypeInfo<>(
-        BasicTypeInfo.LONG_TYPE_INFO, dataFileListTypeInfo, longComparator
+    SortedMapTypeInfo<Long, byte[]> sortedMapTypeInfo = new SortedMapTypeInfo<>(
+        BasicTypeInfo.LONG_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, longComparator
     );
     return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
   }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
new file mode 100644
index 0000000..fca8608
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+
+class ManifestOutputFileFactory {
+  // Users can define their own flink manifests directory by setting this value in the table properties.
+  static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location";
+
+  private final TableOperations ops;
+  private final FileIO io;
+  private final Map<String, String> props;
+  private final String flinkJobId;
+  private final int subTaskId;
+  private final long attemptNumber;
+  private final AtomicInteger fileCount = new AtomicInteger(0);
+
+  ManifestOutputFileFactory(TableOperations ops, FileIO io, Map<String, String> props,
+                            String flinkJobId, int subTaskId, long attemptNumber) {
+    this.ops = ops;
+    this.io = io;
+    this.props = props;
+    this.flinkJobId = flinkJobId;
+    this.subTaskId = subTaskId;
+    this.attemptNumber = attemptNumber;
+  }
+
+  private String generatePath(long checkpointId) {
+    return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId,
+        attemptNumber, checkpointId, fileCount.incrementAndGet()));
+  }
+
+  OutputFile create(long checkpointId) {
+    String flinkManifestDir = props.get(FLINK_MANIFEST_LOCATION);
+
+    String newManifestFullPath;
+    if (Strings.isNullOrEmpty(flinkManifestDir)) {
+      // The user didn't specify a flink manifest directory, so just use the default metadata path.
+      newManifestFullPath = ops.metadataFileLocation(generatePath(checkpointId));
+    } else {
+      newManifestFullPath = String.format("%s/%s", stripTrailingSlash(flinkManifestDir), generatePath(checkpointId));
+    }
+
+    return io.newOutputFile(newManifestFullPath);
+  }
+
+  private static String stripTrailingSlash(String path) {
+    String result = path;
+    while (result.endsWith("/")) {
+      result = result.substring(0, result.length() - 1);
+    }
+    return result;
+  }
+}
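
By default the factory places the temporary manifests under the table's metadata location; setting the
flink.manifests.location table property redirects them to a user-chosen directory. The generated file
name follows the pattern <flinkJobId>-<subTaskId>-<attemptNumber>-<checkpointId>-<fileCount>.avro. A
sketch of redirecting the manifests (the directory below is purely hypothetical):

    // Point the temporary flink manifests at a custom directory via a table property.
    table.updateProperties()
        .set("flink.manifests.location", "hdfs://nn:8020/tmp/flink-manifests")
        .commit();

    // factory.create(7) for sub-task 1, attempt 0 would then produce a path like:
    //   hdfs://nn:8020/tmp/flink-manifests/<flink-job-id>-00001-0-7-00001.avro
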
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
new file mode 100644
index 0000000..9fd1507
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+
+public class TestFlinkManifest {
+  private static final Configuration CONF = new Configuration();
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private String tablePath;
+  private Table table;
+  private final AtomicInteger dataFileCount = new AtomicInteger(0);
+
+  @Before
+  public void before() throws IOException {
+    File folder = tempFolder.newFolder();
+    String warehouse = folder.getAbsolutePath();
+
+    tablePath = warehouse.concat("/test");
+    Assert.assertTrue("Should create the table directory correctly.", new File(tablePath).mkdir());
+
+    // Construct the iceberg table.
+    table = SimpleDataUtil.createTable(tablePath, ImmutableMap.of(), false);
+  }
+
+
+  @Test
+  public void testIO() throws IOException {
+    String flinkJobId = newFlinkJobId();
+    for (long checkpointId = 1; checkpointId <= 3; checkpointId++) {
+      ManifestOutputFileFactory factory =
+          FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+      OutputFile manifestOutputFile = factory.create(checkpointId);
+
+      List<DataFile> expectedDataFiles = generateDataFiles(10);
+      ManifestFile manifestFile = FlinkManifestUtil.writeDataFiles(manifestOutputFile, table.spec(), expectedDataFiles);
+
+      List<DataFile> actualDataFiles = FlinkManifestUtil.readDataFiles(manifestFile, table.io());
+
+      Assert.assertEquals("Size of data file list are not equal.", expectedDataFiles.size(), actualDataFiles.size());
+      for (int i = 0; i < expectedDataFiles.size(); i++) {
+        checkDataFile(expectedDataFiles.get(i), actualDataFiles.get(i));
+      }
+    }
+  }
+
+  @Test
+  public void testUserProvidedManifestLocation() throws IOException {
+    long checkpointId = 1;
+    String flinkJobId = newFlinkJobId();
+    File userProvidedFolder = tempFolder.newFolder();
+    Map<String, String> props = ImmutableMap.of(FLINK_MANIFEST_LOCATION, userProvidedFolder.getAbsolutePath() + "///");
+    ManifestOutputFileFactory factory = new ManifestOutputFileFactory(
+        ((HasTableOperations) table).operations(), table.io(), props,
+        flinkJobId, 1, 1);
+
+    OutputFile outputFile = factory.create(checkpointId);
+    List<DataFile> expectedDataFiles = generateDataFiles(5);
+    ManifestFile manifestFile = FlinkManifestUtil.writeDataFiles(outputFile, table.spec(), expectedDataFiles);
+
+    Assert.assertEquals("The newly created manifest file should be located under the user provided directory",
+        userProvidedFolder.toPath(), Paths.get(manifestFile.path()).getParent());
+
+    List<DataFile> actualDataFiles = FlinkManifestUtil.readDataFiles(manifestFile, table.io());
+
+    Assert.assertEquals("Size of data file list are not equal.", expectedDataFiles.size(), actualDataFiles.size());
+    for (int i = 0; i < expectedDataFiles.size(); i++) {
+      checkDataFile(expectedDataFiles.get(i), actualDataFiles.get(i));
+    }
+  }
+
+  @Test
+  public void testVersionedSerializer() throws IOException {
+    long checkpointId = 1;
+    String flinkJobId = newFlinkJobId();
+    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+    OutputFile outputFile = factory.create(checkpointId);
+
+    List<DataFile> expectedDataFiles = generateDataFiles(10);
+    ManifestFile expected = FlinkManifestUtil.writeDataFiles(outputFile, table.spec(), expectedDataFiles);
+
+    byte[] versionedSerializeData =
+        SimpleVersionedSerialization.writeVersionAndSerialize(FlinkManifestSerializer.INSTANCE, expected);
+    ManifestFile actual = SimpleVersionedSerialization
+        .readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, versionedSerializeData);
+    checkManifestFile(expected, actual);
+
+    byte[] versionedSerializeData2 =
+        SimpleVersionedSerialization.writeVersionAndSerialize(FlinkManifestSerializer.INSTANCE, actual);
+    Assert.assertArrayEquals(versionedSerializeData, versionedSerializeData2);
+  }
+
+  private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
+    return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF,
+        tablePath, FileFormat.PARQUET.addExtension(filename), rows);
+  }
+
+  private List<DataFile> generateDataFiles(int fileNum) throws IOException {
+    List<RowData> rowDataList = Lists.newArrayList();
+    List<DataFile> dataFiles = Lists.newArrayList();
+    for (int i = 0; i < fileNum; i++) {
+      rowDataList.add(SimpleDataUtil.createRowData(i, "a" + i));
+      dataFiles.add(writeDataFile("data-file-" + dataFileCount.incrementAndGet(), rowDataList));
+    }
+    return dataFiles;
+  }
+
+  private static String newFlinkJobId() {
+    return UUID.randomUUID().toString();
+  }
+
+  private static void checkManifestFile(ManifestFile expected, ManifestFile actual) {
+    Assert.assertEquals("Path must match", expected.path(), actual.path());
+    Assert.assertEquals("Length must match", expected.length(), actual.length());
+    Assert.assertEquals("Spec id must match", expected.partitionSpecId(), actual.partitionSpecId());
+    Assert.assertEquals("ManifestContent must match", expected.content(), actual.content());
+    Assert.assertEquals("SequenceNumber must match", expected.sequenceNumber(), actual.sequenceNumber());
+    Assert.assertEquals("MinSequenceNumber must match", expected.minSequenceNumber(), actual.minSequenceNumber());
+    Assert.assertEquals("Snapshot id must match", expected.snapshotId(), actual.snapshotId());
+    Assert.assertEquals("Added files flag must match", expected.hasAddedFiles(), actual.hasAddedFiles());
+    Assert.assertEquals("Added files count must match", expected.addedFilesCount(), actual.addedFilesCount());
+    Assert.assertEquals("Added rows count must match", expected.addedRowsCount(), actual.addedRowsCount());
+    Assert.assertEquals("Existing files flag must match", expected.hasExistingFiles(), actual.hasExistingFiles());
+    Assert.assertEquals("Existing files count must match", expected.existingFilesCount(), actual.existingFilesCount());
+    Assert.assertEquals("Existing rows count must match", expected.existingRowsCount(), actual.existingRowsCount());
+    Assert.assertEquals("Deleted files flag must match", expected.hasDeletedFiles(), actual.hasDeletedFiles());
+    Assert.assertEquals("Deleted files count must match", expected.deletedFilesCount(), actual.deletedFilesCount());
+    Assert.assertEquals("Deleted rows count must match", expected.deletedRowsCount(), actual.deletedRowsCount());
+    Assert.assertEquals("PartitionFieldSummary must match", expected.partitions(), actual.partitions());
+  }
+
+  static void checkDataFile(DataFile expected, DataFile actual) {
+    if (expected == actual) {
+      return;
+    }
+    Assert.assertTrue("Shouldn't have null DataFile.", expected != null && actual != null);
+    Assert.assertEquals("SpecId", expected.specId(), actual.specId());
+    Assert.assertEquals("Content", expected.content(), actual.content());
+    Assert.assertEquals("Path", expected.path(), actual.path());
+    Assert.assertEquals("Format", expected.format(), actual.format());
+    Assert.assertEquals("Partition", expected.partition(), actual.partition());
+    Assert.assertEquals("Record count", expected.recordCount(), actual.recordCount());
+    Assert.assertEquals("File size in bytes", expected.fileSizeInBytes(), actual.fileSizeInBytes());
+    Assert.assertEquals("Column sizes", expected.columnSizes(), actual.columnSizes());
+    Assert.assertEquals("Value counts", expected.valueCounts(), actual.valueCounts());
+    Assert.assertEquals("Null value counts", expected.nullValueCounts(), actual.nullValueCounts());
+    Assert.assertEquals("Lower bounds", expected.lowerBounds(), actual.lowerBounds());
+    Assert.assertEquals("Upper bounds", expected.upperBounds(), actual.upperBounds());
+    Assert.assertEquals("Key metadata", expected.keyMetadata(), actual.keyMetadata());
+    Assert.assertEquals("Split offsets", expected.splitOffsets(), actual.splitOffsets());
+    Assert.assertNull(actual.equalityFieldIds());
+    Assert.assertNull(expected.equalityFieldIds());
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 2ce7c36..583b6f1 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -21,9 +21,12 @@ package org.apache.iceberg.flink.sink;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -40,8 +43,10 @@ import org.apache.flink.table.data.RowData;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -55,6 +60,9 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+
 @RunWith(Parameterized.class)
 public class TestIcebergFilesCommitter {
   private static final Configuration CONF = new Configuration();
@@ -64,6 +72,7 @@ public class TestIcebergFilesCommitter {
 
   private String tablePath;
   private Table table;
+  private File flinkManifestFolder;
 
   private final FileFormat format;
 
@@ -83,13 +92,19 @@ public class TestIcebergFilesCommitter {
   @Before
   public void before() throws IOException {
     File folder = tempFolder.newFolder();
+    flinkManifestFolder = tempFolder.newFolder();
     String warehouse = folder.getAbsolutePath();
 
     tablePath = warehouse.concat("/test");
     Assert.assertTrue("Should create the table directory correctly.", new File(tablePath).mkdir());
 
     // Construct the iceberg table.
-    Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+    Map<String, String> props = ImmutableMap.of(
+        // file format.
+        DEFAULT_FILE_FORMAT, format.name(),
+        // temporary flink manifests location.
+        FLINK_MANIFEST_LOCATION, flinkManifestFolder.getAbsolutePath()
+    );
     table = SimpleDataUtil.createTable(tablePath, props, false);
   }
 
@@ -110,7 +125,11 @@ public class TestIcebergFilesCommitter {
       // failover won't fail.
       for (int i = 1; i <= 3; i++) {
         harness.snapshot(++checkpointId, ++timestamp);
+        assertFlinkManifests(0);
+
         harness.notifyOfCompletedCheckpoint(checkpointId);
+        assertFlinkManifests(0);
+
         assertSnapshotSize(i);
         assertMaxCommittedCheckpointId(jobId, checkpointId);
       }
@@ -142,8 +161,10 @@ public class TestIcebergFilesCommitter {
         rows.add(rowData);
 
         harness.snapshot(i, ++timestamp);
+        assertFlinkManifests(1);
 
         harness.notifyOfCompletedCheckpoint(i);
+        assertFlinkManifests(0);
 
         SimpleDataUtil.assertTableRows(tablePath, ImmutableList.copyOf(rows));
         assertSnapshotSize(i);
@@ -177,6 +198,7 @@ public class TestIcebergFilesCommitter {
       // 1. snapshotState for checkpoint#1
       long firstCheckpointId = 1;
       harness.snapshot(firstCheckpointId, ++timestamp);
+      assertFlinkManifests(1);
 
       RowData row2 = SimpleDataUtil.createRowData(2, "world");
       DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
@@ -186,16 +208,19 @@ public class TestIcebergFilesCommitter {
       // 2. snapshotState for checkpoint#2
       long secondCheckpointId = 2;
       harness.snapshot(secondCheckpointId, ++timestamp);
+      assertFlinkManifests(2);
 
       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(firstCheckpointId);
       SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1));
       assertMaxCommittedCheckpointId(jobId, firstCheckpointId);
+      assertFlinkManifests(1);
 
       // 4. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(secondCheckpointId);
       SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
       assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+      assertFlinkManifests(0);
     }
   }
 
@@ -224,6 +249,7 @@ public class TestIcebergFilesCommitter {
       // 1. snapshotState for checkpoint#1
       long firstCheckpointId = 1;
       harness.snapshot(firstCheckpointId, ++timestamp);
+      assertFlinkManifests(1);
 
       RowData row2 = SimpleDataUtil.createRowData(2, "world");
       DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
@@ -233,16 +259,19 @@ public class TestIcebergFilesCommitter {
       // 2. snapshotState for checkpoint#2
       long secondCheckpointId = 2;
       harness.snapshot(secondCheckpointId, ++timestamp);
+      assertFlinkManifests(2);
 
       // 3. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(secondCheckpointId);
       SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
       assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+      assertFlinkManifests(0);
 
       // 4. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(firstCheckpointId);
       SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
       assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+      assertFlinkManifests(0);
     }
   }
 
@@ -267,7 +296,11 @@ public class TestIcebergFilesCommitter {
 
       harness.processElement(dataFile1, ++timestamp);
       snapshot = harness.snapshot(++checkpointId, ++timestamp);
+      assertFlinkManifests(1);
+
       harness.notifyOfCompletedCheckpoint(checkpointId);
+      assertFlinkManifests(0);
+
       SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row));
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, checkpointId);
@@ -289,7 +322,11 @@ public class TestIcebergFilesCommitter {
       harness.processElement(dataFile, ++timestamp);
 
       harness.snapshot(++checkpointId, ++timestamp);
+      assertFlinkManifests(1);
+
       harness.notifyOfCompletedCheckpoint(checkpointId);
+      assertFlinkManifests(0);
+
       SimpleDataUtil.assertTableRows(tablePath, expectedRows);
       assertSnapshotSize(2);
       assertMaxCommittedCheckpointId(jobId, checkpointId);
@@ -320,6 +357,7 @@ public class TestIcebergFilesCommitter {
       snapshot = harness.snapshot(++checkpointId, ++timestamp);
       SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of());
       assertMaxCommittedCheckpointId(jobId, -1L);
+      assertFlinkManifests(1);
     }
 
     try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
@@ -327,11 +365,19 @@ public class TestIcebergFilesCommitter {
       harness.initializeState(snapshot);
       harness.open();
 
+      // All flink manifests should be cleaned because the restored job has committed the unfinished iceberg transaction.
+      assertFlinkManifests(0);
+
       SimpleDataUtil.assertTableRows(tablePath, expectedRows);
       assertMaxCommittedCheckpointId(jobId, checkpointId);
 
       harness.snapshot(++checkpointId, ++timestamp);
+      // No new records were written, so no new manifest will be generated.
+      assertFlinkManifests(0);
+
       harness.notifyOfCompletedCheckpoint(checkpointId);
+      assertFlinkManifests(0);
+
       SimpleDataUtil.assertTableRows(tablePath, expectedRows);
       assertSnapshotSize(2);
       assertMaxCommittedCheckpointId(jobId, checkpointId);
@@ -342,6 +388,7 @@ public class TestIcebergFilesCommitter {
       harness.processElement(dataFile, ++timestamp);
 
       snapshot = harness.snapshot(++checkpointId, ++timestamp);
+      assertFlinkManifests(1);
     }
 
     // Redeploying flink job from external checkpoint.
@@ -351,6 +398,9 @@ public class TestIcebergFilesCommitter {
       harness.initializeState(snapshot);
       harness.open();
 
+      // All flink manifests should be cleaned because the restored job has committed the unfinished iceberg transaction.
+      assertFlinkManifests(0);
+
       assertMaxCommittedCheckpointId(newJobId, -1);
       assertMaxCommittedCheckpointId(jobId, checkpointId);
       SimpleDataUtil.assertTableRows(tablePath, expectedRows);
@@ -362,7 +412,11 @@ public class TestIcebergFilesCommitter {
       harness.processElement(dataFile, ++timestamp);
 
       harness.snapshot(++checkpointId, ++timestamp);
+      assertFlinkManifests(1);
+
       harness.notifyOfCompletedCheckpoint(checkpointId);
+      assertFlinkManifests(0);
+
       SimpleDataUtil.assertTableRows(tablePath, expectedRows);
       assertSnapshotSize(4);
       assertMaxCommittedCheckpointId(newJobId, checkpointId);
@@ -391,8 +445,11 @@ public class TestIcebergFilesCommitter {
         DataFile dataFile = writeDataFile(String.format("data-%d", i), rows);
         harness.processElement(dataFile, ++timestamp);
         harness.snapshot(++checkpointId, ++timestamp);
+        assertFlinkManifests(1);
 
         harness.notifyOfCompletedCheckpoint(checkpointId);
+        assertFlinkManifests(0);
+
         SimpleDataUtil.assertTableRows(tablePath, tableRows);
         assertSnapshotSize(i);
         assertMaxCommittedCheckpointId(oldJobId, checkpointId);
@@ -417,8 +474,10 @@ public class TestIcebergFilesCommitter {
       DataFile dataFile = writeDataFile("data-new-1", rows);
       harness.processElement(dataFile, ++timestamp);
       harness.snapshot(++checkpointId, ++timestamp);
+      assertFlinkManifests(1);
 
       harness.notifyOfCompletedCheckpoint(checkpointId);
+      assertFlinkManifests(0);
       SimpleDataUtil.assertTableRows(tablePath, tableRows);
       assertSnapshotSize(4);
       assertMaxCommittedCheckpointId(newJobId, checkpointId);
@@ -448,8 +507,10 @@ public class TestIcebergFilesCommitter {
         DataFile dataFile = writeDataFile(String.format("data-%d", i), rows);
         harness.processElement(dataFile, ++timestamp);
         harness.snapshot(checkpointId + 1, ++timestamp);
+        assertFlinkManifests(1);
 
         harness.notifyOfCompletedCheckpoint(checkpointId + 1);
+        assertFlinkManifests(0);
         SimpleDataUtil.assertTableRows(tablePath, tableRows);
         assertSnapshotSize(i + 1);
         assertMaxCommittedCheckpointId(jobId, checkpointId + 1);
@@ -464,6 +525,7 @@ public class TestIcebergFilesCommitter {
       harness.setup();
       harness.open();
 
+      assertFlinkManifests(0);
       assertSnapshotSize(0);
       assertMaxCommittedCheckpointId(jobId, -1L);
 
@@ -473,12 +535,65 @@ public class TestIcebergFilesCommitter {
       harness.processElement(dataFile, 1);
       ((BoundedOneInput) harness.getOneInputOperator()).endInput();
 
+      assertFlinkManifests(0);
       SimpleDataUtil.assertTableRows(tablePath, tableRows);
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, Long.MAX_VALUE);
     }
   }
 
+  @Test
+  public void testFlinkManifests() throws Exception {
+    long timestamp = 0;
+    final long checkpoint = 10;
+
+    JobID jobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      RowData row1 = SimpleDataUtil.createRowData(1, "hello");
+      DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
+
+      harness.processElement(dataFile1, ++timestamp);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      // 1. snapshotState for checkpoint#1
+      harness.snapshot(checkpoint, ++timestamp);
+      List<Path> manifestPaths = assertFlinkManifests(1);
+      Path manifestPath = manifestPaths.get(0);
+      Assert.assertEquals("File name should have the expected pattern.",
+          String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
+
+      // 2. Read the data files from manifests and assert.
+      List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
+      Assert.assertEquals(1, dataFiles.size());
+      TestFlinkManifest.checkDataFile(dataFile1, dataFiles.get(0));
+
+      // 3. notifyCheckpointComplete for checkpoint#1
+      harness.notifyOfCompletedCheckpoint(checkpoint);
+      SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1));
+      assertMaxCommittedCheckpointId(jobId, checkpoint);
+      assertFlinkManifests(0);
+    }
+  }
+
+  private ManifestFile createTestingManifestFile(Path manifestPath) {
+    return new GenericManifestFile(manifestPath.toAbsolutePath().toString(), manifestPath.toFile().length(), 0,
+        ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null);
+  }
+
+  private List<Path> assertFlinkManifests(int expectedCount) throws IOException {
+    List<Path> manifests = Files.list(flinkManifestFolder.toPath())
+        .filter(p -> !p.toString().endsWith(".crc"))
+        .collect(Collectors.toList());
+    Assert.assertEquals(String.format("Expected %s flink manifests, but the list is: %s", expectedCount, manifests),
+        expectedCount, manifests.size());
+    return manifests;
+  }
+
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
   }