Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/25 23:16:08 UTC

[iceberg] branch master updated: Flink: Fix flaky test TestFlinkSink#testHashDistributeMode (#3365)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e22f36  Flink: Fix flaky test TestFlinkSink#testHashDistributeMode (#3365)
8e22f36 is described below

commit 8e22f3657af709e711bbaa9b0badecd58a12ae9d
Author: openinx <op...@gmail.com>
AuthorDate: Tue Oct 26 07:15:48 2021 +0800

    Flink: Fix flaky test TestFlinkSink#testHashDistributeMode (#3365)
---
 .../org/apache/iceberg/flink/SimpleDataUtil.java   | 36 ++++++++++++++--------
 1 file changed, 24 insertions(+), 12 deletions(-)

diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 7e7f2c6..4cfccfb 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -38,12 +38,11 @@ import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
@@ -57,7 +56,9 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
@@ -267,17 +268,28 @@ public class SimpleDataUtil {
 
   public static Map<Long, List<DataFile>> snapshotToDataFiles(Table table) throws IOException {
     table.refresh();
+
     Map<Long, List<DataFile>> result = Maps.newHashMap();
-    List<ManifestFile> manifestFiles = table.currentSnapshot().dataManifests();
-    for (ManifestFile manifestFile : manifestFiles) {
-      try (ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
-        List<DataFile> dataFiles = Lists.newArrayList(reader);
-        if (result.containsKey(manifestFile.snapshotId())) {
-          result.get(manifestFile.snapshotId()).addAll(dataFiles);
-        } else {
-          result.put(manifestFile.snapshotId(), dataFiles);
-        }
+    Snapshot current = table.currentSnapshot();
+    while (current != null) {
+      TableScan tableScan = table.newScan();
+      if (current.parentId() != null) {
+        // Collect the data files that were added only in the current snapshot.
+        tableScan = tableScan.appendsBetween(current.parentId(), current.snapshotId());
+      } else {
+        // Collect the data files that were added in the oldest snapshot.
+        tableScan = tableScan.useSnapshot(current.snapshotId());
+      }
+      try (CloseableIterable<FileScanTask> scanTasks = tableScan.planFiles()) {
+        result.put(current.snapshotId(), ImmutableList.copyOf(Iterables.transform(scanTasks, FileScanTask::file)));
+      }
+
+      // Continue to traverse the parent snapshot if it exists.
+      if (current.parentId() == null) {
+        break;
       }
+      // Iterate to the parent snapshot.
+      current = table.snapshot(current.parentId());
     }
     return result;
   }
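
For context, the rewritten helper maps each snapshot id to only the data files appended by that snapshot (using TableScan#appendsBetween on the snapshot lineage), so a test such as TestFlinkSink#testHashDistributeMode can check how many files each checkpoint produced. The following is a minimal, hypothetical usage sketch, not part of this commit; it assumes the iceberg-flink test sources (which contain SimpleDataUtil) are on the classpath and that a Table handle is obtained elsewhere.

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.SimpleDataUtil;

    public class SnapshotFileCountExample {
      // Print how many data files each snapshot appended. A test could turn these
      // counts into assertions, e.g. an expected number of files per checkpoint
      // when write.distribution-mode=hash is set (an assumption for illustration).
      public static void printAppendedFileCounts(Table table) throws IOException {
        Map<Long, List<DataFile>> appended = SimpleDataUtil.snapshotToDataFiles(table);
        appended.forEach((snapshotId, dataFiles) ->
            System.out.printf("snapshot %d appended %d data file(s)%n", snapshotId, dataFiles.size()));
      }
    }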