You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2022/02/10 22:35:54 UTC

[iceberg] 03/03: Flink: Ensure temp manifest names are unique across tasks (#3986)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 0d9c63e5101f11463b6a5d6b2ec94dc9cec8dca8
Author: Peidian li <38...@users.noreply.github.com>
AuthorDate: Sat Feb 5 01:38:39 2022 +0800

    Flink: Ensure temp manifest names are unique across tasks (#3986)
---
 .../apache/iceberg/flink/sink/FlinkManifestUtil.java   |  7 ++++---
 .../iceberg/flink/sink/IcebergFilesCommitter.java      |  4 +++-
 .../iceberg/flink/sink/ManifestOutputFileFactory.java  |  8 +++++---
 .../apache/iceberg/flink/sink/TestFlinkManifest.java   | 18 ++++++++++++++----
 .../iceberg/flink/sink/TestIcebergFilesCommitter.java  |  8 ++++++--
 5 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index b00018b..d208593 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -61,10 +61,11 @@ class FlinkManifestUtil {
     }
   }
 
-  static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
-                                                           long attemptNumber) {
+  static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, String operatorUniqueId,
+                                                           int subTaskId, long attemptNumber) {
     TableOperations ops = ((HasTableOperations) table).operations();
-    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
+    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, operatorUniqueId,
+        subTaskId, attemptNumber);
   }
 
   static DeltaManifests writeCompletedFiles(WriteResult result,
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 8f8bdad..beffff6 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -129,7 +129,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
 
     int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
     int attemptId = getRuntimeContext().getAttemptNumber();
-    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+    String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
+    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorUniqueId,
+        subTaskId, attemptId);
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
index fca8608..b7d575b 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
@@ -35,23 +35,25 @@ class ManifestOutputFileFactory {
   private final FileIO io;
   private final Map<String, String> props;
   private final String flinkJobId;
+  private final String operatorUniqueId;
   private final int subTaskId;
   private final long attemptNumber;
   private final AtomicInteger fileCount = new AtomicInteger(0);
 
   ManifestOutputFileFactory(TableOperations ops, FileIO io, Map<String, String> props,
-                            String flinkJobId, int subTaskId, long attemptNumber) {
+                            String flinkJobId,  String operatorUniqueId, int subTaskId, long attemptNumber) {
     this.ops = ops;
     this.io = io;
     this.props = props;
     this.flinkJobId = flinkJobId;
+    this.operatorUniqueId = operatorUniqueId;
     this.subTaskId = subTaskId;
     this.attemptNumber = attemptNumber;
   }
 
   private String generatePath(long checkpointId) {
-    return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId,
-        attemptNumber, checkpointId, fileCount.incrementAndGet()));
+    return FileFormat.AVRO.addExtension(String.format("%s-%s-%05d-%d-%d-%05d", flinkJobId, operatorUniqueId,
+        subTaskId, attemptNumber, checkpointId, fileCount.incrementAndGet()));
   }
 
   OutputFile create(long checkpointId) {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index c1538bc..4a47656 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -87,9 +87,10 @@ public class TestFlinkManifest {
   @Test
   public void testIO() throws IOException {
     String flinkJobId = newFlinkJobId();
+    String operatorId = newOperatorUniqueId();
     for (long checkpointId = 1; checkpointId <= 3; checkpointId++) {
       ManifestOutputFileFactory factory =
-          FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+          FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1);
       final long curCkpId = checkpointId;
 
       List<DataFile> dataFiles = generateDataFiles(10);
@@ -122,11 +123,12 @@ public class TestFlinkManifest {
   public void testUserProvidedManifestLocation() throws IOException {
     long checkpointId = 1;
     String flinkJobId = newFlinkJobId();
+    String operatorId = newOperatorUniqueId();
     File userProvidedFolder = tempFolder.newFolder();
     Map<String, String> props = ImmutableMap.of(FLINK_MANIFEST_LOCATION, userProvidedFolder.getAbsolutePath() + "///");
     ManifestOutputFileFactory factory = new ManifestOutputFileFactory(
         ((HasTableOperations) table).operations(), table.io(), props,
-        flinkJobId, 1, 1);
+        flinkJobId, operatorId, 1, 1);
 
     List<DataFile> dataFiles = generateDataFiles(5);
     DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(
@@ -156,7 +158,9 @@ public class TestFlinkManifest {
   public void testVersionedSerializer() throws IOException {
     long checkpointId = 1;
     String flinkJobId = newFlinkJobId();
-    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+    String operatorId = newOperatorUniqueId();
+    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
+        1, 1);
 
     List<DataFile> dataFiles = generateDataFiles(10);
     List<DeleteFile> eqDeleteFiles = generateEqDeleteFiles(10);
@@ -186,7 +190,9 @@ public class TestFlinkManifest {
     // The v2 deserializer should be able to deserialize the v1 binary.
     long checkpointId = 1;
     String flinkJobId = newFlinkJobId();
-    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+    String operatorId = newOperatorUniqueId();
+    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
+        1, 1);
 
     List<DataFile> dataFiles = generateDataFiles(10);
     ManifestFile manifest = FlinkManifestUtil.writeDataFiles(factory.create(checkpointId), table.spec(), dataFiles);
@@ -271,4 +277,8 @@ public class TestFlinkManifest {
   private static String newFlinkJobId() {
     return UUID.randomUUID().toString();
   }
+
+  private static String newOperatorUniqueId() {
+    return UUID.randomUUID().toString();
+  }
 }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 135fa84..9c23d8a 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -599,8 +599,10 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.snapshot(checkpoint, ++timestamp);
       List<Path> manifestPaths = assertFlinkManifests(1);
       Path manifestPath = manifestPaths.get(0);
+      String operatorId = harness.getOneInputOperator().getOperatorID().toString();
       Assert.assertEquals("File name should have the expected pattern.",
-          String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
+          String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
+          manifestPath.getFileName().toString());
 
       // 2. Read the data files from manifests and assert.
       List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
@@ -640,8 +642,10 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.snapshot(checkpoint, ++timestamp);
       List<Path> manifestPaths = assertFlinkManifests(1);
       Path manifestPath = manifestPaths.get(0);
+      String operatorId = harness.getOneInputOperator().getOperatorID().toString();
       Assert.assertEquals("File name should have the expected pattern.",
-          String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
+          String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
+          manifestPath.getFileName().toString());
 
       // 2. Read the data files from manifests and assert.
       List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());