You are viewing a plain-text version of this content; the canonical hyperlink to the original message was lost in this rendering.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2022/02/10 22:35:54 UTC
[iceberg] 03/03: Flink: Ensure temp manifest names are unique across tasks (#3986)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 0d9c63e5101f11463b6a5d6b2ec94dc9cec8dca8
Author: Peidian li <38...@users.noreply.github.com>
AuthorDate: Sat Feb 5 01:38:39 2022 +0800
Flink: Ensure temp manifest names are unique across tasks (#3986)
---
.../apache/iceberg/flink/sink/FlinkManifestUtil.java | 7 ++++---
.../iceberg/flink/sink/IcebergFilesCommitter.java | 4 +++-
.../iceberg/flink/sink/ManifestOutputFileFactory.java | 8 +++++---
.../apache/iceberg/flink/sink/TestFlinkManifest.java | 18 ++++++++++++++----
.../iceberg/flink/sink/TestIcebergFilesCommitter.java | 8 ++++++--
5 files changed, 32 insertions(+), 13 deletions(-)
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index b00018b..d208593 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -61,10 +61,11 @@ class FlinkManifestUtil {
}
}
- static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
- long attemptNumber) {
+ static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, String operatorUniqueId,
+ int subTaskId, long attemptNumber) {
TableOperations ops = ((HasTableOperations) table).operations();
- return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
+ return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, operatorUniqueId,
+ subTaskId, attemptNumber);
}
static DeltaManifests writeCompletedFiles(WriteResult result,
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 8f8bdad..beffff6 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -129,7 +129,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
int attemptId = getRuntimeContext().getAttemptNumber();
- this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+ String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
+ this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorUniqueId,
+ subTaskId, attemptId);
this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
index fca8608..b7d575b 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
@@ -35,23 +35,25 @@ class ManifestOutputFileFactory {
private final FileIO io;
private final Map<String, String> props;
private final String flinkJobId;
+ private final String operatorUniqueId;
private final int subTaskId;
private final long attemptNumber;
private final AtomicInteger fileCount = new AtomicInteger(0);
ManifestOutputFileFactory(TableOperations ops, FileIO io, Map<String, String> props,
- String flinkJobId, int subTaskId, long attemptNumber) {
+ String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) {
this.ops = ops;
this.io = io;
this.props = props;
this.flinkJobId = flinkJobId;
+ this.operatorUniqueId = operatorUniqueId;
this.subTaskId = subTaskId;
this.attemptNumber = attemptNumber;
}
private String generatePath(long checkpointId) {
- return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId,
- attemptNumber, checkpointId, fileCount.incrementAndGet()));
+ return FileFormat.AVRO.addExtension(String.format("%s-%s-%05d-%d-%d-%05d", flinkJobId, operatorUniqueId,
+ subTaskId, attemptNumber, checkpointId, fileCount.incrementAndGet()));
}
OutputFile create(long checkpointId) {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index c1538bc..4a47656 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -87,9 +87,10 @@ public class TestFlinkManifest {
@Test
public void testIO() throws IOException {
String flinkJobId = newFlinkJobId();
+ String operatorId = newOperatorUniqueId();
for (long checkpointId = 1; checkpointId <= 3; checkpointId++) {
ManifestOutputFileFactory factory =
- FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+ FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1);
final long curCkpId = checkpointId;
List<DataFile> dataFiles = generateDataFiles(10);
@@ -122,11 +123,12 @@ public class TestFlinkManifest {
public void testUserProvidedManifestLocation() throws IOException {
long checkpointId = 1;
String flinkJobId = newFlinkJobId();
+ String operatorId = newOperatorUniqueId();
File userProvidedFolder = tempFolder.newFolder();
Map<String, String> props = ImmutableMap.of(FLINK_MANIFEST_LOCATION, userProvidedFolder.getAbsolutePath() + "///");
ManifestOutputFileFactory factory = new ManifestOutputFileFactory(
((HasTableOperations) table).operations(), table.io(), props,
- flinkJobId, 1, 1);
+ flinkJobId, operatorId, 1, 1);
List<DataFile> dataFiles = generateDataFiles(5);
DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(
@@ -156,7 +158,9 @@ public class TestFlinkManifest {
public void testVersionedSerializer() throws IOException {
long checkpointId = 1;
String flinkJobId = newFlinkJobId();
- ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+ String operatorId = newOperatorUniqueId();
+ ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
+ 1, 1);
List<DataFile> dataFiles = generateDataFiles(10);
List<DeleteFile> eqDeleteFiles = generateEqDeleteFiles(10);
@@ -186,7 +190,9 @@ public class TestFlinkManifest {
// The v2 deserializer should be able to deserialize the v1 binary.
long checkpointId = 1;
String flinkJobId = newFlinkJobId();
- ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+ String operatorId = newOperatorUniqueId();
+ ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
+ 1, 1);
List<DataFile> dataFiles = generateDataFiles(10);
ManifestFile manifest = FlinkManifestUtil.writeDataFiles(factory.create(checkpointId), table.spec(), dataFiles);
@@ -271,4 +277,8 @@ public class TestFlinkManifest {
private static String newFlinkJobId() {
return UUID.randomUUID().toString();
}
+
+ private static String newOperatorUniqueId() {
+ return UUID.randomUUID().toString();
+ }
}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 135fa84..9c23d8a 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -599,8 +599,10 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.snapshot(checkpoint, ++timestamp);
List<Path> manifestPaths = assertFlinkManifests(1);
Path manifestPath = manifestPaths.get(0);
+ String operatorId = harness.getOneInputOperator().getOperatorID().toString();
Assert.assertEquals("File name should have the expected pattern.",
- String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
+ String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
+ manifestPath.getFileName().toString());
// 2. Read the data files from manifests and assert.
List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
@@ -640,8 +642,10 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.snapshot(checkpoint, ++timestamp);
List<Path> manifestPaths = assertFlinkManifests(1);
Path manifestPath = manifestPaths.get(0);
+ String operatorId = harness.getOneInputOperator().getOperatorID().toString();
Assert.assertEquals("File name should have the expected pattern.",
- String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
+ String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
+ manifestPath.getFileName().toString());
// 2. Read the data files from manifests and assert.
List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());