You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2018/12/20 20:05:16 UTC

[incubator-iceberg] branch master updated: Update ScanSummary behavior. (#29)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8453b76  Update ScanSummary behavior. (#29)
8453b76 is described below

commit 8453b76fb45821f47d7e53e5b6182b54b87b6911
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Dec 20 12:05:12 2018 -0800

    Update ScanSummary behavior. (#29)
    
    * Use all manifests created after the start of the time range
    * Fail when the time range may include expired snapshots
    * Filter manifests with the data filter in ManifestGroup
---
 api/src/main/java/com/netflix/iceberg/Table.java   |  2 +-
 .../java/com/netflix/iceberg/ManifestGroup.java    | 17 ++++++++-
 .../main/java/com/netflix/iceberg/ScanSummary.java | 41 +++++++++++++++++-----
 3 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/api/src/main/java/com/netflix/iceberg/Table.java b/api/src/main/java/com/netflix/iceberg/Table.java
index c785f8e..fe19fa2 100644
--- a/api/src/main/java/com/netflix/iceberg/Table.java
+++ b/api/src/main/java/com/netflix/iceberg/Table.java
@@ -69,7 +69,7 @@ public interface Table {
   String location();
 
   /**
-   * Get the current {@link Snapshot snapshot} for this table.
+   * Get the current {@link Snapshot snapshot} for this table, or null if there are no snapshots.
    *
    * @return the current table Snapshot.
    */
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
index d05ceca..2f86fdf 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
@@ -19,6 +19,9 @@
 
 package com.netflix.iceberg;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -26,6 +29,7 @@ import com.google.common.collect.Sets;
 import com.netflix.iceberg.expressions.Evaluator;
 import com.netflix.iceberg.expressions.Expression;
 import com.netflix.iceberg.expressions.Expressions;
+import com.netflix.iceberg.expressions.InclusiveManifestEvaluator;
 import com.netflix.iceberg.io.CloseableIterable;
 import com.netflix.iceberg.types.Types;
 import java.io.Closeable;
@@ -43,6 +47,16 @@ class ManifestGroup {
   private final boolean ignoreDeleted;
   private final List<String> columns;
 
+  private final LoadingCache<Integer, InclusiveManifestEvaluator> EVAL_CACHE = CacheBuilder
+      .newBuilder()
+      .build(new CacheLoader<Integer, InclusiveManifestEvaluator>() {
+        @Override
+        public InclusiveManifestEvaluator load(Integer specId) {
+          PartitionSpec spec = ops.current().spec(specId);
+          return new InclusiveManifestEvaluator(spec, dataFilter);
+        }
+      });
+
   ManifestGroup(TableOperations ops, Iterable<ManifestFile> manifests) {
     this(ops, Sets.newHashSet(manifests), Expressions.alwaysTrue(), Expressions.alwaysTrue(),
         false, ImmutableList.of("*"));
@@ -94,7 +108,8 @@ class ManifestGroup {
     Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter);
     List<Closeable> toClose = Lists.newArrayList();
 
-    Iterable<ManifestFile> matchingManifests = manifests;
+    Iterable<ManifestFile> matchingManifests = Iterables.filter(manifests,
+        manifest -> EVAL_CACHE.getUnchecked(manifest.partitionSpecId()).eval(manifest));
 
     if (ignoreDeleted) {
       // remove any manifests that don't have any existing or added files. if either the added or
diff --git a/core/src/main/java/com/netflix/iceberg/ScanSummary.java b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
index 7315786..909495c 100644
--- a/core/src/main/java/com/netflix/iceberg/ScanSummary.java
+++ b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
@@ -149,6 +149,10 @@ public class ScanSummary {
      * @return a map from partition key to metrics for that partition.
      */
     public Map<String, PartitionMetrics> build() {
+      if (table.currentSnapshot() == null) {
+        return ImmutableMap.of(); // no snapshots, so there are no partitions
+      }
+
       TopN<String, PartitionMetrics> topN = new TopN<>(
           limit, throwIfLimited, Comparators.charSequences());
 
@@ -165,22 +169,43 @@ public class ScanSummary {
         long minTimestamp = range.first();
         long maxTimestamp = range.second();
 
+        Snapshot oldestSnapshot = table.currentSnapshot();
         for (Map.Entry<Long, Long> entry : snapshotTimestamps.entrySet()) {
           long snapshotId = entry.getKey();
           long timestamp = entry.getValue();
+
+          if (timestamp < oldestSnapshot.timestampMillis()) {
+            oldestSnapshot = ops.current().snapshot(snapshotId);
+          }
+
           if (timestamp >= minTimestamp && timestamp <= maxTimestamp) {
             snapshotsInTimeRange.add(snapshotId);
           }
         }
 
-        // when filtering by dateCreated or lastUpdated timestamp, this matches the set of files
-        // that were added in the time range. files are added in new snapshots, so to get the new
-        // files, this only needs to scan new manifests in the set of snapshots that match the
-        // filter. ManifestFile.snapshotId() returns the snapshot when the manifest was added, so
-        // the only manifests that need to be scanned are those with snapshotId() in the timestamp
-        // range, or those that don't have a snapshot ID.
-        manifests = Iterables.filter(manifests, manifest ->
-            manifest.snapshotId() == null || snapshotsInTimeRange.contains(manifest.snapshotId()));
+        // if oldest known snapshot is in the range, then there may be an expired snapshot that has
+        // been removed that matched the range. because the timestamp of that snapshot is unknown,
+        // it can't be included in the results and the results are not reliable.
+        if (snapshotsInTimeRange.contains(oldestSnapshot.snapshotId()) &&
+            minTimestamp < oldestSnapshot.timestampMillis()) {
+          throw new IllegalArgumentException(
+              "Cannot satisfy time filters: time range may include expired snapshots");
+        }
+
+        // filter down to the the set of manifest files that were added after the start of the
+        // time range. manifests after the end of the time range must be included because
+        // compaction may create a manifest after the time range that includes files added in the
+        // range.
+        manifests = Iterables.filter(manifests, manifest -> {
+          if (manifest.snapshotId() == null) {
+            return true; // can't tell when the manifest was written, so it may contain matches
+          }
+
+          Long timestamp = snapshotTimestamps.get(manifest.snapshotId());
+          // if the timestamp is null, then its snapshot has expired. the check for the oldest
+          // snapshot ensures that all expired snapshots are not in the time range.
+          return timestamp != null && timestamp >= minTimestamp;
+        });
       }
 
       try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops, manifests)