You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2018/12/20 20:05:16 UTC
[incubator-iceberg] branch master updated: Update ScanSummary behavior. (#29)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 8453b76 Update ScanSummary behavior. (#29)
8453b76 is described below
commit 8453b76fb45821f47d7e53e5b6182b54b87b6911
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Dec 20 12:05:12 2018 -0800
Update ScanSummary behavior. (#29)
* Use all manifests created after the start of the time range
* Fail when the time range may include expired snapshots
* Filter manifests with the data filter in ManifestGroup
---
api/src/main/java/com/netflix/iceberg/Table.java | 2 +-
.../java/com/netflix/iceberg/ManifestGroup.java | 17 ++++++++-
.../main/java/com/netflix/iceberg/ScanSummary.java | 41 +++++++++++++++++-----
3 files changed, 50 insertions(+), 10 deletions(-)
diff --git a/api/src/main/java/com/netflix/iceberg/Table.java b/api/src/main/java/com/netflix/iceberg/Table.java
index c785f8e..fe19fa2 100644
--- a/api/src/main/java/com/netflix/iceberg/Table.java
+++ b/api/src/main/java/com/netflix/iceberg/Table.java
@@ -69,7 +69,7 @@ public interface Table {
String location();
/**
- * Get the current {@link Snapshot snapshot} for this table.
+ * Get the current {@link Snapshot snapshot} for this table, or null if there are no snapshots.
*
* @return the current table Snapshot.
*/
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
index d05ceca..2f86fdf 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
@@ -19,6 +19,9 @@
package com.netflix.iceberg;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -26,6 +29,7 @@ import com.google.common.collect.Sets;
import com.netflix.iceberg.expressions.Evaluator;
import com.netflix.iceberg.expressions.Expression;
import com.netflix.iceberg.expressions.Expressions;
+import com.netflix.iceberg.expressions.InclusiveManifestEvaluator;
import com.netflix.iceberg.io.CloseableIterable;
import com.netflix.iceberg.types.Types;
import java.io.Closeable;
@@ -43,6 +47,16 @@ class ManifestGroup {
private final boolean ignoreDeleted;
private final List<String> columns;
+ private final LoadingCache<Integer, InclusiveManifestEvaluator> EVAL_CACHE = CacheBuilder
+ .newBuilder()
+ .build(new CacheLoader<Integer, InclusiveManifestEvaluator>() {
+ @Override
+ public InclusiveManifestEvaluator load(Integer specId) {
+ PartitionSpec spec = ops.current().spec(specId);
+ return new InclusiveManifestEvaluator(spec, dataFilter);
+ }
+ });
+
ManifestGroup(TableOperations ops, Iterable<ManifestFile> manifests) {
this(ops, Sets.newHashSet(manifests), Expressions.alwaysTrue(), Expressions.alwaysTrue(),
false, ImmutableList.of("*"));
@@ -94,7 +108,8 @@ class ManifestGroup {
Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter);
List<Closeable> toClose = Lists.newArrayList();
- Iterable<ManifestFile> matchingManifests = manifests;
+ Iterable<ManifestFile> matchingManifests = Iterables.filter(manifests,
+ manifest -> EVAL_CACHE.getUnchecked(manifest.partitionSpecId()).eval(manifest));
if (ignoreDeleted) {
// remove any manifests that don't have any existing or added files. if either the added or
diff --git a/core/src/main/java/com/netflix/iceberg/ScanSummary.java b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
index 7315786..909495c 100644
--- a/core/src/main/java/com/netflix/iceberg/ScanSummary.java
+++ b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
@@ -149,6 +149,10 @@ public class ScanSummary {
* @return a map from partition key to metrics for that partition.
*/
public Map<String, PartitionMetrics> build() {
+ if (table.currentSnapshot() == null) {
+ return ImmutableMap.of(); // no snapshots, so there are no partitions
+ }
+
TopN<String, PartitionMetrics> topN = new TopN<>(
limit, throwIfLimited, Comparators.charSequences());
@@ -165,22 +169,43 @@ public class ScanSummary {
long minTimestamp = range.first();
long maxTimestamp = range.second();
+ Snapshot oldestSnapshot = table.currentSnapshot();
for (Map.Entry<Long, Long> entry : snapshotTimestamps.entrySet()) {
long snapshotId = entry.getKey();
long timestamp = entry.getValue();
+
+ if (timestamp < oldestSnapshot.timestampMillis()) {
+ oldestSnapshot = ops.current().snapshot(snapshotId);
+ }
+
if (timestamp >= minTimestamp && timestamp <= maxTimestamp) {
snapshotsInTimeRange.add(snapshotId);
}
}
- // when filtering by dateCreated or lastUpdated timestamp, this matches the set of files
- // that were added in the time range. files are added in new snapshots, so to get the new
- // files, this only needs to scan new manifests in the set of snapshots that match the
- // filter. ManifestFile.snapshotId() returns the snapshot when the manifest was added, so
- // the only manifests that need to be scanned are those with snapshotId() in the timestamp
- // range, or those that don't have a snapshot ID.
- manifests = Iterables.filter(manifests, manifest ->
- manifest.snapshotId() == null || snapshotsInTimeRange.contains(manifest.snapshotId()));
+ // if oldest known snapshot is in the range, then there may be an expired snapshot that has
+ // been removed that matched the range. because the timestamp of that snapshot is unknown,
+ // it can't be included in the results and the results are not reliable.
+ if (snapshotsInTimeRange.contains(oldestSnapshot.snapshotId()) &&
+ minTimestamp < oldestSnapshot.timestampMillis()) {
+ throw new IllegalArgumentException(
+ "Cannot satisfy time filters: time range may include expired snapshots");
+ }
+
+ // filter down to the set of manifest files that were added after the start of the
+ // time range. manifests after the end of the time range must be included because
+ // compaction may create a manifest after the time range that includes files added in the
+ // range.
+ manifests = Iterables.filter(manifests, manifest -> {
+ if (manifest.snapshotId() == null) {
+ return true; // can't tell when the manifest was written, so it may contain matches
+ }
+
+ Long timestamp = snapshotTimestamps.get(manifest.snapshotId());
+ // if the timestamp is null, then its snapshot has expired. the check for the oldest
+ // snapshot ensures that no expired snapshots are in the time range.
+ return timestamp != null && timestamp >= minTimestamp;
+ });
}
try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops, manifests)