You are viewing a plain-text version of this content. The canonical version is available at the original archive link (the hyperlink is not preserved in this plain-text copy).
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/02 19:13:13 UTC

[incubator-iceberg] branch master updated: Improve filtering in Snapshot#addedFiles (#341)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f71e9de  Improve filtering in Snapshot#addedFiles (#341)
f71e9de is described below

commit f71e9de1a9a340725dd0f33d994b41bb8ba9136d
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Fri Aug 2 12:13:08 2019 -0700

    Improve filtering in Snapshot#addedFiles (#341)
---
 .../main/java/org/apache/iceberg/BaseSnapshot.java | 49 ++++++++++++----------
 .../java/org/apache/iceberg/ManifestReader.java    | 24 ++++++-----
 2 files changed, 41 insertions(+), 32 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index c2e59af..ff44a9c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -20,6 +20,8 @@
 package org.apache.iceberg;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.io.IOException;
@@ -152,32 +154,35 @@ class BaseSnapshot implements Snapshot {
   }
 
   private void cacheChanges() {
-    List<DataFile> adds = Lists.newArrayList();
-    List<DataFile> deletes = Lists.newArrayList();
-
-    // accumulate adds and deletes from all manifests.
-    // because manifests can be reused in newer snapshots, filter the changes by snapshot id.
-    for (String manifest : Iterables.transform(manifests(), ManifestFile::path)) {
-      try (ManifestReader reader = ManifestReader.read(
-          ops.io().newInputFile(manifest),
-          ops.current()::spec)) {
-        for (ManifestEntry add : reader.addedFiles()) {
-          if (add.snapshotId() == snapshotId) {
-            adds.add(add.file().copyWithoutStats());
-          }
+    ImmutableList.Builder<DataFile> adds = ImmutableList.builder();
+    ImmutableList.Builder<DataFile> deletes = ImmutableList.builder();
+
+    // read only manifests that were created by this snapshot
+    Iterable<ManifestFile> changedManifests = Iterables.filter(manifests(),
+        manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
+    try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops, changedManifests)
+        .ignoreExisting()
+        .select(ManifestReader.CHANGE_COLUNNS)
+        .entries()) {
+      for (ManifestEntry entry : entries) {
+        switch (entry.status()) {
+          case ADDED:
+            adds.add(entry.file().copyWithoutStats());
+            break;
+          case DELETED:
+            deletes.add(entry.file().copyWithoutStats());
+            break;
+          default:
+            throw new IllegalStateException(
+                "Unexpected entry status, not added or deleted: " + entry);
         }
-        for (ManifestEntry delete : reader.deletedFiles()) {
-          if (delete.snapshotId() == snapshotId) {
-            deletes.add(delete.file().copyWithoutStats());
-          }
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(e, "Failed to close reader while caching changes");
       }
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to close entries while caching changes");
     }
 
-    this.cachedAdds = adds;
-    this.cachedDeletes = deletes;
+    this.cachedAdds = adds.build();
+    this.cachedDeletes = deletes.build();
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index dceac87..c634d29 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -50,7 +50,7 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
   private static final Logger LOG = LoggerFactory.getLogger(ManifestReader.class);
 
   private static final List<String> ALL_COLUMNS = Lists.newArrayList("*");
-  private static final List<String> CHANGE_COLUNNS = Lists.newArrayList(
+  static final List<String> CHANGE_COLUNNS = Lists.newArrayList(
       "file_path", "file_format", "partition", "record_count", "file_size_in_bytes");
 
   // Visible for testing
@@ -161,16 +161,20 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
     List<ManifestEntry> adds = Lists.newArrayList();
     List<ManifestEntry> deletes = Lists.newArrayList();
 
-    for (ManifestEntry entry : entries(fileSchema.select(CHANGE_COLUNNS))) {
-      switch (entry.status()) {
-        case ADDED:
-          adds.add(entry.copyWithoutStats());
-          break;
-        case DELETED:
-          deletes.add(entry.copyWithoutStats());
-          break;
-        default:
+    try (CloseableIterable<ManifestEntry> entries = entries(fileSchema.select(CHANGE_COLUNNS))) {
+      for (ManifestEntry entry : entries) {
+        switch (entry.status()) {
+          case ADDED:
+            adds.add(entry.copyWithoutStats());
+            break;
+          case DELETED:
+            deletes.add(entry.copyWithoutStats());
+            break;
+          default:
+        }
       }
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to close manifest entries");
     }
 
     this.cachedAdds = adds;