You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2020/10/28 04:07:03 UTC
[iceberg] branch master updated: Flink: maintain the complete data
files into manifest before checkpoint finished. (#1477)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b9634c9 Flink: maintain the complete data files into manifest before checkpoint finished. (#1477)
b9634c9 is described below
commit b9634c9511c8a028074e8e5bdc54f0db47058668
Author: openinx <op...@gmail.com>
AuthorDate: Wed Oct 28 12:06:53 2020 +0800
Flink: maintain the complete data files into manifest before checkpoint finished. (#1477)
---
.../java/org/apache/iceberg/ManifestFiles.java | 32 ++++
.../org/apache/iceberg/avro/AvroEncoderUtil.java | 88 +++++++++
.../apache/iceberg/avro/TestAvroEncoderUtil.java | 54 ++++++
.../flink/sink/FlinkManifestSerializer.java | 48 +++++
.../iceberg/flink/sink/FlinkManifestUtil.java | 66 +++++++
.../iceberg/flink/sink/IcebergFilesCommitter.java | 94 +++++++---
.../flink/sink/ManifestOutputFileFactory.java | 78 ++++++++
.../iceberg/flink/sink/TestFlinkManifest.java | 199 +++++++++++++++++++++
.../flink/sink/TestIcebergFilesCommitter.java | 119 +++++++++++-
9 files changed, 753 insertions(+), 25 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index c629156..9dbf50e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -22,6 +22,8 @@ package org.apache.iceberg;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.ManifestReader.FileType;
+import org.apache.iceberg.avro.AvroEncoderUtil;
+import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
@@ -29,11 +31,18 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
public class ManifestFiles {
private ManifestFiles() {
}
+ // Avro schema used by encode(ManifestFile). encode() embeds this schema in its output, so decode() reads the
+ // schema back from the bytes rather than referencing this field. The class-name map ties the manifest struct to
+ // GenericManifestFile and partition summaries to GenericPartitionFieldSummary (presumably the Java classes the
+ // Avro reader instantiates when decoding -- confirm against AvroSchemaUtil.convert).
+ private static final org.apache.avro.Schema MANIFEST_AVRO_SCHEMA = AvroSchemaUtil.convert(ManifestFile.schema(),
+ ImmutableMap.of(
+ ManifestFile.schema().asStruct(), GenericManifestFile.class.getName(),
+ ManifestFile.PARTITION_SUMMARY_TYPE, GenericPartitionFieldSummary.class.getName()
+ ));
+
/**
* Returns a {@link CloseableIterable} of file paths in the {@link ManifestFile}.
*
@@ -149,6 +158,29 @@ public class ManifestFiles {
throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
}
+ /**
+ * Serialize a {@link ManifestFile} into bytes with the Avro binary encoder.
+ *
+ * @param manifestFile the manifest to serialize; it must be a {@link GenericManifestFile}, otherwise the cast
+ * below throws a {@link ClassCastException}.
+ * @return the encoded bytes, which also embed the Avro schema used for encoding.
+ * @throws IOException if an IO error occurs while encoding.
+ */
+ public static byte[] encode(ManifestFile manifestFile) throws IOException {
+ return AvroEncoderUtil.encode((GenericManifestFile) manifestFile, MANIFEST_AVRO_SCHEMA);
+ }
+
+ /**
+ * Decode the binary data into a {@link ManifestFile}.
+ *
+ * @param manifestData the binary data.
+ * @return a {@link ManifestFile}. To be precise, it's a {@link GenericManifestFile} which don't expose to public.
+ * @throws IOException if encounter any IO error when decoding.
+ */
+ public static ManifestFile decode(byte[] manifestData) throws IOException {
+ return AvroEncoderUtil.decode(manifestData);
+ }
+
// Convenience overload: delegates to open(manifest, io, null).
static ManifestReader<?> open(ManifestFile manifest, FileIO io) {
return open(manifest, io, null);
}
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java
new file mode 100644
index 0000000..74e8ede
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class AvroEncoderUtil {
+
+ private AvroEncoderUtil() {
+ }
+
+ static {
+ // Register Iceberg's map logical type with Avro before any schema is parsed; presumably required so that schemas
+ // embedded by encode() that use this logical type can be parsed again by decode().
+ LogicalTypes.register(LogicalMap.NAME, schema -> LogicalMap.get());
+ }
+
+ // Two-byte header written at the start of every encoded payload so that decode() can reject foreign data.
+ private static final byte[] MAGIC_BYTES = new byte[] {(byte) 0xC2, (byte) 0x01};
+
+ /**
+ * Encode a datum into a self-describing byte array.
+ *
+ * <p>Layout: two magic bytes, then the Avro schema JSON written with {@link DataOutputStream#writeUTF(String)},
+ * then the datum in Avro binary encoding. Because of {@code writeUTF}, the schema JSON must fit in 65535 bytes of
+ * modified UTF-8.
+ *
+ * @param datum the value to encode.
+ * @param avroSchema the Avro schema describing {@code datum}; it is embedded in the output.
+ * @param <T> the datum type.
+ * @return the encoded bytes.
+ * @throws IOException if encoding fails.
+ */
+ public static <T> byte[] encode(T datum, Schema avroSchema) throws IOException {
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ DataOutputStream dataOut = new DataOutputStream(out);
+
+ // Write the magic bytes
+ dataOut.write(MAGIC_BYTES);
+
+ // Write avro schema
+ dataOut.writeUTF(avroSchema.toString());
+
+ // Encode the datum with avro schema. DataOutputStream does not buffer, so the encoder can append to 'out'.
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+ DatumWriter<T> writer = new GenericAvroWriter<>(avroSchema);
+ writer.write(datum, encoder);
+ encoder.flush();
+
+ return out.toByteArray();
+ }
+ }
+
+ /**
+ * Decode a byte array produced by {@link #encode(Object, Schema)}.
+ *
+ * @param data the encoded bytes.
+ * @param <T> the expected datum type; the conversion to it is unchecked.
+ * @return the decoded datum, read with the schema embedded in {@code data}.
+ * @throws IOException if the data is truncated or cannot be decoded.
+ * @throws IllegalStateException if {@code data} does not start with the expected magic bytes.
+ */
+ public static <T> T decode(byte[] data) throws IOException {
+ try (ByteArrayInputStream in = new ByteArrayInputStream(data, 0, data.length)) {
+ DataInputStream dataInput = new DataInputStream(in);
+
+ // Read and validate the magic bytes.
+ byte header0 = dataInput.readByte();
+ byte header1 = dataInput.readByte();
+ // Preconditions message templates only substitute %s, so format the bytes as hex explicitly.
+ Preconditions.checkState(header0 == MAGIC_BYTES[0] && header1 == MAGIC_BYTES[1],
+ "Unrecognized header bytes: 0x%s 0x%s", toHex(header0), toHex(header1));
+
+ // Read avro schema
+ Schema avroSchema = new Schema.Parser().parse(dataInput.readUTF());
+
+ // Decode the datum with the parsed avro schema. DataInputStream does not buffer, so 'in' is positioned right
+ // after the schema.
+ BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(in, null);
+ DatumReader<T> reader = new GenericAvroReader<>(avroSchema);
+ // NOTE(review): setSchema appears to be what initializes GenericAvroReader before read(); keep the call.
+ reader.setSchema(avroSchema);
+ return reader.read(null, binaryDecoder);
+ }
+ }
+
+ // Format a byte as two upper-case hex digits, treating it as unsigned (e.g. (byte) 0xC2 -> "C2").
+ private static String toHex(byte value) {
+ return String.format("%02X", value & 0xFF);
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroEncoderUtil.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroEncoderUtil.java
new file mode 100644
index 0000000..483493f
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroEncoderUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.types.Type;
+import org.junit.Assert;
+
+public class TestAvroEncoderUtil extends AvroDataTest {
+
+ @Override
+ protected void writeAndValidate(org.apache.iceberg.Schema schema) throws IOException {
+ List<GenericData.Record> expected = RandomAvroData.generate(schema, 100, 1990L);
+ Map<Type, Schema> typeToSchema = AvroSchemaUtil.convertTypes(schema.asStruct(), "test");
+ Schema avroSchema = typeToSchema.get(schema.asStruct());
+
+ for (GenericData.Record record : expected) {
+ byte[] serializedData = AvroEncoderUtil.encode(record, avroSchema);
+ GenericData.Record decodedRecord = AvroEncoderUtil.decode(serializedData);
+
+ // Compare the records' string forms, because GenericData.Record#equals depends on the Avro schema. The converted
+ // schema represents maps with non-string keys as arrays of key/value pairs, so a direct equals comparison
+ // between the 'map' and the decoded 'array' representation would fail.
+ Assert.assertEquals(record.toString(), decodedRecord.toString());
+
+ // Re-encoding the decoded record must reproduce exactly the same bytes.
+ byte[] reserializedData = AvroEncoderUtil.encode(decodedRecord, avroSchema);
+ Assert.assertArrayEquals(serializedData, reserializedData);
+
+ GenericData.Record redecodedRecord = AvroEncoderUtil.decode(reserializedData);
+ Assert.assertEquals(record.toString(), redecodedRecord.toString());
+ }
+ }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java
new file mode 100644
index 0000000..bec4e65
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Flink {@link SimpleVersionedSerializer} for {@link ManifestFile} metadata. It delegates the encoding to
+ * {@link ManifestFiles#encode(ManifestFile)} and {@link ManifestFiles#decode(byte[])}.
+ */
+class FlinkManifestSerializer implements SimpleVersionedSerializer<ManifestFile> {
+ private static final int VERSION_NUM = 1;
+ static final FlinkManifestSerializer INSTANCE = new FlinkManifestSerializer();
+
+ @Override
+ public int getVersion() {
+ return VERSION_NUM;
+ }
+
+ @Override
+ public byte[] serialize(ManifestFile manifestFile) throws IOException {
+ Preconditions.checkNotNull(manifestFile, "ManifestFile to be serialized should not be null");
+
+ return ManifestFiles.encode(manifestFile);
+ }
+
+ /**
+ * Decode a manifest written by {@link #serialize(ManifestFile)}.
+ *
+ * @throws IOException if {@code version} is not a version this serializer can read, or if decoding fails.
+ */
+ @Override
+ public ManifestFile deserialize(int version, byte[] serialized) throws IOException {
+ // Only one format exists so far; refuse to guess at bytes written by an unknown serializer version.
+ if (version != VERSION_NUM) {
+ throw new IOException("Unrecognized version or corrupt state: " + version);
+ }
+ Preconditions.checkNotNull(serialized, "Serialized ManifestFile bytes should not be null");
+
+ return ManifestFiles.decode(serialized);
+ }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
new file mode 100644
index 0000000..aa36c73
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Helpers for writing and reading the temporary manifests that the Flink committer keeps per checkpoint.
+ */
+class FlinkManifestUtil {
+ // Manifests written here always use format version 2.
+ private static final int ICEBERG_FORMAT_VERSION = 2;
+ // Placeholder snapshot id handed to the manifest writer (presumably because the committing snapshot is not known
+ // yet when the manifest is written -- confirm).
+ private static final Long DUMMY_SNAPSHOT_ID = 0L;
+
+ private FlinkManifestUtil() {
+ }
+
+ /**
+ * Write {@code dataFiles} into a new manifest at {@code outputFile} and return the resulting manifest metadata.
+ */
+ static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles)
+ throws IOException {
+ ManifestWriter<DataFile> manifestWriter =
+ ManifestFiles.write(ICEBERG_FORMAT_VERSION, spec, outputFile, DUMMY_SNAPSHOT_ID);
+
+ // Close the writer first, then ask it for the ManifestFile; the alias keeps 'manifestWriter' usable afterwards.
+ try (ManifestWriter<DataFile> closingWriter = manifestWriter) {
+ closingWriter.addAll(dataFiles);
+ }
+ return manifestWriter.toManifestFile();
+ }
+
+ /**
+ * Read every data file listed in {@code manifestFile} into a list, closing the underlying reader afterwards.
+ */
+ static List<DataFile> readDataFiles(ManifestFile manifestFile, FileIO io) throws IOException {
+ List<DataFile> result = Lists.newArrayList();
+ try (CloseableIterable<DataFile> manifestEntries = ManifestFiles.read(manifestFile, io)) {
+ for (DataFile dataFile : manifestEntries) {
+ result.add(dataFile);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Build a {@link ManifestOutputFileFactory} bound to {@code table}'s operations, file IO and properties.
+ */
+ static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
+ long attemptNumber) {
+ TableOperations tableOps = ((HasTableOperations) table).operations();
+ return new ManifestOutputFileFactory(tableOps, table.io(), table.properties(), flinkJobId, subTaskId,
+ attemptNumber);
+ }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 1734a0d..ad89061 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.flink.sink;
+import java.io.IOException;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -27,8 +29,8 @@ import java.util.SortedMap;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -38,14 +40,16 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
@@ -58,6 +62,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
private static final long serialVersionUID = 1L;
private static final long INITIAL_CHECKPOINT_ID = -1L;
+ private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
private static final String FLINK_JOB_ID = "flink.job-id";
@@ -78,7 +83,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// <2, <file3>>. Snapshot for checkpoint#1 interrupted because of network/disk failure etc, while we don't expect
// any data loss in iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry to commit
// iceberg table when the next checkpoint happen.
- private final NavigableMap<Long, List<DataFile>> dataFilesPerCheckpoint = Maps.newTreeMap();
+ private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
// The data files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
// 'dataFilesPerCheckpoint'.
@@ -87,6 +92,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// It will have an unique identifier for one job.
private transient String flinkJobId;
private transient Table table;
+ private transient ManifestOutputFileFactory manifestOutputFileFactory;
private transient long maxCommittedCheckpointId;
// There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by the
@@ -97,8 +103,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
"iceberg-flink-job-id", BasicTypeInfo.STRING_TYPE_INFO);
private transient ListState<String> jobIdState;
// All pending checkpoints states for this function.
- private static final ListStateDescriptor<SortedMap<Long, List<DataFile>>> STATE_DESCRIPTOR = buildStateDescriptor();
- private transient ListState<SortedMap<Long, List<DataFile>>> checkpointsState;
+ private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
+ private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions) {
this.tableLoader = tableLoader;
@@ -113,6 +119,10 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// Open the table loader and load the table.
this.tableLoader.open();
this.table = tableLoader.loadTable();
+
+ int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+ int attemptId = getRuntimeContext().getAttemptNumber();
+ this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
@@ -127,7 +137,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// checkpoint id from restored flink job to the current flink job.
this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
- NavigableMap<Long, List<DataFile>> uncommittedDataFiles = Maps
+ NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
.newTreeMap(checkpointsState.get().iterator().next())
.tailMap(maxCommittedCheckpointId, false);
if (!uncommittedDataFiles.isEmpty()) {
@@ -145,7 +155,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);
// Update the checkpoint state.
- dataFilesPerCheckpoint.put(checkpointId, ImmutableList.copyOf(dataFilesOfCurrentCheckpoint));
+ dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
// Reset the snapshot state to the latest state.
checkpointsState.clear();
@@ -174,14 +184,24 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
}
}
- private void commitUpToCheckpoint(NavigableMap<Long, List<DataFile>> dataFilesMap,
+ private void commitUpToCheckpoint(NavigableMap<Long, byte[]> manifestsMap,
String newFlinkJobId,
- long checkpointId) {
- NavigableMap<Long, List<DataFile>> pendingFileMap = dataFilesMap.headMap(checkpointId, true);
+ long checkpointId) throws IOException {
+ NavigableMap<Long, byte[]> pendingManifestMap = manifestsMap.headMap(checkpointId, true);
+ List<ManifestFile> manifestFiles = Lists.newArrayList();
List<DataFile> pendingDataFiles = Lists.newArrayList();
- for (List<DataFile> dataFiles : pendingFileMap.values()) {
- pendingDataFiles.addAll(dataFiles);
+ for (byte[] manifestData : pendingManifestMap.values()) {
+ if (Arrays.equals(EMPTY_MANIFEST_DATA, manifestData)) {
+ // Skip the empty flink manifest.
+ continue;
+ }
+
+ ManifestFile manifestFile =
+ SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, manifestData);
+
+ manifestFiles.add(manifestFile);
+ pendingDataFiles.addAll(FlinkManifestUtil.readDataFiles(manifestFile, table.io()));
}
if (replacePartitions) {
@@ -190,8 +210,23 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
append(pendingDataFiles, newFlinkJobId, checkpointId);
}
- // Clear the committed data files from dataFilesPerCheckpoint.
- pendingFileMap.clear();
+ pendingManifestMap.clear();
+
+ // Delete the committed manifests and clear the committed data files from dataFilesPerCheckpoint.
+ for (ManifestFile manifestFile : manifestFiles) {
+ try {
+ table.io().deleteFile(manifestFile.path());
+ } catch (Exception e) {
+ // The flink manifests cleaning failure shouldn't abort the completed checkpoint.
+ String details = MoreObjects.toStringHelper(this)
+ .add("flinkJobId", newFlinkJobId)
+ .add("checkpointId", checkpointId)
+ .add("manifestPath", manifestFile.path())
+ .toString();
+ LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}",
+ details, e);
+ }
+ }
}
private void replacePartitions(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
@@ -236,12 +271,27 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
}
@Override
- public void endInput() {
+ public void endInput() throws IOException {
// Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
- dataFilesPerCheckpoint.put(Long.MAX_VALUE, ImmutableList.copyOf(dataFilesOfCurrentCheckpoint));
+ // Long.MAX_VALUE is used as the checkpoint key: it sorts after every real checkpoint id, so committing up to it
+ // (headMap(..., true) in commitUpToCheckpoint) commits every manifest still pending.
+ long currentCheckpointId = Long.MAX_VALUE;
+ dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
dataFilesOfCurrentCheckpoint.clear();
- commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, Long.MAX_VALUE);
+ commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
+ }
+
+ /**
+ * Write all the complete data files to a newly created manifest file and return the manifest's serialized bytes.
+ *
+ * <p>If no data files were buffered for this checkpoint, no file is created and {@link #EMPTY_MANIFEST_DATA} is
+ * returned. Otherwise the returned bytes hold the {@link ManifestFile} metadata (not the data files themselves),
+ * serialized by {@link FlinkManifestSerializer} together with its version through
+ * {@code SimpleVersionedSerialization#writeVersionAndSerialize}. This method does not clear
+ * {@code dataFilesOfCurrentCheckpoint}; that is left to the caller.
+ *
+ * @param checkpointId the checkpoint id, which is passed to the output file factory to name the manifest.
+ * @return the serialized manifest, or {@link #EMPTY_MANIFEST_DATA} if there was nothing to write.
+ * @throws IOException if writing or serializing the manifest fails.
+ */
+ private byte[] writeToManifest(long checkpointId) throws IOException {
+ if (dataFilesOfCurrentCheckpoint.isEmpty()) {
+ return EMPTY_MANIFEST_DATA;
+ }
+
+ OutputFile manifestOutputFile = manifestOutputFileFactory.create(checkpointId);
+ ManifestFile manifestFile =
+ FlinkManifestUtil.writeDataFiles(manifestOutputFile, table.spec(), dataFilesOfCurrentCheckpoint);
+ return SimpleVersionedSerialization.writeVersionAndSerialize(FlinkManifestSerializer.INSTANCE, manifestFile);
}
@Override
@@ -251,13 +301,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
}
}
- private static ListStateDescriptor<SortedMap<Long, List<DataFile>>> buildStateDescriptor() {
+ private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor() {
Comparator<Long> longComparator = Comparators.forType(Types.LongType.get());
- // Construct a ListTypeInfo.
- ListTypeInfo<DataFile> dataFileListTypeInfo = new ListTypeInfo<>(TypeInformation.of(DataFile.class));
// Construct a SortedMapTypeInfo.
+ // Each entry maps a checkpoint id to the serialized manifest bytes produced by writeToManifest (an empty array
+ // when that checkpoint had no data files). The state therefore only needs byte[] type information, not DataFile.
- SortedMapTypeInfo<Long, List<DataFile>> sortedMapTypeInfo = new SortedMapTypeInfo<>(
- BasicTypeInfo.LONG_TYPE_INFO, dataFileListTypeInfo, longComparator
+ SortedMapTypeInfo<Long, byte[]> sortedMapTypeInfo = new SortedMapTypeInfo<>(
+ BasicTypeInfo.LONG_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, longComparator
);
return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
new file mode 100644
index 0000000..fca8608
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+
+/**
+ * Creates output files for the temporary manifests written by the Flink committer.
+ */
+class ManifestOutputFileFactory {
+ // Users could define their own flink manifests directory by setting this value in table properties.
+ static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location";
+
+ private final TableOperations ops;
+ private final FileIO io;
+ private final Map<String, String> props;
+ private final String flinkJobId;
+ private final int subTaskId;
+ private final long attemptNumber;
+ // Counter appended to every generated name, so names produced by this factory are unique.
+ private final AtomicInteger fileCount = new AtomicInteger(0);
+
+ ManifestOutputFileFactory(TableOperations ops, FileIO io, Map<String, String> props,
+ String flinkJobId, int subTaskId, long attemptNumber) {
+ this.ops = ops;
+ this.io = io;
+ this.props = props;
+ this.flinkJobId = flinkJobId;
+ this.subTaskId = subTaskId;
+ this.attemptNumber = attemptNumber;
+ }
+
+ /**
+ * Build a file name {@code <jobId>-<subTaskId>-<attempt>-<checkpointId>-<fileCount>} plus the Avro file extension.
+ * Locale.ROOT keeps the numeric fields ASCII digits regardless of the JVM's default locale.
+ */
+ private String generatePath(long checkpointId) {
+ return FileFormat.AVRO.addExtension(String.format(Locale.ROOT, "%s-%05d-%d-%d-%05d", flinkJobId, subTaskId,
+ attemptNumber, checkpointId, fileCount.incrementAndGet()));
+ }
+
+ /**
+ * Create a new {@link OutputFile} for a Flink manifest of the given checkpoint.
+ *
+ * <p>If the table property {@value #FLINK_MANIFEST_LOCATION} is set, the file is placed in that directory, with
+ * trailing slashes removed. Otherwise its location is resolved by
+ * {@link TableOperations#metadataFileLocation(String)}.
+ */
+ OutputFile create(long checkpointId) {
+ String flinkManifestDir = props.get(FLINK_MANIFEST_LOCATION);
+
+ String newManifestFullPath;
+ if (Strings.isNullOrEmpty(flinkManifestDir)) {
+ // The user did not specify a flink manifest directory, so use the default metadata path.
+ newManifestFullPath = ops.metadataFileLocation(generatePath(checkpointId));
+ } else {
+ newManifestFullPath = String.format("%s/%s", stripTrailingSlash(flinkManifestDir), generatePath(checkpointId));
+ }
+
+ return io.newOutputFile(newManifestFullPath);
+ }
+
+ // Remove every trailing '/' so that joining with "/" does not produce "//".
+ private static String stripTrailingSlash(String path) {
+ String result = path;
+ while (result.endsWith("/")) {
+ result = result.substring(0, result.length() - 1);
+ }
+ return result;
+ }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
new file mode 100644
index 0000000..9fd1507
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+
+public class TestFlinkManifest {
+  private static final Configuration CONF = new Configuration();
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  // Gives every generated data file a distinct name within one test.
+  private final AtomicInteger dataFileCount = new AtomicInteger(0);
+  private String tablePath;
+  private Table table;
+
+  @Before
+  public void before() throws IOException {
+    String warehouse = tempFolder.newFolder().getAbsolutePath();
+    tablePath = warehouse.concat("/test");
+    Assert.assertTrue("Should create the table directory correctly.", new File(tablePath).mkdir());
+
+    // Build the iceberg table under the fresh directory, with no extra properties.
+    table = SimpleDataUtil.createTable(tablePath, ImmutableMap.of(), false);
+  }
+
+  @Test
+  public void testIO() throws IOException {
+    String jobId = newFlinkJobId();
+    for (long ckpt = 1; ckpt <= 3; ckpt++) {
+      // A fresh factory per checkpoint; the checkpoint id in the file name keeps the manifests apart.
+      OutputFile output = FlinkManifestUtil.createOutputFileFactory(table, jobId, 1, 1).create(ckpt);
+
+      List<DataFile> written = generateDataFiles(10);
+      ManifestFile manifest = FlinkManifestUtil.writeDataFiles(output, table.spec(), written);
+
+      assertDataFilesMatch(written, FlinkManifestUtil.readDataFiles(manifest, table.io()));
+    }
+  }
+
+  @Test
+  public void testUserProvidedManifestLocation() throws IOException {
+    String jobId = newFlinkJobId();
+    File manifestDir = tempFolder.newFolder();
+    // The trailing slashes are deliberate: the factory has to normalize them away.
+    Map<String, String> props = ImmutableMap.of(FLINK_MANIFEST_LOCATION, manifestDir.getAbsolutePath() + "///");
+    ManifestOutputFileFactory factory = new ManifestOutputFileFactory(
+        ((HasTableOperations) table).operations(), table.io(), props, jobId, 1, 1);
+
+    OutputFile output = factory.create(1L);
+    List<DataFile> written = generateDataFiles(5);
+    ManifestFile manifest = FlinkManifestUtil.writeDataFiles(output, table.spec(), written);
+
+    Assert.assertEquals("The newly created manifest file should be located under the user provided directory",
+        manifestDir.toPath(), Paths.get(manifest.path()).getParent());
+
+    assertDataFilesMatch(written, FlinkManifestUtil.readDataFiles(manifest, table.io()));
+  }
+
+  @Test
+  public void testVersionedSerializer() throws IOException {
+    OutputFile output = FlinkManifestUtil.createOutputFileFactory(table, newFlinkJobId(), 1, 1).create(1L);
+    ManifestFile expected = FlinkManifestUtil.writeDataFiles(output, table.spec(), generateDataFiles(10));
+
+    byte[] firstRound =
+        SimpleVersionedSerialization.writeVersionAndSerialize(FlinkManifestSerializer.INSTANCE, expected);
+    ManifestFile actual =
+        SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, firstRound);
+    checkManifestFile(expected, actual);
+
+    // Serializing the round-tripped manifest again must reproduce exactly the same bytes.
+    byte[] secondRound =
+        SimpleVersionedSerialization.writeVersionAndSerialize(FlinkManifestSerializer.INSTANCE, actual);
+    Assert.assertArrayEquals(firstRound, secondRound);
+  }
+
+  private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
+    return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF,
+        tablePath, FileFormat.PARQUET.addExtension(filename), rows);
+  }
+
+  /**
+   * Writes {@code fileNum} parquet data files. Rows accumulate across iterations, so the i-th file (0-based)
+   * is given rows 0..i.
+   */
+  private List<DataFile> generateDataFiles(int fileNum) throws IOException {
+    List<RowData> rows = Lists.newArrayList();
+    List<DataFile> files = Lists.newArrayList();
+    for (int i = 0; i < fileNum; i++) {
+      rows.add(SimpleDataUtil.createRowData(i, "a" + i));
+      String name = "data-file-" + dataFileCount.incrementAndGet();
+      files.add(writeDataFile(name, rows));
+    }
+    return files;
+  }
+
+  /** Asserts both lists have the same size and pairwise-equal data files, in order. */
+  private static void assertDataFilesMatch(List<DataFile> expected, List<DataFile> actual) {
+    Assert.assertEquals("Size of data file list are not equal.", expected.size(), actual.size());
+    for (int i = 0; i < expected.size(); i++) {
+      checkDataFile(expected.get(i), actual.get(i));
+    }
+  }
+
+  private static String newFlinkJobId() {
+    return UUID.randomUUID().toString();
+  }
+
+  /** Asserts that every attribute of {@code expected} is carried over to {@code actual}. */
+  private static void checkManifestFile(ManifestFile expected, ManifestFile actual) {
+    Assert.assertEquals("Path must match", expected.path(), actual.path());
+    Assert.assertEquals("Length must match", expected.length(), actual.length());
+    Assert.assertEquals("Spec id must match", expected.partitionSpecId(), actual.partitionSpecId());
+    Assert.assertEquals("ManifestContent must match", expected.content(), actual.content());
+    Assert.assertEquals("SequenceNumber must match", expected.sequenceNumber(), actual.sequenceNumber());
+    Assert.assertEquals("MinSequenceNumber must match", expected.minSequenceNumber(), actual.minSequenceNumber());
+    Assert.assertEquals("Snapshot id must match", expected.snapshotId(), actual.snapshotId());
+    Assert.assertEquals("Added files flag must match", expected.hasAddedFiles(), actual.hasAddedFiles());
+    Assert.assertEquals("Added files count must match", expected.addedFilesCount(), actual.addedFilesCount());
+    Assert.assertEquals("Added rows count must match", expected.addedRowsCount(), actual.addedRowsCount());
+    Assert.assertEquals("Existing files flag must match", expected.hasExistingFiles(), actual.hasExistingFiles());
+    Assert.assertEquals("Existing files count must match", expected.existingFilesCount(), actual.existingFilesCount());
+    Assert.assertEquals("Existing rows count must match", expected.existingRowsCount(), actual.existingRowsCount());
+    Assert.assertEquals("Deleted files flag must match", expected.hasDeletedFiles(), actual.hasDeletedFiles());
+    Assert.assertEquals("Deleted files count must match", expected.deletedFilesCount(), actual.deletedFilesCount());
+    Assert.assertEquals("Deleted rows count must match", expected.deletedRowsCount(), actual.deletedRowsCount());
+    Assert.assertEquals("PartitionFieldSummary must match", expected.partitions(), actual.partitions());
+  }
+
+  /**
+   * Asserts that two data files agree field by field, and that neither carries equality field ids.
+   * Package-private because TestIcebergFilesCommitter reuses it.
+   */
+  static void checkDataFile(DataFile expected, DataFile actual) {
+    if (expected == actual) {
+      // Same reference (including both null): nothing left to compare.
+      return;
+    }
+    Assert.assertTrue("Shouldn't have null DataFile.", !(expected == null || actual == null));
+    Assert.assertEquals("SpecId", expected.specId(), actual.specId());
+    Assert.assertEquals("Content", expected.content(), actual.content());
+    Assert.assertEquals("Path", expected.path(), actual.path());
+    Assert.assertEquals("Format", expected.format(), actual.format());
+    Assert.assertEquals("Partition", expected.partition(), actual.partition());
+    Assert.assertEquals("Record count", expected.recordCount(), actual.recordCount());
+    Assert.assertEquals("File size in bytes", expected.fileSizeInBytes(), actual.fileSizeInBytes());
+    Assert.assertEquals("Column sizes", expected.columnSizes(), actual.columnSizes());
+    Assert.assertEquals("Value counts", expected.valueCounts(), actual.valueCounts());
+    Assert.assertEquals("Null value counts", expected.nullValueCounts(), actual.nullValueCounts());
+    Assert.assertEquals("Lower bounds", expected.lowerBounds(), actual.lowerBounds());
+    Assert.assertEquals("Upper bounds", expected.upperBounds(), actual.upperBounds());
+    Assert.assertEquals("Key metadata", expected.keyMetadata(), actual.keyMetadata());
+    Assert.assertEquals("Split offsets", expected.splitOffsets(), actual.splitOffsets());
+    Assert.assertNull(actual.equalityFieldIds());
+    Assert.assertNull(expected.equalityFieldIds());
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 2ce7c36..583b6f1 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -21,9 +21,12 @@ package org.apache.iceberg.flink.sink;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -40,8 +43,10 @@ import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -55,6 +60,9 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+
@RunWith(Parameterized.class)
public class TestIcebergFilesCommitter {
private static final Configuration CONF = new Configuration();
@@ -64,6 +72,7 @@ public class TestIcebergFilesCommitter {
private String tablePath;
private Table table;
+ private File flinkManifestFolder;
private final FileFormat format;
@@ -83,13 +92,19 @@ public class TestIcebergFilesCommitter {
@Before
public void before() throws IOException {
File folder = tempFolder.newFolder();
+ flinkManifestFolder = tempFolder.newFolder();
String warehouse = folder.getAbsolutePath();
tablePath = warehouse.concat("/test");
Assert.assertTrue("Should create the table directory correctly.", new File(tablePath).mkdir());
// Construct the iceberg table.
- Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+ Map<String, String> props = ImmutableMap.of(
+ // file format.
+ DEFAULT_FILE_FORMAT, format.name(),
+ // temporary flink manifests location.
+ FLINK_MANIFEST_LOCATION, flinkManifestFolder.getAbsolutePath()
+ );
table = SimpleDataUtil.createTable(tablePath, props, false);
}
@@ -110,7 +125,11 @@ public class TestIcebergFilesCommitter {
// failover won't fail.
for (int i = 1; i <= 3; i++) {
harness.snapshot(++checkpointId, ++timestamp);
+ assertFlinkManifests(0);
+
harness.notifyOfCompletedCheckpoint(checkpointId);
+ assertFlinkManifests(0);
+
assertSnapshotSize(i);
assertMaxCommittedCheckpointId(jobId, checkpointId);
}
@@ -142,8 +161,10 @@ public class TestIcebergFilesCommitter {
rows.add(rowData);
harness.snapshot(i, ++timestamp);
+ assertFlinkManifests(1);
harness.notifyOfCompletedCheckpoint(i);
+ assertFlinkManifests(0);
SimpleDataUtil.assertTableRows(tablePath, ImmutableList.copyOf(rows));
assertSnapshotSize(i);
@@ -177,6 +198,7 @@ public class TestIcebergFilesCommitter {
// 1. snapshotState for checkpoint#1
long firstCheckpointId = 1;
harness.snapshot(firstCheckpointId, ++timestamp);
+ assertFlinkManifests(1);
RowData row2 = SimpleDataUtil.createRowData(2, "world");
DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
@@ -186,16 +208,19 @@ public class TestIcebergFilesCommitter {
// 2. snapshotState for checkpoint#2
long secondCheckpointId = 2;
harness.snapshot(secondCheckpointId, ++timestamp);
+ assertFlinkManifests(2);
// 3. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(firstCheckpointId);
SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1));
assertMaxCommittedCheckpointId(jobId, firstCheckpointId);
+ assertFlinkManifests(1);
// 4. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(secondCheckpointId);
SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+ assertFlinkManifests(0);
}
}
@@ -224,6 +249,7 @@ public class TestIcebergFilesCommitter {
// 1. snapshotState for checkpoint#1
long firstCheckpointId = 1;
harness.snapshot(firstCheckpointId, ++timestamp);
+ assertFlinkManifests(1);
RowData row2 = SimpleDataUtil.createRowData(2, "world");
DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
@@ -233,16 +259,19 @@ public class TestIcebergFilesCommitter {
// 2. snapshotState for checkpoint#2
long secondCheckpointId = 2;
harness.snapshot(secondCheckpointId, ++timestamp);
+ assertFlinkManifests(2);
// 3. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(secondCheckpointId);
SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+ assertFlinkManifests(0);
// 4. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(firstCheckpointId);
SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+ assertFlinkManifests(0);
}
}
@@ -267,7 +296,11 @@ public class TestIcebergFilesCommitter {
harness.processElement(dataFile1, ++timestamp);
snapshot = harness.snapshot(++checkpointId, ++timestamp);
+ assertFlinkManifests(1);
+
harness.notifyOfCompletedCheckpoint(checkpointId);
+ assertFlinkManifests(0);
+
SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row));
assertSnapshotSize(1);
assertMaxCommittedCheckpointId(jobId, checkpointId);
@@ -289,7 +322,11 @@ public class TestIcebergFilesCommitter {
harness.processElement(dataFile, ++timestamp);
harness.snapshot(++checkpointId, ++timestamp);
+ assertFlinkManifests(1);
+
harness.notifyOfCompletedCheckpoint(checkpointId);
+ assertFlinkManifests(0);
+
SimpleDataUtil.assertTableRows(tablePath, expectedRows);
assertSnapshotSize(2);
assertMaxCommittedCheckpointId(jobId, checkpointId);
@@ -320,6 +357,7 @@ public class TestIcebergFilesCommitter {
snapshot = harness.snapshot(++checkpointId, ++timestamp);
SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of());
assertMaxCommittedCheckpointId(jobId, -1L);
+ assertFlinkManifests(1);
}
try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
@@ -327,11 +365,19 @@ public class TestIcebergFilesCommitter {
harness.initializeState(snapshot);
harness.open();
+ // All flink manifests should be cleaned because it has committed the unfinished iceberg transaction.
+ assertFlinkManifests(0);
+
SimpleDataUtil.assertTableRows(tablePath, expectedRows);
assertMaxCommittedCheckpointId(jobId, checkpointId);
harness.snapshot(++checkpointId, ++timestamp);
+ // Did not write any new record, so it won't generate new manifest.
+ assertFlinkManifests(0);
+
harness.notifyOfCompletedCheckpoint(checkpointId);
+ assertFlinkManifests(0);
+
SimpleDataUtil.assertTableRows(tablePath, expectedRows);
assertSnapshotSize(2);
assertMaxCommittedCheckpointId(jobId, checkpointId);
@@ -342,6 +388,7 @@ public class TestIcebergFilesCommitter {
harness.processElement(dataFile, ++timestamp);
snapshot = harness.snapshot(++checkpointId, ++timestamp);
+ assertFlinkManifests(1);
}
// Redeploying flink job from external checkpoint.
@@ -351,6 +398,9 @@ public class TestIcebergFilesCommitter {
harness.initializeState(snapshot);
harness.open();
+ // All flink manifests should be cleaned because it has committed the unfinished iceberg transaction.
+ assertFlinkManifests(0);
+
assertMaxCommittedCheckpointId(newJobId, -1);
assertMaxCommittedCheckpointId(jobId, checkpointId);
SimpleDataUtil.assertTableRows(tablePath, expectedRows);
@@ -362,7 +412,11 @@ public class TestIcebergFilesCommitter {
harness.processElement(dataFile, ++timestamp);
harness.snapshot(++checkpointId, ++timestamp);
+ assertFlinkManifests(1);
+
harness.notifyOfCompletedCheckpoint(checkpointId);
+ assertFlinkManifests(0);
+
SimpleDataUtil.assertTableRows(tablePath, expectedRows);
assertSnapshotSize(4);
assertMaxCommittedCheckpointId(newJobId, checkpointId);
@@ -391,8 +445,11 @@ public class TestIcebergFilesCommitter {
DataFile dataFile = writeDataFile(String.format("data-%d", i), rows);
harness.processElement(dataFile, ++timestamp);
harness.snapshot(++checkpointId, ++timestamp);
+ assertFlinkManifests(1);
harness.notifyOfCompletedCheckpoint(checkpointId);
+ assertFlinkManifests(0);
+
SimpleDataUtil.assertTableRows(tablePath, tableRows);
assertSnapshotSize(i);
assertMaxCommittedCheckpointId(oldJobId, checkpointId);
@@ -417,8 +474,10 @@ public class TestIcebergFilesCommitter {
DataFile dataFile = writeDataFile("data-new-1", rows);
harness.processElement(dataFile, ++timestamp);
harness.snapshot(++checkpointId, ++timestamp);
+ assertFlinkManifests(1);
harness.notifyOfCompletedCheckpoint(checkpointId);
+ assertFlinkManifests(0);
SimpleDataUtil.assertTableRows(tablePath, tableRows);
assertSnapshotSize(4);
assertMaxCommittedCheckpointId(newJobId, checkpointId);
@@ -448,8 +507,10 @@ public class TestIcebergFilesCommitter {
DataFile dataFile = writeDataFile(String.format("data-%d", i), rows);
harness.processElement(dataFile, ++timestamp);
harness.snapshot(checkpointId + 1, ++timestamp);
+ assertFlinkManifests(1);
harness.notifyOfCompletedCheckpoint(checkpointId + 1);
+ assertFlinkManifests(0);
SimpleDataUtil.assertTableRows(tablePath, tableRows);
assertSnapshotSize(i + 1);
assertMaxCommittedCheckpointId(jobId, checkpointId + 1);
@@ -464,6 +525,7 @@ public class TestIcebergFilesCommitter {
harness.setup();
harness.open();
+ assertFlinkManifests(0);
assertSnapshotSize(0);
assertMaxCommittedCheckpointId(jobId, -1L);
@@ -473,12 +535,65 @@ public class TestIcebergFilesCommitter {
harness.processElement(dataFile, 1);
((BoundedOneInput) harness.getOneInputOperator()).endInput();
+ assertFlinkManifests(0);
SimpleDataUtil.assertTableRows(tablePath, tableRows);
assertSnapshotSize(1);
assertMaxCommittedCheckpointId(jobId, Long.MAX_VALUE);
}
}
+  /**
+   * Verifies the flink manifest lifecycle for one checkpoint:
+   * <ul>
+   *   <li>snapshotting writes the pending data file into a single manifest with the expected name;</li>
+   *   <li>that manifest can be read back;</li>
+   *   <li>it is gone once the checkpoint has been committed.</li>
+   * </ul>
+   */
+  @Test
+  public void testFlinkManifests() throws Exception {
+    long timestamp = 0;
+    final long checkpoint = 10;
+
+    JobID jobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      RowData row1 = SimpleDataUtil.createRowData(1, "hello");
+      DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
+
+      harness.processElement(dataFile1, ++timestamp);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      // 1. snapshotState for checkpoint#10: exactly one flink manifest is written.
+      harness.snapshot(checkpoint, ++timestamp);
+      List<Path> manifestPaths = assertFlinkManifests(1);
+      Path manifestPath = manifestPaths.get(0);
+      // Expected name: <jobId>-<subtask:5>-<attempt>-<checkpoint>-<fileCount:5>.avro, formatted locale-independently.
+      Assert.assertEquals("File name should have the expected pattern.",
+          String.format(Locale.ROOT, "%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1),
+          manifestPath.getFileName().toString());
+
+      // 2. Read the data files back from the manifest and compare them with what the committer received.
+      List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
+      Assert.assertEquals("Should have exactly one data file in the flink manifest.", 1, dataFiles.size());
+      TestFlinkManifest.checkDataFile(dataFile1, dataFiles.get(0));
+
+      // 3. notifyCheckpointComplete for checkpoint#10: the rows are committed and the flink manifest is removed.
+      harness.notifyOfCompletedCheckpoint(checkpoint);
+      SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1));
+      assertMaxCommittedCheckpointId(jobId, checkpoint);
+      assertFlinkManifests(0);
+    }
+  }
+
+  /**
+   * Wraps an on-disk flink manifest in a {@link ManifestFile} so that it can be read back in tests.
+   *
+   * <p>Only the absolute path, the on-disk length and {@code ManifestContent.DATA} carry real values. All other
+   * constructor arguments are placeholder zeros and a trailing null; presumably these are the spec id, sequence
+   * numbers, snapshot id, file/row counts and partition summaries. Verify against the GenericManifestFile
+   * constructor.
+   */
+  private ManifestFile createTestingManifestFile(Path manifestPath) {
+    String location = manifestPath.toAbsolutePath().toString();
+    long length = manifestPath.toFile().length();
+    return new GenericManifestFile(location, length, 0, ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null);
+  }
+
+  /**
+   * Asserts that the flink manifest folder holds exactly {@code expectedCount} files, ignoring {@code .crc}
+   * files (presumably local checksum side files), and returns their paths.
+   *
+   * @param expectedCount number of manifests expected on disk
+   * @return the manifest paths found, in whatever order {@link Files#list} yields them
+   * @throws IOException if the folder cannot be listed
+   */
+  private List<Path> assertFlinkManifests(int expectedCount) throws IOException {
+    List<Path> manifests;
+    // Files.list holds an open directory handle until its stream is closed, so close it deterministically.
+    try (java.util.stream.Stream<Path> entries = Files.list(flinkManifestFolder.toPath())) {
+      manifests = entries
+          .filter(p -> !p.toString().endsWith(".crc"))
+          .collect(Collectors.toList());
+    }
+    Assert.assertEquals(String.format("Expected %s flink manifests, but the list is: %s", expectedCount, manifests),
+        expectedCount, manifests.size());
+    return manifests;
+  }
+
private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
}