You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/15 16:43:14 UTC
[incubator-iceberg] branch master updated: Add FindFiles helper API
(#377)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new dd4004b Add FindFiles helper API (#377)
dd4004b is described below
commit dd4004b0c8e6df7174a1f083ff8240d1b6ad7e3f
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Aug 15 09:43:09 2019 -0700
Add FindFiles helper API (#377)
---
.../main/java/org/apache/iceberg/FindFiles.java | 205 +++++++++++++++++++++
.../java/org/apache/iceberg/ManifestGroup.java | 4 +-
.../java/org/apache/iceberg/TableTestBase.java | 8 +-
.../java/org/apache/iceberg/TestFindFiles.java | 163 ++++++++++++++++
4 files changed, 374 insertions(+), 6 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/FindFiles.java b/core/src/main/java/org/apache/iceberg/FindFiles.java
new file mode 100644
index 0000000..79459b5
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/FindFiles.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.base.Preconditions;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+
+/**
+ * Helper for finding the {@link DataFile data files} of a table that match a set of filters.
+ * <p>
+ * Start a search with {@link #in(Table)}, narrow it with the {@link Builder} methods, and call
+ * {@link Builder#collect()} to get the matching files.
+ */
+public class FindFiles {
+  // used only to render the requested timestamp in asOfTime error messages
+  private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+  private FindFiles() {
+  }
+
+  /**
+   * Start a file search in the given table.
+   *
+   * @param table a table; it is cast to {@link HasTableOperations} by the builder
+   * @return a new {@link Builder} for configuring the search
+   */
+  public static Builder in(Table table) {
+    return new Builder(table);
+  }
+
+  /**
+   * Builder that accumulates the snapshot selection and filters of a file search.
+   * <p>
+   * Record filters and metadata filters added by separate calls are combined with AND. Partitions
+   * added by separate calls are combined with OR.
+   */
+  public static class Builder {
+    private final Table table;
+    private final TableOperations ops;
+    private boolean caseSensitive = true;
+    private Long snapshotId = null;
+    private Expression rowFilter = Expressions.alwaysTrue();
+    private Expression fileFilter = Expressions.alwaysTrue();
+    private Expression partitionFilter = Expressions.alwaysTrue();
+
+    /**
+     * Create a builder for the given table.
+     *
+     * @param table a table that also implements {@link HasTableOperations}
+     * @throws ClassCastException if the table does not implement {@link HasTableOperations}
+     */
+    public Builder(Table table) {
+      this.table = table;
+      this.ops = ((HasTableOperations) table).operations();
+    }
+
+    /**
+     * Bind filter column names without regard to case.
+     *
+     * @return this for method chaining
+     */
+    public Builder caseInsensitive() {
+      this.caseSensitive = false;
+      return this;
+    }
+
+    /**
+     * Set whether filter column names are bound case sensitively. The default is true.
+     *
+     * @param findCaseSensitive true to bind names case sensitively, false otherwise
+     * @return this for method chaining
+     */
+    public Builder caseSensitive(boolean findCaseSensitive) {
+      this.caseSensitive = findCaseSensitive;
+      return this;
+    }
+
+    /**
+     * Base results on the given snapshot.
+     *
+     * @param findSnapshotId a snapshot ID
+     * @return this for method chaining
+     * @throws IllegalArgumentException if a snapshot was already selected or the ID is unknown
+     */
+    public Builder inSnapshot(long findSnapshotId) {
+      // report the ID that is already set, not the one that is being requested
+      Preconditions.checkArgument(this.snapshotId == null,
+          "Cannot set snapshot multiple times, already set to id=%s", snapshotId);
+      Preconditions.checkArgument(table.snapshot(findSnapshotId) != null,
+          "Cannot find snapshot for id=%s", findSnapshotId);
+      this.snapshotId = findSnapshotId;
+      return this;
+    }
+
+    /**
+     * Base results on files in the snapshot that was current as of a timestamp.
+     *
+     * @param timestampMillis a timestamp in milliseconds
+     * @return this for method chaining
+     * @throws IllegalArgumentException if a snapshot was already selected, or if no snapshot log
+     *                                  entry is at or before the timestamp
+     */
+    public Builder asOfTime(long timestampMillis) {
+      Preconditions.checkArgument(this.snapshotId == null,
+          "Cannot set snapshot multiple times, already set to id=%s", snapshotId);
+
+      Long lastSnapshotId = null;
+      for (HistoryEntry logEntry : ops.current().snapshotLog()) {
+        if (logEntry.timestampMillis() <= timestampMillis) {
+          lastSnapshotId = logEntry.snapshotId();
+        } else {
+          // the last snapshot ID was the last one older than the timestamp
+          break;
+        }
+      }
+
+      // the snapshot ID could be null if no entries were older than the requested time. in that
+      // case, there is no valid snapshot to read.
+      Preconditions.checkArgument(lastSnapshotId != null,
+          "Cannot find a snapshot older than %s",
+          DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), ZoneId.systemDefault())));
+      return inSnapshot(lastSnapshotId);
+    }
+
+    /**
+     * Filter results using a record filter. Files that may contain at least one matching record
+     * will be returned by {@link #collect()}.
+     *
+     * @param expr a record filter
+     * @return this for method chaining
+     */
+    public Builder withRecordsMatching(Expression expr) {
+      this.rowFilter = Expressions.and(rowFilter, expr);
+      return this;
+    }
+
+    /**
+     * Filter results using a metadata filter for the data in a {@link DataFile}.
+     *
+     * @param expr a filter for {@link DataFile} metadata columns
+     * @return this for method chaining
+     */
+    public Builder withMetadataMatching(Expression expr) {
+      this.fileFilter = Expressions.and(fileFilter, expr);
+      return this;
+    }
+
+    /**
+     * Filter results to files in the given partition. Partitions added by repeated calls are
+     * combined, so files in any one of them are returned.
+     *
+     * @param spec a spec for the partition
+     * @param partition a StructLike that stores a partition tuple
+     * @return this for method chaining
+     */
+    public Builder inPartition(PartitionSpec spec, StructLike partition) {
+      return inPartitions(spec, partition);
+    }
+
+    /**
+     * Filter results to files in any one of the given partitions.
+     *
+     * @param spec a spec for the partitions
+     * @param partitions one or more StructLike that stores a partition tuple
+     * @return this for method chaining
+     */
+    public Builder inPartitions(PartitionSpec spec, StructLike... partitions) {
+      return inPartitions(spec, Arrays.asList(partitions));
+    }
+
+    /**
+     * Filter results to files in any one of the given partitions.
+     *
+     * @param spec a spec for the partitions
+     * @param partitions a list of StructLike that stores a partition tuple
+     * @return this for method chaining
+     * @throws IllegalArgumentException if the spec is not equal to the table's spec with the same ID
+     */
+    public Builder inPartitions(PartitionSpec spec, List<StructLike> partitions) {
+      Preconditions.checkArgument(spec.equals(ops.current().spec(spec.specId())),
+          "Partition spec does not belong to table: %s", table);
+
+      List<PartitionField> fields = spec.fields();
+
+      // build (f0 = v0 AND f1 = v1 ...) for each partition and OR the per-partition filters
+      Expression partitionSetFilter = Expressions.alwaysFalse();
+      for (StructLike partitionData : partitions) {
+        Expression partFilter = Expressions.alwaysTrue();
+        for (int i = 0; i < fields.size(); i += 1) {
+          PartitionField field = fields.get(i);
+          Object value = partitionData.get(i, Object.class);
+          // a null partition value cannot be used as an equality literal, so match it with isNull
+          Expression fieldFilter = value != null ?
+              Expressions.equal(field.name(), value) : Expressions.isNull(field.name());
+          partFilter = Expressions.and(partFilter, fieldFilter);
+        }
+        partitionSetFilter = Expressions.or(partitionSetFilter, partFilter);
+      }
+
+      // alwaysTrue is the initial "no partitions selected yet" marker; the first call replaces it
+      // and later calls widen the selection.
+      // NOTE(review): the identity comparison assumes Expressions.alwaysTrue() always returns the
+      // same instance -- confirm.
+      if (partitionFilter != Expressions.alwaysTrue()) {
+        this.partitionFilter = Expressions.or(partitionFilter, partitionSetFilter);
+      } else {
+        this.partitionFilter = partitionSetFilter;
+      }
+
+      return this;
+    }
+
+    /**
+     * Find the files of the selected snapshot, or of the current snapshot if none was selected.
+     * <p>
+     * Each returned file is produced by {@link DataFile#copyWithoutStats()}. An empty iterable is
+     * returned if there is no snapshot to read.
+     *
+     * @return all files in the table that match all of the filters
+     */
+    public CloseableIterable<DataFile> collect() {
+      Snapshot snapshot = snapshotId != null ?
+          ops.current().snapshot(snapshotId) : ops.current().currentSnapshot();
+
+      // there is no snapshot when the table has no current snapshot (for example, before the
+      // first commit); return no files instead of failing with a NullPointerException
+      if (snapshot == null) {
+        return CloseableIterable.empty();
+      }
+
+      CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops, snapshot.manifests())
+          .filterData(rowFilter)
+          .filterFiles(fileFilter)
+          .filterPartitions(partitionFilter)
+          .ignoreDeleted()
+          .caseSensitive(caseSensitive)
+          .entries();
+
+      return CloseableIterable.transform(entries, entry -> entry.file().copyWithoutStats());
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 01fdf8b..0b14fa4 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -96,7 +96,7 @@ class ManifestGroup {
public ManifestGroup filterPartitions(Expression expr) {
return new ManifestGroup(
- ops, manifests, dataFilter, fileFilter, Expressions.and(fileFilter, expr),
+ ops, manifests, dataFilter, fileFilter, Expressions.and(partitionFilter, expr),
ignoreDeleted, ignoreExisting, columns, caseSensitive);
}
@@ -139,7 +139,7 @@ class ManifestGroup {
* @return a CloseableIterable of manifest entries.
*/
public CloseableIterable<ManifestEntry> entries() {
- Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter);
+ Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter, caseSensitive);
Iterable<ManifestFile> matchingManifests = Iterables.filter(manifests,
manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index c55c07d..a86481a 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -54,25 +54,25 @@ public class TableTestBase {
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(0)
.withPartitionPath("data_bucket=0") // easy way to set partition data for now
- .withRecordCount(0)
+ .withRecordCount(1)
.build();
static final DataFile FILE_B = DataFiles.builder(SPEC)
.withPath("/path/to/data-b.parquet")
.withFileSizeInBytes(0)
.withPartitionPath("data_bucket=1") // easy way to set partition data for now
- .withRecordCount(0)
+ .withRecordCount(1)
.build();
static final DataFile FILE_C = DataFiles.builder(SPEC)
.withPath("/path/to/data-c.parquet")
.withFileSizeInBytes(0)
.withPartitionPath("data_bucket=2") // easy way to set partition data for now
- .withRecordCount(0)
+ .withRecordCount(1)
.build();
static final DataFile FILE_D = DataFiles.builder(SPEC)
.withPath("/path/to/data-d.parquet")
.withFileSizeInBytes(0)
.withPartitionPath("data_bucket=3") // easy way to set partition data for now
- .withRecordCount(0)
+ .withRecordCount(1)
.build();
@Rule
diff --git a/core/src/test/java/org/apache/iceberg/TestFindFiles.java b/core/src/test/java/org/apache/iceberg/TestFindFiles.java
new file mode 100644
index 0000000..98d4482
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestFindFiles.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.Set;
+import org.apache.iceberg.expressions.Expressions;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link FindFiles}: each test appends files from {@link TableTestBase} to the test
+ * table and checks the set of file paths returned by a search.
+ */
+public class TestFindFiles extends TableTestBase {
+  @Test
+  public void testBasicBehavior() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    // no filters: every file in the current snapshot is returned
+    Iterable<DataFile> files = FindFiles.in(table).collect();
+
+    Assert.assertEquals(pathSet(FILE_A, FILE_B), pathSet(files));
+  }
+
+  @Test
+  public void testWithMetadataMatching() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table)
+        .withMetadataMatching(Expressions.startsWith("file_path", "/path/to/data-a"))
+        .collect();
+
+    Assert.assertEquals(pathSet(FILE_A), pathSet(files));
+  }
+
+  @Test
+  public void testInPartition() {
+    table.newAppend()
+        .appendFile(FILE_A) // bucket 0
+        .appendFile(FILE_B) // bucket 1
+        .appendFile(FILE_C) // bucket 2
+        .appendFile(FILE_D) // bucket 3
+        .commit();
+
+    // repeated inPartition calls select the union of the partitions
+    Iterable<DataFile> files = FindFiles.in(table)
+        .inPartition(table.spec(), StaticDataTask.Row.of(1))
+        .inPartition(table.spec(), StaticDataTask.Row.of(2))
+        .collect();
+
+    Assert.assertEquals(pathSet(FILE_B, FILE_C), pathSet(files));
+  }
+
+  @Test
+  public void testInPartitions() {
+    table.newAppend()
+        .appendFile(FILE_A) // bucket 0
+        .appendFile(FILE_B) // bucket 1
+        .appendFile(FILE_C) // bucket 2
+        .appendFile(FILE_D) // bucket 3
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table)
+        .inPartitions(table.spec(), StaticDataTask.Row.of(1), StaticDataTask.Row.of(2))
+        .collect();
+
+    Assert.assertEquals(pathSet(FILE_B, FILE_C), pathSet(files));
+  }
+
+  @Test
+  public void testAsOfTimestamp() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    long timestamp = System.currentTimeMillis();
+
+    // asOfTime includes snapshots whose timestamp is equal to the requested time, so wait for the
+    // clock to move past the captured timestamp before committing the files that must be excluded
+    while (System.currentTimeMillis() <= timestamp) {
+      Thread.yield();
+    }
+
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_D)
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table).asOfTime(timestamp).collect();
+
+    Assert.assertEquals(pathSet(FILE_A, FILE_B), pathSet(files));
+  }
+
+  @Test
+  public void testSnapshotId() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    // remember the snapshot that contains A, B, and C before D is added
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    table.newAppend()
+        .appendFile(FILE_D)
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table).inSnapshot(snapshotId).collect();
+
+    Assert.assertEquals(pathSet(FILE_A, FILE_B, FILE_C), pathSet(files));
+  }
+
+  @Test
+  public void testCaseSensitivity() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .commit();
+
+    // the upper-case column name can only be bound when the search is case insensitive
+    Iterable<DataFile> files = FindFiles.in(table)
+        .caseInsensitive()
+        .withMetadataMatching(Expressions.startsWith("FILE_PATH", "/path/to/data-a"))
+        .collect();
+
+    Assert.assertEquals(pathSet(FILE_A), pathSet(files));
+  }
+
+  /** Returns the set of paths of the given files. */
+  private Set<String> pathSet(DataFile... files) {
+    return Sets.newHashSet(Iterables.transform(Arrays.asList(files), file -> file.path().toString()));
+  }
+
+  /** Returns the set of paths of the given files. */
+  private Set<String> pathSet(Iterable<DataFile> files) {
+    return Sets.newHashSet(Iterables.transform(files, file -> file.path().toString()));
+  }
+}