Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/10/09 14:18:40 UTC

[GitHub] [iceberg] openinx commented on a change in pull request #1477: Flink: maintain the complete data files into manifest before checkpoint finished.

openinx commented on a change in pull request #1477:
URL: https://github.com/apache/iceberg/pull/1477#discussion_r502452643



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -240,12 +256,22 @@ public void processElement(StreamRecord<DataFile> element) {
   }
 
   @Override
-  public void endInput() {
+  public void endInput() throws IOException {
     // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
-    dataFilesPerCheckpoint.put(Long.MAX_VALUE, ImmutableList.copyOf(dataFilesOfCurrentCheckpoint));
+    long currentCheckpointId = Long.MAX_VALUE;
+    dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
     dataFilesOfCurrentCheckpoint.clear();
 
-    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, Long.MAX_VALUE);
+    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
+  }
+
+  /**
+   * Write all the complete data files to a newly created manifest file and return the manifest's avro serialized bytes.
+   */
+  private Byte[] writeToManifest(long checkpointId) throws IOException {

Review comment:
      I thought the Flink state backend could only support the `Byte[]` type, but after skimming the Flink code again, I found that `PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO` supports `byte[]`. So here we should use `byte[]` directly.
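      The sketch below makes the `Byte[]` versus `byte[]` cost concrete. It uses only the JDK, with no Flink or Commons Lang. `ManifestBytesSketch`, `box` and `unbox` are hypothetical names:
      - `unbox` mirrors the per-element loop that `ArrayUtils.toPrimitive` has to run before `ManifestFiles.decode` can accept the data.
      - `box` is the reverse step needed to keep serialized output as `Byte[]`. This assumes the encoder produces a primitive `byte[]`.

      With `PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO`, the bytes could be stored as-is and neither loop would be needed.

```java
import java.util.Arrays;

/** Hypothetical sketch: the extra copying that a boxed Byte[] state type forces. */
public final class ManifestBytesSketch {

  // Roughly what ArrayUtils.toPrimitive(Byte[]) must do: unbox every element.
  static byte[] unbox(Byte[] boxed) {
    byte[] out = new byte[boxed.length];
    for (int i = 0; i < boxed.length; i++) {
      out[i] = boxed[i]; // auto-unboxing; a null element would throw NullPointerException
    }
    return out;
  }

  // The reverse step: wrap each primitive byte so it fits a Byte[] state value.
  static Byte[] box(byte[] raw) {
    Byte[] out = new Byte[raw.length];
    for (int i = 0; i < raw.length; i++) {
      out[i] = raw[i]; // auto-boxing
    }
    return out;
  }

  public static void main(String[] args) {
    byte[] serialized = {0x4f, 0x62, 0x6a, 0x01}; // arbitrary stand-in for encoded manifest bytes
    // Round-tripping through Byte[] preserves the content but costs two full copies.
    System.out.println(Arrays.equals(serialized, unbox(box(serialized)))); // prints true
  }
}
```

      Keeping the state as `byte[]` avoids both the per-element boxing and the extra array copies on every checkpoint.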

##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -178,14 +185,20 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     }
   }
 
-  private void commitUpToCheckpoint(NavigableMap<Long, List<DataFile>> dataFilesMap,
+  private void commitUpToCheckpoint(NavigableMap<Long, Byte[]> manifestsMap,
                                     String newFlinkJobId,
-                                    long checkpointId) {
-    NavigableMap<Long, List<DataFile>> pendingFileMap = dataFilesMap.headMap(checkpointId, true);
+                                    long checkpointId) throws IOException {
+    NavigableMap<Long, Byte[]> pendingManifestMap = manifestsMap.headMap(checkpointId, true);
+
+    List<ManifestFile> manifestFiles = Lists.newArrayList();
+    for (Byte[] manifestData : pendingManifestMap.values()) {
+      ManifestFile manifestFile = ManifestFiles.decode(ArrayUtils.toPrimitive(manifestData));
+      manifestFiles.add(manifestFile);
+    }
 
     List<DataFile> pendingDataFiles = Lists.newArrayList();
-    for (List<DataFile> dataFiles : pendingFileMap.values()) {
-      pendingDataFiles.addAll(dataFiles);
+    for (ManifestFile manifestFile : manifestFiles) {
+      pendingDataFiles.addAll(FlinkManifest.read(manifestFile, table.io()));

Review comment:
      I read all data files back from the written manifests because the commit may use either of two operations: `ReplacePartitions` or `AppendFiles`. For `AppendFiles`, it's true that we could simply call `appendManifest` if the manifest is in the metadata location. `ReplacePartitions`, however, has no `appendManifest`, so we still have to read all the `DataFile`s and call `ReplacePartitions#addFile` one by one. So here I use a single method to handle both cases.
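      The sketch below illustrates the unified approach under simplifying assumptions. Manifests are modeled as lists of file paths. `Operation`, `AppendOp` and `ReplaceOp` are hypothetical stand-ins for Iceberg's `AppendFiles` and `ReplacePartitions`, not the real API.

      Only `AppendOp` gets a bulk `appendManifest`, so the shared commit path relies solely on per-file `addFile`. `pendingDataFiles` uses the same `headMap(checkpointId, true)` call as the diff. Passing `Long.MAX_VALUE`, as `endInput` does, therefore picks up every pending checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch of "read every data file, then add them one by one". */
public final class UnifiedCommitSketch {

  /** The only capability both operations share: adding a single data file. */
  interface Operation {
    void addFile(String dataFile);
    List<String> committed();
  }

  /** Stand-in for an append: could also take a whole manifest at once. */
  static final class AppendOp implements Operation {
    private final List<String> files = new ArrayList<>();
    public void addFile(String dataFile) { files.add(dataFile); }
    public void appendManifest(List<String> manifest) { files.addAll(manifest); }
    public List<String> committed() { return files; }
  }

  /** Stand-in for a partition replace: no bulk manifest method exists. */
  static final class ReplaceOp implements Operation {
    private final List<String> files = new ArrayList<>();
    public void addFile(String dataFile) { files.add(dataFile); }
    public List<String> committed() { return files; }
  }

  /** Collects data files of every checkpoint up to and including checkpointId. */
  static List<String> pendingDataFiles(NavigableMap<Long, List<String>> manifestsPerCheckpoint,
                                       long checkpointId) {
    List<String> pending = new ArrayList<>();
    for (List<String> manifest : manifestsPerCheckpoint.headMap(checkpointId, true).values()) {
      pending.addAll(manifest); // stands in for reading the manifest's DataFiles
    }
    return pending;
  }

  /** One code path for both operation types. */
  static void commit(Operation op, List<String> dataFiles) {
    for (String file : dataFiles) {
      op.addFile(file);
    }
  }

  public static void main(String[] args) {
    NavigableMap<Long, List<String>> manifests = new TreeMap<>();
    manifests.put(1L, List.of("a.parquet"));
    manifests.put(2L, List.of("b.parquet", "c.parquet"));
    manifests.put(3L, List.of("d.parquet"));

    Operation op = new ReplaceOp();
    commit(op, pendingDataFiles(manifests, 2L));
    System.out.println(op.committed()); // prints [a.parquet, b.parquet, c.parquet]
  }
}
```

      The trade-off is an extra manifest read per pending checkpoint in exchange for a single commit path. An append-only fast path via `appendManifest` would skip that read but would need a separate branch.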

##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -149,7 +156,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
     LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);
 
     // Update the checkpoint state.
-    dataFilesPerCheckpoint.put(checkpointId, ImmutableList.copyOf(dataFilesOfCurrentCheckpoint));
+    dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));

Review comment:
      Makes sense!
