You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/02 19:13:13 UTC
[incubator-iceberg] branch master updated: Improve filtering in Snapshot#addedFiles (#341)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f71e9de Improve filtering in Snapshot#addedFiles (#341)
f71e9de is described below
commit f71e9de1a9a340725dd0f33d994b41bb8ba9136d
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Fri Aug 2 12:13:08 2019 -0700
Improve filtering in Snapshot#addedFiles (#341)
---
.../main/java/org/apache/iceberg/BaseSnapshot.java | 49 ++++++++++++----------
.../java/org/apache/iceberg/ManifestReader.java | 24 ++++++-----
2 files changed, 41 insertions(+), 32 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index c2e59af..ff44a9c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -20,6 +20,8 @@
package org.apache.iceberg;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.IOException;
@@ -152,32 +154,35 @@ class BaseSnapshot implements Snapshot {
}
private void cacheChanges() {
- List<DataFile> adds = Lists.newArrayList();
- List<DataFile> deletes = Lists.newArrayList();
-
- // accumulate adds and deletes from all manifests.
- // because manifests can be reused in newer snapshots, filter the changes by snapshot id.
- for (String manifest : Iterables.transform(manifests(), ManifestFile::path)) {
- try (ManifestReader reader = ManifestReader.read(
- ops.io().newInputFile(manifest),
- ops.current()::spec)) {
- for (ManifestEntry add : reader.addedFiles()) {
- if (add.snapshotId() == snapshotId) {
- adds.add(add.file().copyWithoutStats());
- }
+ ImmutableList.Builder<DataFile> adds = ImmutableList.builder();
+ ImmutableList.Builder<DataFile> deletes = ImmutableList.builder();
+
+ // read only manifests that were created by this snapshot
+ Iterable<ManifestFile> changedManifests = Iterables.filter(manifests(),
+ manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
+ try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops, changedManifests)
+ .ignoreExisting()
+ .select(ManifestReader.CHANGE_COLUNNS)
+ .entries()) {
+ for (ManifestEntry entry : entries) {
+ switch (entry.status()) {
+ case ADDED:
+ adds.add(entry.file().copyWithoutStats());
+ break;
+ case DELETED:
+ deletes.add(entry.file().copyWithoutStats());
+ break;
+ default:
+ throw new IllegalStateException(
+ "Unexpected entry status, not added or deleted: " + entry);
}
- for (ManifestEntry delete : reader.deletedFiles()) {
- if (delete.snapshotId() == snapshotId) {
- deletes.add(delete.file().copyWithoutStats());
- }
- }
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to close reader while caching changes");
}
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close entries while caching changes");
}
- this.cachedAdds = adds;
- this.cachedDeletes = deletes;
+ this.cachedAdds = adds.build();
+ this.cachedDeletes = deletes.build();
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index dceac87..c634d29 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -50,7 +50,7 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
private static final Logger LOG = LoggerFactory.getLogger(ManifestReader.class);
private static final List<String> ALL_COLUMNS = Lists.newArrayList("*");
- private static final List<String> CHANGE_COLUNNS = Lists.newArrayList(
+ static final List<String> CHANGE_COLUNNS = Lists.newArrayList(
"file_path", "file_format", "partition", "record_count", "file_size_in_bytes");
// Visible for testing
@@ -161,16 +161,20 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
List<ManifestEntry> adds = Lists.newArrayList();
List<ManifestEntry> deletes = Lists.newArrayList();
- for (ManifestEntry entry : entries(fileSchema.select(CHANGE_COLUNNS))) {
- switch (entry.status()) {
- case ADDED:
- adds.add(entry.copyWithoutStats());
- break;
- case DELETED:
- deletes.add(entry.copyWithoutStats());
- break;
- default:
+ try (CloseableIterable<ManifestEntry> entries = entries(fileSchema.select(CHANGE_COLUNNS))) {
+ for (ManifestEntry entry : entries) {
+ switch (entry.status()) {
+ case ADDED:
+ adds.add(entry.copyWithoutStats());
+ break;
+ case DELETED:
+ deletes.add(entry.copyWithoutStats());
+ break;
+ default:
+ }
}
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close manifest entries");
}
this.cachedAdds = adds;