Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/07/12 19:55:40 UTC
[incubator-iceberg] branch master updated: Add metadata tables (#252)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e4c6fb9 Add metadata tables (#252)
e4c6fb9 is described below
commit e4c6fb979003b1629961b0c950ec6acec994de3b
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Fri Jul 12 12:55:36 2019 -0700
Add metadata tables (#252)
This commit adds metadata tables:
* history -- the snapshot log
* snapshots -- known snapshots
* manifests -- manifest files for a snapshot
* files -- data files for a snapshot
* entries -- manifest entries for a snapshot
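For example, once this change is in place a metadata table can be loaded through a metastore catalog by adding the table type as an extra identifier level. A minimal sketch, assuming a Catalog instance named catalog and an existing table db.events:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;

    // "history" can be swapped for snapshots, manifests, files, or entries
    Table history = catalog.loadTable(TableIdentifier.of("db", "events", "history"));
    history.newScan().planFiles(); // plans tasks that return metadata rows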
---
.../iceberg/{ScanTask.java => DataTask.java} | 30 +-
.../main/java/org/apache/iceberg/FileFormat.java | 22 +-
.../iceberg/{ScanTask.java => HistoryEntry.java} | 30 +-
.../main/java/org/apache/iceberg/ManifestFile.java | 27 ++
api/src/main/java/org/apache/iceberg/ScanTask.java | 15 +
api/src/main/java/org/apache/iceberg/Table.java | 8 +
.../main/java/org/apache/iceberg/TableScan.java | 10 +
...nifestEvaluator.java => ManifestEvaluator.java} | 23 +-
.../iceberg/expressions/ResidualEvaluator.java | 45 ++-
.../org/apache/iceberg/io/CloseableIterable.java | 11 +
.../TestInclusiveManifestEvaluator.java | 130 +++++----
.../apache/iceberg/transforms/TestResiduals.java | 25 +-
.../{BaseTable.java => BaseMetadataTable.java} | 98 +++----
.../org/apache/iceberg/BaseMetastoreCatalog.java | 49 +++-
.../main/java/org/apache/iceberg/BaseTable.java | 8 +-
.../java/org/apache/iceberg/BaseTableScan.java | 100 +++----
.../java/org/apache/iceberg/BaseTransaction.java | 5 +
.../main/java/org/apache/iceberg/DataFiles.java | 11 +
.../java/org/apache/iceberg/DataFilesTable.java | 161 +++++++++++
.../java/org/apache/iceberg/DataTableScan.java | 108 ++++++++
.../java/org/apache/iceberg/FilteredManifest.java | 12 +-
.../main/java/org/apache/iceberg/HistoryTable.java | 105 +++++++
.../org/apache/iceberg/ManifestEntriesTable.java | 114 ++++++++
.../java/org/apache/iceberg/ManifestGroup.java | 108 ++++++--
.../java/org/apache/iceberg/ManifestsTable.java | 123 +++++++++
.../java/org/apache/iceberg/SnapshotsTable.java | 93 +++++++
.../java/org/apache/iceberg/StaticDataTask.java | 116 ++++++++
.../java/org/apache/iceberg/StaticTableScan.java | 63 +++++
.../java/org/apache/iceberg/TableMetadata.java | 26 +-
.../org/apache/iceberg/TableMetadataParser.java | 2 +-
.../java/org/apache/iceberg/util/SnapshotUtil.java | 58 ++++
...stBaseTableScan.java => TestDataTableScan.java} | 2 +-
.../apache/iceberg/TestEntriesMetadataTable.java | 68 +++++
.../org/apache/iceberg/TestTableMetadataJson.java | 4 +-
.../java/org/apache/iceberg/hive/HiveCatalog.java | 2 +-
.../org/apache/iceberg/spark/source/Reader.java | 50 ++--
.../iceberg/spark/source/StructInternalRow.java | 276 +++++++++++++++++++
.../org/apache/iceberg/spark/data/TestHelpers.java | 2 +-
.../spark/source/TestIcebergSourceHiveTables.java | 305 +++++++++++++++++++++
39 files changed, 2107 insertions(+), 338 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/ScanTask.java b/api/src/main/java/org/apache/iceberg/DataTask.java
similarity index 53%
copy from api/src/main/java/org/apache/iceberg/ScanTask.java
copy to api/src/main/java/org/apache/iceberg/DataTask.java
index 7c7350e..329b7dc 100644
--- a/api/src/main/java/org/apache/iceberg/ScanTask.java
+++ b/api/src/main/java/org/apache/iceberg/DataTask.java
@@ -19,32 +19,24 @@
package org.apache.iceberg;
-import java.io.Serializable;
+import org.apache.iceberg.io.CloseableIterable;
/**
- * A scan task.
+ * A task that returns data as {@link StructLike rows} instead of a location from which data can be read.
*/
-public interface ScanTask extends Serializable {
- /**
- * @return true if this is a {@link FileScanTask}, false otherwise.
- */
- default boolean isFileScanTask() {
- return false;
+public interface DataTask extends FileScanTask {
+ @Override
+ default boolean isDataTask() {
+ return true;
}
- /**
- * @return this cast to {@link FileScanTask} if it is one
- * @throws IllegalStateException if this is not a {@link FileScanTask}
- */
- default FileScanTask asFileScanTask() {
- throw new IllegalStateException("Not a FileScanTask: " + this);
+ @Override
+ default DataTask asDataTask() {
+ return this;
}
/**
- * @return this cast to {@link CombinedScanTask} if it is one
- * @throws IllegalStateException if this is not a {@link CombinedScanTask}
+ * @return an iterable of {@link StructLike} rows
*/
- default CombinedScanTask asCombinedScanTask() {
- throw new IllegalStateException("Not a CombinedScanTask: " + this);
- }
+ CloseableIterable<StructLike> rows();
}
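A reader that encounters a DataTask can consume its rows directly instead of opening a data file. A minimal sketch, assuming task is a FileScanTask produced by a metadata table scan:

    import org.apache.iceberg.DataTask;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.io.CloseableIterable;

    if (task.isDataTask()) {
      // CloseableIterable extends Closeable, so try-with-resources applies
      try (CloseableIterable<StructLike> rows = task.asDataTask().rows()) {
        for (StructLike row : rows) {
          // each row is one metadata record, e.g. a history entry
        }
      }
    }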
diff --git a/api/src/main/java/org/apache/iceberg/FileFormat.java b/api/src/main/java/org/apache/iceberg/FileFormat.java
index 27729af..6bcab8e 100644
--- a/api/src/main/java/org/apache/iceberg/FileFormat.java
+++ b/api/src/main/java/org/apache/iceberg/FileFormat.java
@@ -27,7 +27,8 @@ import org.apache.iceberg.types.Comparators;
public enum FileFormat {
ORC("orc", true),
PARQUET("parquet", true),
- AVRO("avro", true);
+ AVRO("avro", true),
+ METADATA("metadata.json", false);
private final String ext;
private final boolean splittable;
@@ -55,28 +56,13 @@ public enum FileFormat {
}
public static FileFormat fromFileName(CharSequence filename) {
- int lastIndex = lastIndexOf('.', filename);
- if (lastIndex < 0) {
- return null;
- }
-
- CharSequence ext = filename.subSequence(lastIndex, filename.length());
-
for (FileFormat format : FileFormat.values()) {
- if (Comparators.charSequences().compare(format.ext, ext) == 0) {
+ int extStart = filename.length() - format.ext.length();
+ // skip suffixes longer than the filename to avoid an out-of-bounds subSequence
+ if (extStart > 0 && Comparators.charSequences().compare(format.ext, filename.subSequence(extStart, filename.length())) == 0) {
return format;
}
}
return null;
}
-
- private static int lastIndexOf(char ch, CharSequence seq) {
- for (int i = seq.length() - 1; i >= 0; i -= 1) {
- if (seq.charAt(i) == ch) {
- return i;
- }
- }
- return -1;
- }
}
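Suffix matching also handles multi-part extensions like metadata.json, which the old last-dot logic could not resolve. A sketch of the expected behavior, assuming each format's suffix is its name prefixed with a dot:

    FileFormat.fromFileName("part-00000.parquet");    // PARQUET
    FileFormat.fromFileName("00001-a.metadata.json"); // METADATA
    FileFormat.fromFileName("README");                // null: no known suffix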
diff --git a/api/src/main/java/org/apache/iceberg/ScanTask.java b/api/src/main/java/org/apache/iceberg/HistoryEntry.java
similarity index 53%
copy from api/src/main/java/org/apache/iceberg/ScanTask.java
copy to api/src/main/java/org/apache/iceberg/HistoryEntry.java
index 7c7350e..5b0b595 100644
--- a/api/src/main/java/org/apache/iceberg/ScanTask.java
+++ b/api/src/main/java/org/apache/iceberg/HistoryEntry.java
@@ -19,32 +19,20 @@
package org.apache.iceberg;
-import java.io.Serializable;
-
/**
- * A scan task.
+ * Table history entry.
+ * <p>
+ * An entry contains a change to the table state. At the given timestamp, the current snapshot was
+ * set to the given snapshot ID.
*/
-public interface ScanTask extends Serializable {
- /**
- * @return true if this is a {@link FileScanTask}, false otherwise.
- */
- default boolean isFileScanTask() {
- return false;
- }
-
+public interface HistoryEntry {
/**
- * @return this cast to {@link FileScanTask} if it is one
- * @throws IllegalStateException if this is not a {@link FileScanTask}
+ * @return the timestamp in milliseconds of the change
*/
- default FileScanTask asFileScanTask() {
- throw new IllegalStateException("Not a FileScanTask: " + this);
- }
+ long timestampMillis();
/**
- * @return this cast to {@link CombinedScanTask} if it is one
- * @throws IllegalStateException if this is not a {@link CombinedScanTask}
+ * @return ID of the new current snapshot
*/
- default CombinedScanTask asCombinedScanTask() {
- throw new IllegalStateException("Not a CombinedScanTask: " + this);
- }
+ long snapshotId();
}
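Together with the Table#history() method added below, these entries can recover which snapshot was current at a given time, mirroring the time-travel lookup in BaseTableScan#asOfTime. A sketch, assuming a loaded Table named table and a millisecond timestamp ts:

    import org.apache.iceberg.HistoryEntry;

    Long snapshotAsOf = null;
    for (HistoryEntry entry : table.history()) {
      if (entry.timestampMillis() <= ts) {
        snapshotAsOf = entry.snapshotId(); // last change at or before ts
      }
    }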
diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java
index 2618037..7163980 100644
--- a/api/src/main/java/org/apache/iceberg/ManifestFile.java
+++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java
@@ -69,16 +69,43 @@ public interface ManifestFile {
Long snapshotId();
/**
+ * Returns true if the manifest contains ADDED entries or if the count is not known.
+ *
+ * @return whether this manifest contains entries with ADDED status
+ */
+ default boolean hasAddedFiles() {
+ return addedFilesCount() == null || addedFilesCount() > 0;
+ }
+
+ /**
* @return the number of data files with status ADDED in the manifest file
*/
Integer addedFilesCount();
/**
+ * Returns true if the manifest contains EXISTING entries or if the count is not known.
+ *
+ * @return whether this manifest contains entries with EXISTING status
+ */
+ default boolean hasExistingFiles() {
+ return existingFilesCount() == null || existingFilesCount() > 0;
+ }
+
+ /**
* @return the number of data files with status EXISTING in the manifest file
*/
Integer existingFilesCount();
/**
+ * Returns true if the manifest contains DELETED entries or if the count is not known.
+ *
+ * @return whether this manifest contains entries with DELETED status
+ */
+ default boolean hasDeletedFiles() {
+ return deletedFilesCount() == null || deletedFilesCount() > 0;
+ }
+
+ /**
* @return the number of data files with status DELETED in the manifest file
*/
Integer deletedFilesCount();
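These defaults let manifests be pruned by entry status without losing correctness: a null count means the status is unknown, so the manifest is conservatively kept. For example, a scan could keep only manifests that may still contain live files. A sketch, assuming a Snapshot named snapshot:

    import com.google.common.collect.Iterables;

    // skip manifests known to hold only DELETED entries
    Iterable<ManifestFile> live = Iterables.filter(snapshot.manifests(),
        m -> m.hasAddedFiles() || m.hasExistingFiles());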
diff --git a/api/src/main/java/org/apache/iceberg/ScanTask.java b/api/src/main/java/org/apache/iceberg/ScanTask.java
index 7c7350e..8a17144 100644
--- a/api/src/main/java/org/apache/iceberg/ScanTask.java
+++ b/api/src/main/java/org/apache/iceberg/ScanTask.java
@@ -41,6 +41,21 @@ public interface ScanTask extends Serializable {
}
/**
+ * @return true if this is a {@link DataTask}, false otherwise.
+ */
+ default boolean isDataTask() {
+ return false;
+ }
+
+ /**
+ * @return this cast to {@link DataTask} if it is one
+ * @throws IllegalStateException if this is not a {@link DataTask}
+ */
+ default DataTask asDataTask() {
+ throw new IllegalStateException("Not a DataTask: " + this);
+ }
+
+ /**
* @return this cast to {@link CombinedScanTask} if it is one
* @throws IllegalStateException if this is not a {@link CombinedScanTask}
*/
diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index b957f2e..db15d0e 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
+import java.util.List;
import java.util.Map;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
@@ -94,6 +95,13 @@ public interface Table {
Iterable<Snapshot> snapshots();
/**
+ * Get the snapshot history of this table.
+ *
+ * @return a list of {@link HistoryEntry history entries}
+ */
+ List<HistoryEntry> history();
+
+ /**
* Create a new {@link UpdateSchema} to alter the columns of this table and commit the change.
*
* @return a new {@link UpdateSchema}
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 8636a94..8259fd1 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -156,6 +156,16 @@ public interface TableScan {
Schema schema();
/**
+ * Returns the {@link Snapshot} that will be used by this scan.
+ * <p>
+ * If the snapshot was not configured using {@link #asOfTime(long)} or {@link #useSnapshot(long)}, the current table
+ * snapshot will be used.
+ *
+ * @return the Snapshot this scan will use
+ */
+ Snapshot snapshot();
+
+ /**
* Returns whether this scan should apply column name case sensitivity as per {@link #caseSensitive(boolean)}.
* @return true if case sensitive, false otherwise.
*/
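The new accessor exposes which snapshot a scan resolved to, which is useful for logging or validation before planning starts. A sketch, assuming a loaded Table named table and a millisecond timestamp ts:

    TableScan scan = table.newScan().asOfTime(ts);
    System.out.println("Scan will read snapshot " + scan.snapshot().snapshotId());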
diff --git a/api/src/main/java/org/apache/iceberg/expressions/InclusiveManifestEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
similarity index 90%
rename from api/src/main/java/org/apache/iceberg/expressions/InclusiveManifestEvaluator.java
rename to api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
index 7550ea5..85bac1c 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/InclusiveManifestEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
@@ -25,7 +25,6 @@ import org.apache.iceberg.Accessors;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFile.PartitionFieldSummary;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types.StructType;
@@ -36,15 +35,14 @@ import static org.apache.iceberg.expressions.Expressions.rewriteNot;
* Evaluates an {@link Expression} on a {@link ManifestFile} to test whether the file contains
* matching partitions.
* <p>
- * This evaluation is inclusive: it returns true if a file may match and false if it cannot match.
+ * For row expressions, evaluation is inclusive: it returns true if a file may match and false if it cannot match.
* <p>
* Files are passed to {@link #eval(ManifestFile)}, which returns true if the manifest may contain
* data files that match the partition expression. Manifest files may be skipped if and only if the
* return value of {@code eval} is false.
*/
-public class InclusiveManifestEvaluator {
+public class ManifestEvaluator {
private final StructType struct;
- private final Schema schema;
private final Expression expr;
private transient ThreadLocal<ManifestEvalVisitor> visitors = null;
@@ -55,17 +53,18 @@ public class InclusiveManifestEvaluator {
return visitors.get();
}
- public InclusiveManifestEvaluator(PartitionSpec spec, Expression rowFilter) {
- this(spec, rowFilter, true);
+ public static ManifestEvaluator forRowFilter(Expression rowFilter, PartitionSpec spec, boolean caseSensitive) {
+ return new ManifestEvaluator(spec, Projections.inclusive(spec, caseSensitive).project(rowFilter), caseSensitive);
}
- public InclusiveManifestEvaluator(PartitionSpec spec, Expression rowFilter, boolean caseSensitive) {
+ public static ManifestEvaluator forPartitionFilter(
+ Expression partitionFilter, PartitionSpec spec, boolean caseSensitive) {
+ return new ManifestEvaluator(spec, partitionFilter, caseSensitive);
+ }
+
+ private ManifestEvaluator(PartitionSpec spec, Expression partitionFilter, boolean caseSensitive) {
this.struct = spec.partitionType();
- this.expr = Binder.bind(
- struct,
- rewriteNot(Projections.inclusive(spec, caseSensitive).project(rowFilter)),
- caseSensitive);
- this.schema = new Schema(struct.fields());
+ this.expr = Binder.bind(struct, rewriteNot(partitionFilter), caseSensitive);
}
/**
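The two factory methods make the projection step explicit: forRowFilter inclusively projects a row expression onto the partition space, while forPartitionFilter takes an expression already written in partition terms. A sketch, assuming a PartitionSpec spec, a ManifestFile manifest, and hypothetical column names:

    import org.apache.iceberg.expressions.Expressions;

    ManifestEvaluator rowEval =
        ManifestEvaluator.forRowFilter(Expressions.greaterThan("id", 5), spec, true);
    ManifestEvaluator partEval =
        ManifestEvaluator.forPartitionFilter(Expressions.equal("id_bucket", 3), spec, true);

    boolean mayMatch = rowEval.eval(manifest); // false means the manifest can be skipped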
diff --git a/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
index 4ffcc2f..0fae40d 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
@@ -51,6 +51,45 @@ import org.apache.iceberg.transforms.Transform;
* This class is thread-safe.
*/
public class ResidualEvaluator implements Serializable {
+ private static class UnpartitionedResidualEvaluator extends ResidualEvaluator {
+ private final Expression expr;
+
+ UnpartitionedResidualEvaluator(Expression expr) {
+ super(PartitionSpec.unpartitioned(), expr, false);
+ this.expr = expr;
+ }
+
+ @Override
+ public Expression residualFor(StructLike ignored) {
+ return expr;
+ }
+ }
+
+ /**
+ * Return a residual evaluator for an unpartitioned {@link PartitionSpec spec}.
+ *
+ * @param expr an expression
+ * @return a residual evaluator that always returns the expression
+ */
+ public static ResidualEvaluator unpartitioned(Expression expr) {
+ return new UnpartitionedResidualEvaluator(expr);
+ }
+
+ /**
+ * Return a residual evaluator for a {@link PartitionSpec spec} and {@link Expression expression}.
+ *
+ * @param spec a partition spec
+ * @param expr an expression
+ * @return a residual evaluator for the expression
+ */
+ public static ResidualEvaluator of(PartitionSpec spec, Expression expr, boolean caseSensitive) {
+ if (spec.fields().size() > 0) {
+ return new ResidualEvaluator(spec, expr, caseSensitive);
+ } else {
+ return unpartitioned(expr);
+ }
+ }
+
private final PartitionSpec spec;
private final Expression expr;
private final boolean caseSensitive;
@@ -63,7 +102,7 @@ public class ResidualEvaluator implements Serializable {
return visitors.get();
}
- public ResidualEvaluator(PartitionSpec spec, Expression expr, boolean caseSensitive) {
+ private ResidualEvaluator(PartitionSpec spec, Expression expr, boolean caseSensitive) {
this.spec = spec;
this.expr = expr;
this.caseSensitive = caseSensitive;
@@ -82,8 +121,8 @@ public class ResidualEvaluator implements Serializable {
private class ResidualVisitor extends ExpressionVisitors.BoundExpressionVisitor<Expression> {
private StructLike struct;
- private Expression eval(StructLike structLike) {
- this.struct = structLike;
+ private Expression eval(StructLike dataStruct) {
+ this.struct = dataStruct;
return ExpressionVisitors.visit(expr, this);
}
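The factory replaces direct construction and short-circuits the unpartitioned case, where no predicate can be answered by partition values alone and the residual is always the original expression. A sketch, assuming a PartitionSpec spec, an Expression filter, and partition data partitionData:

    ResidualEvaluator residuals = ResidualEvaluator.of(spec, filter, true);
    Expression residual = residuals.residualFor(partitionData);
    // alwaysTrue(): every row in this partition matches; alwaysFalse(): none do;
    // anything else must still be evaluated against individual rows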
diff --git a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
index 329bd4e..731d1ef 100644
--- a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
+++ b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
@@ -20,14 +20,21 @@
package org.apache.iceberg.io;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Function;
+import java.util.function.Predicate;
import org.apache.iceberg.exceptions.RuntimeIOException;
public interface CloseableIterable<T> extends Iterable<T>, Closeable {
+ static <E> CloseableIterable<E> withNoopClose(E entry) {
+ return withNoopClose(ImmutableList.of(entry));
+ }
+
static <E> CloseableIterable<E> withNoopClose(Iterable<E> iterable) {
return new CloseableIterable<E>() {
@Override
@@ -59,6 +66,10 @@ public interface CloseableIterable<T> extends Iterable<T>, Closeable {
};
}
+ static <E> CloseableIterable<E> filter(CloseableIterable<E> iterable, Predicate<E> pred) {
+ return combine(Iterables.filter(iterable, pred::test), iterable);
+ }
+
static <I, O> CloseableIterable<O> transform(CloseableIterable<I> iterable, Function<I, O> transform) {
Preconditions.checkNotNull(transform, "Cannot apply a null transform");
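The new helpers keep close semantics intact: filter defers closing to the source iterable, and the single-value withNoopClose wraps one element with a no-op close. A sketch, assuming a CloseableIterable<Integer> named source:

    CloseableIterable<String> one = CloseableIterable.withNoopClose("only-row");

    // closing evens also closes source
    CloseableIterable<Integer> evens = CloseableIterable.filter(source, i -> i % 2 == 0);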
diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java
index c62d32c..3859f00 100644
--- a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java
+++ b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java
@@ -79,25 +79,25 @@ public class TestInclusiveManifestEvaluator {
@Test
public void testAllNulls() {
- boolean shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("all_nulls")).eval(FILE);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(notNull("all_nulls"), SPEC, true).eval(FILE);
Assert.assertFalse("Should skip: no non-null value in all null column", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("some_nulls")).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(notNull("some_nulls"), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: column with some nulls contains a non-null value", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("no_nulls")).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(notNull("no_nulls"), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: non-null column contains a non-null value", shouldRead);
}
@Test
public void testNoNulls() {
- boolean shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("all_nulls")).eval(FILE);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(isNull("all_nulls"), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: at least one null value in all null column", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("some_nulls")).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(isNull("some_nulls"), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: column with some nulls contains a null value", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("no_nulls")).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(isNull("no_nulls"), SPEC, true).eval(FILE);
Assert.assertFalse("Should skip: non-null column contains no null values", shouldRead);
}
@@ -105,7 +105,7 @@ public class TestInclusiveManifestEvaluator {
public void testMissingColumn() {
TestHelpers.assertThrows("Should complain about missing column in expression",
ValidationException.class, "Cannot find field 'missing'",
- () -> new InclusiveManifestEvaluator(SPEC, lessThan("missing", 5)).eval(FILE));
+ () -> ManifestEvaluator.forRowFilter(lessThan("missing", 5), SPEC, true).eval(FILE));
}
@Test
@@ -117,7 +117,7 @@ public class TestInclusiveManifestEvaluator {
};
for (Expression expr : exprs) {
- boolean shouldRead = new InclusiveManifestEvaluator(SPEC, expr).eval(NO_STATS);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(expr, SPEC, true).eval(NO_STATS);
Assert.assertTrue("Should read when missing stats for expr: " + expr, shouldRead);
}
}
@@ -125,194 +125,190 @@ public class TestInclusiveManifestEvaluator {
@Test
public void testNot() {
// this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
- boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(lessThan("id", 5))).eval(FILE);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(not(lessThan("id", 5)), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: not(false)", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, not(greaterThan("id", 5))).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(not(greaterThan("id", 5)), SPEC, true).eval(FILE);
Assert.assertFalse("Should skip: not(true)", shouldRead);
}
@Test
public void testAnd() {
// this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
- boolean shouldRead = new InclusiveManifestEvaluator(
- SPEC, and(lessThan("id", 5), greaterThanOrEqual("id", 0))).eval(FILE);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(
+ and(lessThan("id", 5), greaterThanOrEqual("id", 0)), SPEC, true).eval(FILE);
Assert.assertFalse("Should skip: and(false, false)", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(
- SPEC, and(greaterThan("id", 5), lessThanOrEqual("id", 30))).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(
+ and(greaterThan("id", 5), lessThanOrEqual("id", 30)), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: and(true, true)", shouldRead);
}
@Test
public void testOr() {
// this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
- boolean shouldRead = new InclusiveManifestEvaluator(
- SPEC, or(lessThan("id", 5), greaterThanOrEqual("id", 80))).eval(FILE);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(
+ or(lessThan("id", 5), greaterThanOrEqual("id", 80)), SPEC, true).eval(FILE);
Assert.assertFalse("Should skip: or(false, false)", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(
- SPEC, or(lessThan("id", 5), greaterThanOrEqual("id", 60))).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(
+ or(lessThan("id", 5), greaterThanOrEqual("id", 60)), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: or(false, true)", shouldRead);
}
@Test
public void testIntegerLt() {
- boolean shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 5)).eval(FILE);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(lessThan("id", 5), SPEC, true).eval(FILE);
Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 30)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(lessThan("id", 30), SPEC, true).eval(FILE);
Assert.assertFalse("Should not read: id range below lower bound (30 is not < 30)", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 31)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(lessThan("id", 31), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: one possible id", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 79)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(lessThan("id", 79), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: may possible ids", shouldRead);
}
@Test
public void testIntegerLtEq() {
- boolean shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 5)).eval(FILE);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(lessThanOrEqual("id", 5), SPEC, true).eval(FILE);
Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 29)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(lessThanOrEqual("id", 29), SPEC, true).eval(FILE);
Assert.assertFalse("Should not read: id range below lower bound (29 < 30)", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 30)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(lessThanOrEqual("id", 30), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: one possible id", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 79)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(lessThanOrEqual("id", 79), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: many possible ids", shouldRead);
}
@Test
public void testIntegerGt() {
- boolean shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 85)).eval(FILE);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(greaterThan("id", 85), SPEC, true).eval(FILE);
Assert.assertFalse("Should not read: id range above upper bound (85 < 79)", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 79)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(greaterThan("id", 79), SPEC, true).eval(FILE);
Assert.assertFalse("Should not read: id range above upper bound (79 is not > 79)", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 78)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(greaterThan("id", 78), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: one possible id", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 75)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(greaterThan("id", 75), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: may possible ids", shouldRead);
}
@Test
public void testIntegerGtEq() {
- boolean shouldRead = new InclusiveManifestEvaluator(
- SPEC, greaterThanOrEqual("id", 85)).eval(FILE);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(greaterThanOrEqual("id", 85), SPEC, true).eval(FILE);
Assert.assertFalse("Should not read: id range above upper bound (85 < 79)", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(
- SPEC, greaterThanOrEqual("id", 80)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(greaterThanOrEqual("id", 80), SPEC, true).eval(FILE);
Assert.assertFalse("Should not read: id range above upper bound (80 > 79)", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(
- SPEC, greaterThanOrEqual("id", 79)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(greaterThanOrEqual("id", 79), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: one possible id", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(
- SPEC, greaterThanOrEqual("id", 75)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(greaterThanOrEqual("id", 75), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: may possible ids", shouldRead);
}
@Test
public void testIntegerEq() {
- boolean shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 5)).eval(FILE);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(equal("id", 5), SPEC, true).eval(FILE);
Assert.assertFalse("Should not read: id below lower bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 29)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(equal("id", 29), SPEC, true).eval(FILE);
Assert.assertFalse("Should not read: id below lower bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 30)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(equal("id", 30), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 75)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(equal("id", 75), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 79)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(equal("id", 79), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 80)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(equal("id", 80), SPEC, true).eval(FILE);
Assert.assertFalse("Should not read: id above upper bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 85)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(equal("id", 85), SPEC, true).eval(FILE);
Assert.assertFalse("Should not read: id above upper bound", shouldRead);
}
@Test
public void testIntegerNotEq() {
- boolean shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 5)).eval(FILE);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 5), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id below lower bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 29)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 29), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id below lower bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 30)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 30), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 75)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 75), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 79)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 79), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 80)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 80), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id above upper bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 85)).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 85), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id above upper bound", shouldRead);
}
@Test
public void testIntegerNotEqRewritten() {
- boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 5))).eval(FILE);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 5)), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id below lower bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 29))).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 29)), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id below lower bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 30))).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 30)), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 75))).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 75)), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 79))).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 79)), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 80))).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 80)), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id above upper bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 85))).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 85)), SPEC, true).eval(FILE);
Assert.assertTrue("Should read: id above upper bound", shouldRead);
}
@Test
public void testCaseInsensitiveIntegerNotEqRewritten() {
- boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 5)), false).eval(FILE);
+ boolean shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 5)), SPEC, false).eval(FILE);
Assert.assertTrue("Should read: id below lower bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 29)), false).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 29)), SPEC, false).eval(FILE);
Assert.assertTrue("Should read: id below lower bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 30)), false).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 30)), SPEC, false).eval(FILE);
Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 75)), false).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 75)), SPEC, false).eval(FILE);
Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 79)), false).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 79)), SPEC, false).eval(FILE);
Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 80)), false).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 80)), SPEC, false).eval(FILE);
Assert.assertTrue("Should read: id above upper bound", shouldRead);
- shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 85)), false).eval(FILE);
+ shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 85)), SPEC, false).eval(FILE);
Assert.assertTrue("Should read: id above upper bound", shouldRead);
}
@@ -320,6 +316,6 @@ public class TestInclusiveManifestEvaluator {
public void testCaseSensitiveIntegerNotEqRewritten() {
TestHelpers.assertThrows("Should complain about missing column in expression",
ValidationException.class, "Cannot find field 'ID'",
- () -> new InclusiveManifestEvaluator(SPEC, not(equal("ID", 5)), true).eval(FILE));
+ () -> ManifestEvaluator.forRowFilter(not(equal("ID", 5)), SPEC, true).eval(FILE));
}
}
diff --git a/api/src/test/java/org/apache/iceberg/transforms/TestResiduals.java b/api/src/test/java/org/apache/iceberg/transforms/TestResiduals.java
index 3b60f74..f3ca075 100644
--- a/api/src/test/java/org/apache/iceberg/transforms/TestResiduals.java
+++ b/api/src/test/java/org/apache/iceberg/transforms/TestResiduals.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.TestHelpers.Row;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.types.Types;
@@ -53,7 +54,7 @@ public class TestResiduals {
.identity("dateint")
.build();
- ResidualEvaluator resEval = new ResidualEvaluator(spec, or(or(
+ ResidualEvaluator resEval = ResidualEvaluator.of(spec, or(or(
and(lessThan("dateint", 20170815), greaterThan("dateint", 20170801)),
and(equal("dateint", 20170815), lessThan("hour", 12))),
and(equal("dateint", 20170801), greaterThan("hour", 11))),
@@ -93,7 +94,7 @@ public class TestResiduals {
.identity("dateint")
.build();
- ResidualEvaluator resEval = new ResidualEvaluator(spec, or(or(
+ ResidualEvaluator resEval = ResidualEvaluator.of(spec, or(or(
and(lessThan("DATEINT", 20170815), greaterThan("dateint", 20170801)),
and(equal("dateint", 20170815), lessThan("HOUR", 12))),
and(equal("DateInt", 20170801), greaterThan("hOUr", 11))),
@@ -133,8 +134,26 @@ public class TestResiduals {
.identity("dateint")
.build();
- ResidualEvaluator resEval = new ResidualEvaluator(spec, lessThan("DATEINT", 20170815), true);
+ ResidualEvaluator resEval = ResidualEvaluator.of(spec, lessThan("DATEINT", 20170815), true);
resEval.residualFor(Row.of(20170815));
}
+
+ @Test
+ public void testUnpartitionedResiduals() {
+ Expression[] expressions = new Expression[] {
+ Expressions.alwaysTrue(),
+ Expressions.alwaysFalse(),
+ Expressions.lessThan("a", 5),
+ Expressions.greaterThanOrEqual("b", 16),
+ Expressions.notNull("c"),
+ Expressions.isNull("d")
+ };
+
+ for (Expression expr : expressions) {
+ ResidualEvaluator residualEvaluator = ResidualEvaluator.of(PartitionSpec.unpartitioned(), expr, true);
+ Assert.assertEquals("Should return expression",
+ expr, residualEvaluator.residualFor(Row.of()));
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
similarity index 57%
copy from core/src/main/java/org/apache/iceberg/BaseTable.java
copy to core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 32182ab..4bebc13 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -19,157 +19,129 @@
package org.apache.iceberg;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
import java.util.Map;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
-/**
- * Base {@link Table} implementation.
- * <p>
- * This can be extended by providing a {@link TableOperations} to the constructor.
- */
-public class BaseTable implements Table, HasTableOperations {
- private final TableOperations ops;
- private final String name;
-
- public BaseTable(TableOperations ops, String name) {
- this.ops = ops;
- this.name = name;
- }
+abstract class BaseMetadataTable implements Table {
+ abstract Table table();
+ abstract String metadataTableName();
@Override
- public TableOperations operations() {
- return ops;
+ public FileIO io() {
+ return table().io();
}
@Override
- public void refresh() {
- ops.refresh();
+ public EncryptionManager encryption() {
+ return table().encryption();
}
@Override
- public TableScan newScan() {
- return new BaseTableScan(ops, this);
+ public LocationProvider locationProvider() {
+ return table().locationProvider();
}
@Override
- public Schema schema() {
- return ops.current().schema();
+ public void refresh() {
+ table().refresh();
}
@Override
public PartitionSpec spec() {
- return ops.current().spec();
+ return PartitionSpec.unpartitioned();
}
@Override
public Map<String, String> properties() {
- return ops.current().properties();
+ return ImmutableMap.of();
}
@Override
- public String location() {
- return ops.current().location();
+ public Snapshot currentSnapshot() {
+ return table().currentSnapshot();
}
@Override
- public Snapshot currentSnapshot() {
- return ops.current().currentSnapshot();
+ public Iterable<Snapshot> snapshots() {
+ return table().snapshots();
}
@Override
public Snapshot snapshot(long snapshotId) {
- return ops.current().snapshot(snapshotId);
+ return table().snapshot(snapshotId);
}
@Override
- public Iterable<Snapshot> snapshots() {
- return ops.current().snapshots();
+ public List<HistoryEntry> history() {
+ return table().history();
}
@Override
public UpdateSchema updateSchema() {
- return new SchemaUpdate(ops);
+ throw new UnsupportedOperationException("Cannot update the schema of a metadata table");
}
@Override
public UpdateProperties updateProperties() {
- return new PropertiesUpdate(ops);
+ throw new UnsupportedOperationException("Cannot update the properties of a metadata table");
}
@Override
public UpdateLocation updateLocation() {
- return new SetLocation(ops);
+ throw new UnsupportedOperationException("Cannot update the location of a metadata table");
}
@Override
public AppendFiles newAppend() {
- return new MergeAppend(ops);
- }
-
- @Override
- public AppendFiles newFastAppend() {
- return new FastAppend(ops);
+ throw new UnsupportedOperationException("Cannot append to a metadata table");
}
@Override
public RewriteFiles newRewrite() {
- return new ReplaceFiles(ops);
+ throw new UnsupportedOperationException("Cannot rewrite in a metadata table");
}
@Override
public RewriteManifests rewriteManifests() {
- return new ReplaceManifests(ops);
+ throw new UnsupportedOperationException("Cannot rewrite manifests in a metadata table");
}
@Override
public OverwriteFiles newOverwrite() {
- return new OverwriteData(ops);
+ throw new UnsupportedOperationException("Cannot overwrite in a metadata table");
}
@Override
public ReplacePartitions newReplacePartitions() {
- return new ReplacePartitionsOperation(ops);
+ throw new UnsupportedOperationException("Cannot replace partitions in a metadata table");
}
@Override
public DeleteFiles newDelete() {
- return new StreamingDelete(ops);
+ throw new UnsupportedOperationException("Cannot delete from a metadata table");
}
@Override
public ExpireSnapshots expireSnapshots() {
- return new RemoveSnapshots(ops);
+ throw new UnsupportedOperationException("Cannot expire snapshots from a metadata table");
}
@Override
public Rollback rollback() {
- return new RollbackToSnapshot(ops);
+ throw new UnsupportedOperationException("Cannot roll back a metadata table");
}
@Override
public Transaction newTransaction() {
- return Transactions.newTransaction(ops);
- }
-
- @Override
- public FileIO io() {
- return operations().io();
- }
-
- @Override
- public EncryptionManager encryption() {
- return operations().encryption();
- }
-
- @Override
- public LocationProvider locationProvider() {
- return operations().locationProvider();
+ throw new UnsupportedOperationException("Cannot create transactions for a metadata table");
}
@Override
public String toString() {
- return name;
+ return table().toString() + "." + metadataTableName();
}
}
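Because every mutation method throws, a metadata table is effectively a read-only view over the base table's state. A sketch, assuming the catalog-based loading added below:

    Table manifests = catalog.loadTable(TableIdentifier.of("db", "events", "manifests"));
    manifests.newScan().planFiles(); // supported: metadata tables are scannable
    manifests.newAppend();           // throws UnsupportedOperationException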
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index 9e2dcdd..06d0a1e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -20,6 +20,7 @@
package org.apache.iceberg;
import com.google.common.collect.Maps;
+import java.util.Locale;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
@@ -29,6 +30,22 @@ import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
public abstract class BaseMetastoreCatalog implements Catalog {
+ enum TableType {
+ ENTRIES,
+ FILES,
+ HISTORY,
+ SNAPSHOTS,
+ MANIFESTS;
+
+ static TableType from(String name) {
+ try {
+ return TableType.valueOf(name.toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException ignored) {
+ return null;
+ }
+ }
+ }
+
private final Configuration conf;
protected BaseMetastoreCatalog(Configuration conf) {
@@ -70,12 +87,42 @@ public abstract class BaseMetastoreCatalog implements Catalog {
public Table loadTable(TableIdentifier identifier) {
TableOperations ops = newTableOps(conf, identifier);
if (ops.current() == null) {
- throw new NoSuchTableException("Table does not exist: " + identifier);
+ String name = identifier.name();
+ TableType type = TableType.from(name);
+ if (type != null) {
+ return loadMetadataTable(TableIdentifier.of(identifier.namespace().levels()), type);
+ } else {
+ throw new NoSuchTableException("Table does not exist: " + identifier);
+ }
}
return new BaseTable(ops, identifier.toString());
}
+ private Table loadMetadataTable(TableIdentifier identifier, TableType type) {
+ TableOperations ops = newTableOps(conf, identifier);
+ if (ops.current() == null) {
+ throw new NoSuchTableException("Table does not exist: " + identifier);
+ }
+
+ Table baseTable = new BaseTable(ops, identifier.toString());
+
+ switch (type) {
+ case ENTRIES:
+ return new ManifestEntriesTable(ops, baseTable);
+ case FILES:
+ return new DataFilesTable(ops, baseTable);
+ case HISTORY:
+ return new HistoryTable(ops, baseTable);
+ case SNAPSHOTS:
+ return new SnapshotsTable(ops, baseTable);
+ case MANIFESTS:
+ return new ManifestsTable(ops, baseTable);
+ default:
+ throw new NoSuchTableException(String.format("Unknown metadata table type: %s for %s", type, identifier));
+ }
+ }
+
protected abstract TableOperations newTableOps(Configuration newConf, TableIdentifier tableIdentifier);
protected abstract String defaultWarehouseLocation(Configuration hadoopConf, TableIdentifier tableIdentifier);
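Resolution is fallback-based: the identifier is first loaded as a regular table, and only if that fails is its last level interpreted as a metadata table type of the parent identifier, so a real table whose name collides with a type shadows the metadata table. A sketch with hypothetical names:

    catalog.loadTable(TableIdentifier.of("db", "events"));            // the data table
    catalog.loadTable(TableIdentifier.of("db", "events", "history")); // its history table
    catalog.loadTable(TableIdentifier.of("db", "events", "nope"));    // NoSuchTableException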
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 32182ab..7d4ef02 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
+import java.util.List;
import java.util.Map;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
@@ -50,7 +51,7 @@ public class BaseTable implements Table, HasTableOperations {
@Override
public TableScan newScan() {
- return new BaseTableScan(ops, this);
+ return new DataTableScan(ops, this);
}
@Override
@@ -89,6 +90,11 @@ public class BaseTable implements Table, HasTableOperations {
}
@Override
+ public List<HistoryEntry> history() {
+ return ops.current().snapshotLog();
+ }
+
+ @Override
public UpdateSchema updateSchema() {
return new SchemaUpdate(ops);
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index abbd4c7..b0c52d7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -19,13 +19,9 @@
package org.apache.iceberg;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.time.Instant;
import java.time.LocalDateTime;
@@ -33,42 +29,27 @@ import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.Set;
import java.util.function.Function;
-import org.apache.iceberg.TableMetadata.SnapshotLogEntry;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.events.ScanEvent;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.InclusiveManifestEvaluator;
-import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.BinPacking;
-import org.apache.iceberg.util.ParallelIterable;
-import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class for {@link TableScan} implementations.
*/
-class BaseTableScan implements TableScan {
+@SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder")
+abstract class BaseTableScan implements TableScan {
private static final Logger LOG = LoggerFactory.getLogger(TableScan.class);
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
- private static final List<String> SCAN_COLUMNS = ImmutableList.of(
- "snapshot_id", "file_path", "file_ordinal", "file_format", "block_size_in_bytes",
- "file_size_in_bytes", "record_count", "partition"
- );
- private static final List<String> SCAN_WITH_STATS_COLUMNS = ImmutableList.<String>builder()
- .addAll(SCAN_COLUMNS)
- .add("value_counts", "null_value_counts", "lower_bounds", "upper_bounds")
- .build();
- private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
- SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
private final TableOperations ops;
private final Table table;
@@ -78,13 +59,12 @@ class BaseTableScan implements TableScan {
private final boolean caseSensitive;
private final boolean colStats;
private final Collection<String> selectedColumns;
- private final LoadingCache<Integer, InclusiveManifestEvaluator> evalCache;
- BaseTableScan(TableOperations ops, Table table) {
- this(ops, table, null, table.schema(), Expressions.alwaysTrue(), true, false, null);
+ protected BaseTableScan(TableOperations ops, Table table, Schema schema) {
+ this(ops, table, null, schema, Expressions.alwaysTrue(), true, false, null);
}
- private BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
+ protected BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
Expression rowFilter, boolean caseSensitive, boolean colStats,
Collection<String> selectedColumns) {
this.ops = ops;
@@ -95,12 +75,20 @@ class BaseTableScan implements TableScan {
this.caseSensitive = caseSensitive;
this.colStats = colStats;
this.selectedColumns = selectedColumns;
- this.evalCache = Caffeine.newBuilder().build(specId -> {
- PartitionSpec spec = ops.current().spec(specId);
- return new InclusiveManifestEvaluator(spec, rowFilter, caseSensitive);
- });
}
+ @SuppressWarnings("checkstyle:HiddenField")
+ protected abstract long targetSplitSize(TableOperations ops);
+
+ @SuppressWarnings("checkstyle:HiddenField")
+ protected abstract TableScan newRefinedScan(
+ TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns);
+
+ @SuppressWarnings("checkstyle:HiddenField")
+ protected abstract CloseableIterable<FileScanTask> planFiles(
+ TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats);
+
@Override
public Table table() {
return table;
@@ -112,7 +100,7 @@ class BaseTableScan implements TableScan {
"Cannot override snapshot, already set to id=%s", scanSnapshotId);
Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
"Cannot find snapshot with ID %s", scanSnapshotId);
- return new BaseTableScan(ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ return newRefinedScan(ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
}
@Override
@@ -121,7 +109,7 @@ class BaseTableScan implements TableScan {
"Cannot override snapshot, already set to id=%s", snapshotId);
Long lastSnapshotId = null;
- for (SnapshotLogEntry logEntry : ops.current().snapshotLog()) {
+ for (HistoryEntry logEntry : ops.current().snapshotLog()) {
if (logEntry.timestampMillis() <= timestampMillis) {
lastSnapshotId = logEntry.snapshotId();
}
@@ -137,28 +125,28 @@ class BaseTableScan implements TableScan {
@Override
public TableScan project(Schema projectedSchema) {
- return new BaseTableScan(
+ return newRefinedScan(
ops, table, snapshotId, projectedSchema, rowFilter, caseSensitive, colStats, selectedColumns);
}
@Override
public TableScan caseSensitive(boolean scanCaseSensitive) {
- return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, colStats, selectedColumns);
+ return newRefinedScan(ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, colStats, selectedColumns);
}
@Override
public TableScan includeColumnStats() {
- return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, true, selectedColumns);
+ return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, true, selectedColumns);
}
@Override
public TableScan select(Collection<String> columns) {
- return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, columns);
+ return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, columns);
}
@Override
public TableScan filter(Expression expr) {
- return new BaseTableScan(
+ return newRefinedScan(
ops, table, snapshotId, schema, Expressions.and(rowFilter, expr), caseSensitive, colStats, selectedColumns);
}
@@ -169,10 +157,7 @@ class BaseTableScan implements TableScan {
@Override
public CloseableIterable<FileScanTask> planFiles() {
- Snapshot snapshot = snapshotId != null ?
- ops.current().snapshot(snapshotId) :
- ops.current().currentSnapshot();
-
+ Snapshot snapshot = snapshot();
if (snapshot != null) {
LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()),
@@ -181,30 +166,7 @@ class BaseTableScan implements TableScan {
Listeners.notifyAll(
new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema()));
- Iterable<ManifestFile> matchingManifests = Iterables.filter(snapshot.manifests(),
- manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
-
- Iterable<CloseableIterable<FileScanTask>> readers = Iterables.transform(
- matchingManifests,
- manifest -> {
- ManifestReader reader = ManifestReader
- .read(ops.io().newInputFile(manifest.path()), ops.current()::spec)
- .caseSensitive(caseSensitive);
- PartitionSpec spec = ops.current().spec(manifest.partitionSpecId());
- String schemaString = SchemaParser.toJson(spec.schema());
- String specString = PartitionSpecParser.toJson(spec);
- ResidualEvaluator residuals = new ResidualEvaluator(spec, rowFilter, caseSensitive);
- return CloseableIterable.transform(
- reader.filterRows(rowFilter).select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS),
- file -> new BaseFileScanTask(file, schemaString, specString, residuals)
- );
- });
-
- if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.manifests().size() > 1) {
- return new ParallelIterable<>(readers, ThreadPools.getWorkerPool());
- } else {
- return CloseableIterable.concat(readers);
- }
+ return planFiles(ops, snapshot, rowFilter, caseSensitive, colStats);
} else {
LOG.info("Scanning empty table {}", table);
@@ -214,8 +176,7 @@ class BaseTableScan implements TableScan {
@Override
public CloseableIterable<CombinedScanTask> planTasks() {
- long splitSize = ops.current().propertyAsLong(
- TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
+ long splitSize = targetSplitSize(ops);
int lookback = ops.current().propertyAsInt(
TableProperties.SPLIT_LOOKBACK, TableProperties.SPLIT_LOOKBACK_DEFAULT);
long openFileCost = ops.current().propertyAsLong(
@@ -237,6 +198,13 @@ class BaseTableScan implements TableScan {
}
@Override
+ public Snapshot snapshot() {
+ return snapshotId != null ?
+ ops.current().snapshot(snapshotId) :
+ ops.current().currentSnapshot();
+ }
+
+ @Override
public boolean isCaseSensitive() {
return caseSensitive;
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index ee70433..1353bb7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -367,6 +367,11 @@ class BaseTransaction implements Transaction {
}
@Override
+ public List<HistoryEntry> history() {
+ return current.snapshotLog();
+ }
+
+ @Override
public UpdateSchema updateSchema() {
return BaseTransaction.this.updateSchema();
}
diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java
index 1819b64..7478d2f 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -150,6 +150,17 @@ public class DataFiles {
location, format, partition, file.getLength(), metrics, keyMetadata.buffer(), splitOffsets);
}
+ public static DataFile fromManifest(ManifestFile manifest) {
+ Preconditions.checkArgument(
+ manifest.addedFilesCount() != null && manifest.existingFilesCount() != null,
+ "Cannot create data file from manifest: data file counts are missing.");
+
+ return new GenericDataFile(manifest.path(),
+ FileFormat.AVRO,
+ manifest.addedFilesCount() + manifest.existingFilesCount(),
+ manifest.length());
+ }
+
public static Builder builder(PartitionSpec spec) {
return new Builder(spec);
}
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
new file mode 100644
index 0000000..63ca855
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collection;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+
+/**
+ * A {@link Table} implementation that exposes a table's data files as rows.
+ */
+class DataFilesTable extends BaseMetadataTable {
+ private final TableOperations ops;
+ private final Table table;
+
+ DataFilesTable(TableOperations ops, Table table) {
+ this.ops = ops;
+ this.table = table;
+ }
+
+ @Override
+ Table table() {
+ return table;
+ }
+
+ @Override
+ String metadataTableName() {
+ return "files";
+ }
+
+ @Override
+ public TableScan newScan() {
+ return new FilesTableScan(ops, table);
+ }
+
+ @Override
+ public Schema schema() {
+ return new Schema(DataFile.getType(table.spec().partitionType()).fields());
+ }
+
+ @Override
+ public String location() {
+ return table.currentSnapshot().manifestListLocation();
+ }
+
+ public static class FilesTableScan extends BaseTableScan {
+ private static final long TARGET_SPLIT_SIZE = 32 * 1024 * 1024; // 32 MB
+
+ FilesTableScan(TableOperations ops, Table table) {
+ super(ops, table, new Schema(DataFile.getType(table.spec().partitionType()).fields()));
+ }
+
+ private FilesTableScan(
+ TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+ super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ }
+
+ @Override
+ protected TableScan newRefinedScan(
+ TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+ return new FilesTableScan(
+ ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ }
+
+ @Override
+ protected long targetSplitSize(TableOperations ops) {
+ return TARGET_SPLIT_SIZE;
+ }
+
+ @Override
+ protected CloseableIterable<FileScanTask> planFiles(
+ TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+ CloseableIterable<ManifestFile> manifests = Avro
+ .read(ops.io().newInputFile(snapshot.manifestListLocation()))
+ .rename("manifest_file", GenericManifestFile.class.getName())
+ .rename("partitions", GenericPartitionFieldSummary.class.getName())
+ // 508 is the id used for the partition field, and r508 is the record name created for it in Avro schemas
+ .rename("r508", GenericPartitionFieldSummary.class.getName())
+ .project(ManifestFile.schema())
+ .reuseContainers(false)
+ .build();
+
+ String schemaString = SchemaParser.toJson(schema());
+ String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
+
+ return CloseableIterable.transform(manifests, manifest ->
+ new ManifestReadTask(ops.io(), new BaseFileScanTask(
+ DataFiles.fromManifest(manifest), schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter))));
+ }
+ }
+
+ private static class ManifestReadTask implements DataTask {
+ private final FileIO io;
+ private final FileScanTask manifestTask;
+
+ private ManifestReadTask(FileIO io, FileScanTask manifestTask) {
+ this.io = io;
+ this.manifestTask = manifestTask;
+ }
+
+ @Override
+ public CloseableIterable<StructLike> rows() {
+ return CloseableIterable.transform(
+ ManifestReader.read(io.newInputFile(manifestTask.file().path().toString())),
+ file -> (GenericDataFile) file);
+ }
+
+ @Override
+ public DataFile file() {
+ return manifestTask.file();
+ }
+
+ @Override
+ public PartitionSpec spec() {
+ return manifestTask.spec();
+ }
+
+ @Override
+ public long start() {
+ return 0;
+ }
+
+ @Override
+ public long length() {
+ return manifestTask.length();
+ }
+
+ @Override
+ public Expression residual() {
+ return manifestTask.residual();
+ }
+
+ @Override
+ public Iterable<FileScanTask> split(long splitSize) {
+ return ImmutableList.of(this); // don't split
+ }
+ }
+}
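
A minimal usage sketch for the files table, assuming an existing TableOperations/Table pair named ops and table (names illustrative); each planned task is a ManifestReadTask whose rows are the data files recorded in one manifest:

    Table filesTable = new DataFilesTable(ops, table);
    try (CloseableIterable<FileScanTask> tasks = filesTable.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        for (StructLike file : task.asDataTask().rows()) {
          System.out.println(file); // one GenericDataFile per row
        }
      }
    } // close() may throw IOException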
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
new file mode 100644
index 0000000..8ae9b31
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.List;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataTableScan extends BaseTableScan {
+ private static final Logger LOG = LoggerFactory.getLogger(DataTableScan.class);
+
+ private static final List<String> SCAN_COLUMNS = ImmutableList.of(
+ "snapshot_id", "file_path", "file_ordinal", "file_format", "block_size_in_bytes",
+ "file_size_in_bytes", "record_count", "partition"
+ );
+ private static final List<String> SCAN_WITH_STATS_COLUMNS = ImmutableList.<String>builder()
+ .addAll(SCAN_COLUMNS)
+ .add("value_counts", "null_value_counts", "lower_bounds", "upper_bounds")
+ .build();
+ private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
+ SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
+
+ public DataTableScan(TableOperations ops, Table table) {
+ super(ops, table, table.schema());
+ }
+
+ protected DataTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
+ Expression rowFilter, boolean caseSensitive, boolean colStats,
+ Collection<String> selectedColumns) {
+ super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ }
+
+ @Override
+ protected TableScan newRefinedScan(
+ TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+ return new DataTableScan(
+ ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ }
+
+ public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
+ Expression rowFilter, boolean caseSensitive, boolean colStats) {
+ LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder().build(specId -> {
+ PartitionSpec spec = ops.current().spec(specId);
+ return ManifestEvaluator.forRowFilter(rowFilter, spec, caseSensitive);
+ });
+
+ Iterable<ManifestFile> nonEmptyManifests = Iterables.filter(snapshot.manifests(),
+ manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles());
+ Iterable<ManifestFile> matchingManifests = Iterables.filter(nonEmptyManifests,
+ manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+ Iterable<CloseableIterable<FileScanTask>> readers = Iterables.transform(
+ matchingManifests,
+ manifest -> {
+ ManifestReader reader = ManifestReader
+ .read(ops.io().newInputFile(manifest.path()), ops.current()::spec)
+ .caseSensitive(caseSensitive);
+ PartitionSpec spec = ops.current().spec(manifest.partitionSpecId());
+ String schemaString = SchemaParser.toJson(spec.schema());
+ String specString = PartitionSpecParser.toJson(spec);
+ ResidualEvaluator residuals = ResidualEvaluator.of(spec, rowFilter, caseSensitive);
+ return CloseableIterable.transform(
+ reader.filterRows(rowFilter).select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS),
+ file -> new BaseFileScanTask(file, schemaString, specString, residuals)
+ );
+ });
+
+ if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.manifests().size() > 1) {
+ return new ParallelIterable<>(readers, ThreadPools.getWorkerPool());
+ } else {
+ return CloseableIterable.concat(readers);
+ }
+ }
+
+ protected long targetSplitSize(TableOperations ops) {
+ return ops.current().propertyAsLong(
+ TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/FilteredManifest.java b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
index f4ce193..73bb86f 100644
--- a/core/src/main/java/org/apache/iceberg/FilteredManifest.java
+++ b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
@@ -20,7 +20,6 @@
package org.apache.iceberg;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -35,6 +34,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.io.CloseableIterable;
public class FilteredManifest implements Filterable<FilteredManifest> {
private static final Set<String> STATS_COLUMNS = Sets.newHashSet(
@@ -84,13 +84,13 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
caseSensitive);
}
- Iterable<ManifestEntry> allEntries() {
+ CloseableIterable<ManifestEntry> allEntries() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
(partFilter != null && partFilter != Expressions.alwaysTrue())) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
- return Iterables.filter(reader.entries(columns),
+ return CloseableIterable.filter(reader.entries(columns),
entry -> entry != null &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));
@@ -100,20 +100,20 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
}
}
- Iterable<ManifestEntry> liveEntries() {
+ CloseableIterable<ManifestEntry> liveEntries() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
(partFilter != null && partFilter != Expressions.alwaysTrue())) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
- return Iterables.filter(reader.entries(columns),
+ return CloseableIterable.filter(reader.entries(columns),
entry -> entry != null &&
entry.status() != Status.DELETED &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));
} else {
- return Iterables.filter(reader.entries(columns),
+ return CloseableIterable.filter(reader.entries(columns),
entry -> entry != null && entry.status() != Status.DELETED);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/HistoryTable.java b/core/src/main/java/org/apache/iceberg/HistoryTable.java
new file mode 100644
index 0000000..d7070f7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/HistoryTable.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+
+/**
+ * A {@link Table} implementation that exposes a table's history as rows.
+ * <p>
+ * History is based on the table's snapshot log, which logs each update to the table's current snapshot.
+ */
+class HistoryTable extends BaseMetadataTable {
+ private static final Schema HISTORY_SCHEMA = new Schema(
+ Types.NestedField.required(1, "made_current_at", Types.TimestampType.withZone()),
+ Types.NestedField.required(2, "snapshot_id", Types.LongType.get()),
+ Types.NestedField.optional(3, "parent_id", Types.LongType.get()),
+ Types.NestedField.required(4, "is_current_ancestor", Types.BooleanType.get())
+ );
+
+ private final TableOperations ops;
+ private final Table table;
+
+ HistoryTable(TableOperations ops, Table table) {
+ this.ops = ops;
+ this.table = table;
+ }
+
+ @Override
+ Table table() {
+ return table;
+ }
+
+ @Override
+ String metadataTableName() {
+ return "history";
+ }
+
+ @Override
+ public TableScan newScan() {
+ return new HistoryScan();
+ }
+
+ @Override
+ public String location() {
+ return ops.current().file().location();
+ }
+
+ @Override
+ public Schema schema() {
+ return HISTORY_SCHEMA;
+ }
+
+ private DataTask task(TableScan scan) {
+ return StaticDataTask.of(ops.current().file(), ops.current().snapshotLog(), convertHistoryEntryFunc(table));
+ }
+
+ private class HistoryScan extends StaticTableScan {
+ HistoryScan() {
+ super(ops, table, HISTORY_SCHEMA, HistoryTable.this::task);
+ }
+ }
+
+ private static Function<HistoryEntry, StaticDataTask.Row> convertHistoryEntryFunc(Table table) {
+ Map<Long, Snapshot> snapshots = Maps.newHashMap();
+ for (Snapshot snap : table.snapshots()) {
+ snapshots.put(snap.snapshotId(), snap);
+ }
+
+ Set<Long> ancestorIds = Sets.newHashSet(SnapshotUtil.currentAncestors(table));
+
+ return historyEntry -> {
+ long snapshotId = historyEntry.snapshotId();
+ Snapshot snap = snapshots.get(snapshotId);
+ return StaticDataTask.Row.of(
+ historyEntry.timestampMillis() * 1000,
+ historyEntry.snapshotId(),
+ snap != null ? snap.parentId() : null,
+ ancestorIds.contains(snapshotId)
+ );
+ };
+ }
+}
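
The is_current_ancestor column is what distinguishes plain history from lineage: after a rollback, the snapshot log still contains entries for abandoned snapshots, and those rows report false. A sketch of reading the rows, with get() positions matching HISTORY_SCHEMA (assumes the table has at least one snapshot):

    Table historyTable = new HistoryTable(ops, table);
    for (FileScanTask task : historyTable.newScan().planFiles()) {
      for (StructLike row : task.asDataTask().rows()) {
        Long madeCurrentAt = row.get(0, Long.class);    // made_current_at, microseconds
        Boolean isAncestor = row.get(3, Boolean.class); // false for rolled-back branches
      }
    }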
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
new file mode 100644
index 0000000..848b0c3
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Collection;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+
+/**
+ * A {@link Table} implementation that exposes a table's manifest entries as rows.
+ * <p>
+ * WARNING: this table exposes internal details, like files that have been deleted. For a table of the live data files,
+ * use {@link DataFilesTable}.
+ */
+class ManifestEntriesTable extends BaseMetadataTable {
+ private final TableOperations ops;
+ private final Table table;
+
+ ManifestEntriesTable(TableOperations ops, Table table) {
+ this.ops = ops;
+ this.table = table;
+ }
+
+ @Override
+ Table table() {
+ return table;
+ }
+
+ @Override
+ String metadataTableName() {
+ return "entries";
+ }
+
+ @Override
+ public TableScan newScan() {
+ return new EntriesTableScan(ops, table);
+ }
+
+ @Override
+ public Schema schema() {
+ return ManifestEntry.getSchema(table.spec().partitionType());
+ }
+
+ @Override
+ public String location() {
+ return table.currentSnapshot().manifestListLocation();
+ }
+
+ private static class EntriesTableScan extends BaseTableScan {
+ private static final long TARGET_SPLIT_SIZE = 32 * 1024 * 1024; // 32 MB
+
+ EntriesTableScan(TableOperations ops, Table table) {
+ super(ops, table, ManifestEntry.getSchema(table.spec().partitionType()));
+ }
+
+ private EntriesTableScan(
+ TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+ super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ }
+
+ @Override
+ protected TableScan newRefinedScan(
+ TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+ return new EntriesTableScan(
+ ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ }
+
+ @Override
+ protected long targetSplitSize(TableOperations ops) {
+ return TARGET_SPLIT_SIZE;
+ }
+
+ @Override
+ protected CloseableIterable<FileScanTask> planFiles(
+ TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+ CloseableIterable<ManifestFile> manifests = Avro
+ .read(ops.io().newInputFile(snapshot.manifestListLocation()))
+ .rename("manifest_file", GenericManifestFile.class.getName())
+ .rename("partitions", GenericPartitionFieldSummary.class.getName())
+ // 508 is the id used for the partition field, and r508 is the record name created for it in Avro schemas
+ .rename("r508", GenericPartitionFieldSummary.class.getName())
+ .project(ManifestFile.schema())
+ .reuseContainers(false)
+ .build();
+
+ String schemaString = SchemaParser.toJson(schema());
+ String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
+
+ return CloseableIterable.transform(manifests, manifest -> new BaseFileScanTask(
+ DataFiles.fromManifest(manifest), schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter)));
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 0d8e6c4..5381e8e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -31,7 +31,8 @@ import java.util.Set;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.InclusiveManifestEvaluator;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
@@ -42,52 +43,91 @@ class ManifestGroup {
private final Set<ManifestFile> manifests;
private final Expression dataFilter;
private final Expression fileFilter;
+ private final Expression partitionFilter;
private final boolean ignoreDeleted;
+ private final boolean ignoreExisting;
private final List<String> columns;
+ private final boolean caseSensitive;
- private final LoadingCache<Integer, InclusiveManifestEvaluator> evalCache;
+ private final LoadingCache<Integer, ManifestEvaluator> evalCache;
ManifestGroup(TableOperations ops, Iterable<ManifestFile> manifests) {
this(ops, Sets.newHashSet(manifests), Expressions.alwaysTrue(), Expressions.alwaysTrue(),
- false, ImmutableList.of("*"));
+ Expressions.alwaysTrue(), false, false, ImmutableList.of("*"), true);
}
private ManifestGroup(TableOperations ops, Set<ManifestFile> manifests,
- Expression dataFilter, Expression fileFilter, boolean ignoreDeleted,
- List<String> columns) {
+ Expression dataFilter, Expression fileFilter, Expression partitionFilter,
+ boolean ignoreDeleted, boolean ignoreExisting, List<String> columns,
+ boolean caseSensitive) {
this.ops = ops;
this.manifests = manifests;
this.dataFilter = dataFilter;
this.fileFilter = fileFilter;
+ this.partitionFilter = partitionFilter;
this.ignoreDeleted = ignoreDeleted;
+ this.ignoreExisting = ignoreExisting;
this.columns = columns;
+ this.caseSensitive = caseSensitive;
this.evalCache = Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = ops.current().spec(specId);
- return new InclusiveManifestEvaluator(spec, dataFilter);
+ return ManifestEvaluator.forPartitionFilter(
+ Expressions.and(partitionFilter, Projections.inclusive(spec).project(dataFilter)),
+ spec, caseSensitive);
});
}
+ public ManifestGroup caseSensitive(boolean filterCaseSensitive) {
+ return new ManifestGroup(ops, manifests, dataFilter, fileFilter, partitionFilter,
+ ignoreDeleted, ignoreExisting, columns, filterCaseSensitive);
+ }
+
public ManifestGroup filterData(Expression expr) {
return new ManifestGroup(
- ops, manifests, Expressions.and(dataFilter, expr), fileFilter, ignoreDeleted, columns);
+ ops, manifests, Expressions.and(dataFilter, expr), fileFilter, partitionFilter,
+ ignoreDeleted, ignoreExisting, columns, caseSensitive);
}
public ManifestGroup filterFiles(Expression expr) {
return new ManifestGroup(
- ops, manifests, dataFilter, Expressions.and(fileFilter, expr), ignoreDeleted, columns);
+ ops, manifests, dataFilter, Expressions.and(fileFilter, expr), partitionFilter,
+ ignoreDeleted, ignoreExisting, columns, caseSensitive);
+ }
+
+ public ManifestGroup filterPartitions(Expression expr) {
+ return new ManifestGroup(
+ ops, manifests, dataFilter, fileFilter, Expressions.and(partitionFilter, expr),
+ ignoreDeleted, ignoreExisting, columns, caseSensitive);
}
public ManifestGroup ignoreDeleted() {
- return new ManifestGroup(ops, manifests, dataFilter, fileFilter, true, columns);
+ return new ManifestGroup(ops, manifests, dataFilter, fileFilter, partitionFilter, true,
+ ignoreExisting, columns, caseSensitive);
+ }
+
+ public ManifestGroup ignoreDeleted(boolean shouldIgnoreDeleted) {
+ return new ManifestGroup(ops, manifests, dataFilter, fileFilter, partitionFilter,
+ shouldIgnoreDeleted, ignoreExisting, columns, caseSensitive);
+ }
+
+ public ManifestGroup ignoreExisting() {
+ return new ManifestGroup(ops, manifests, dataFilter, fileFilter, partitionFilter,
+ ignoreDeleted, true, columns, caseSensitive);
}
- public ManifestGroup select(List<String> selectedColumns) {
+ public ManifestGroup ignoreExisting(boolean shouldIgnoreExisting) {
+ return new ManifestGroup(ops, manifests, dataFilter, fileFilter, partitionFilter,
+ ignoreDeleted, shouldIgnoreExisting, columns, caseSensitive);
+ }
+
+ public ManifestGroup select(List<String> columnNames) {
return new ManifestGroup(
- ops, manifests, dataFilter, fileFilter, ignoreDeleted, Lists.newArrayList(selectedColumns));
+ ops, manifests, dataFilter, fileFilter, partitionFilter, ignoreDeleted, ignoreExisting,
+ Lists.newArrayList(columnNames), caseSensitive);
}
- public ManifestGroup select(String... selectedColumns) {
- return select(Arrays.asList(selectedColumns));
+ public ManifestGroup select(String... columnNames) {
+ return select(Arrays.asList(columnNames));
}
/**
@@ -105,11 +145,19 @@ class ManifestGroup {
manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
if (ignoreDeleted) {
+ // only scan manifests that have entries other than deletes
// remove any manifests that don't have any existing or added files. if either the added or
// existing files count is missing, the manifest must be scanned.
- matchingManifests = Iterables.filter(manifests, manifest ->
- manifest.addedFilesCount() == null || manifest.existingFilesCount() == null ||
- manifest.addedFilesCount() + manifest.existingFilesCount() > 0);
+ matchingManifests = Iterables.filter(manifests,
+ manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles());
+ }
+
+ if (ignoreExisting) {
+ // only scan manifests that have entries other than existing
+ // remove any manifests that don't have any deleted or added files. if either the added or
+ // deleted files count is missing, the manifest must be scanned.
+ matchingManifests = Iterables.filter(manifests,
+ manifest -> manifest.hasAddedFiles() || manifest.hasDeletedFiles());
}
Iterable<CloseableIterable<ManifestEntry>> readers = Iterables.transform(
@@ -118,12 +166,28 @@ class ManifestGroup {
ManifestReader reader = ManifestReader.read(
ops.io().newInputFile(manifest.path()),
ops.current()::spec);
- FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
- return CloseableIterable.combine(
- Iterables.filter(
- ignoreDeleted ? filtered.liveEntries() : filtered.allEntries(),
- entry -> evaluator.eval((GenericDataFile) entry.file())),
- reader);
+
+ FilteredManifest filtered = reader
+ .filterRows(dataFilter)
+ .filterPartitions(partitionFilter)
+ .select(columns);
+
+ CloseableIterable<ManifestEntry> entries = filtered.allEntries();
+ if (ignoreDeleted) {
+ entries = filtered.liveEntries();
+ }
+
+ if (ignoreExisting) {
+ entries = CloseableIterable.filter(entries,
+ entry -> entry.status() != ManifestEntry.Status.EXISTING);
+ }
+
+ if (fileFilter != null && fileFilter != Expressions.alwaysTrue()) {
+ entries = CloseableIterable.filter(entries,
+ entry -> evaluator.eval((GenericDataFile) entry.file()));
+ }
+
+ return entries;
});
return CloseableIterable.concat(readers);
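
Put together, the expanded builder reads like this sketch (the terminal entries() call is assumed to be the method this hunk modifies):

    CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops, snapshot.manifests())
        .caseSensitive(false)
        .filterData(Expressions.greaterThan("id", 10))
        .ignoreDeleted()
        .select("file_path", "status")
        .entries();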
diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
new file mode 100644
index 0000000..eb2875c
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * A {@link Table} implementation that exposes a table's manifest files as rows.
+ */
+class ManifestsTable extends BaseMetadataTable {
+ private static final Schema MANIFEST_SCHEMA = new Schema(
+ Types.NestedField.required(1, "path", Types.StringType.get()),
+ Types.NestedField.required(2, "length", Types.LongType.get()),
+ Types.NestedField.required(3, "partition_spec_id", Types.IntegerType.get()),
+ Types.NestedField.required(4, "added_snapshot_id", Types.LongType.get()),
+ Types.NestedField.required(5, "added_data_files_count", Types.IntegerType.get()),
+ Types.NestedField.required(6, "existing_data_files_count", Types.IntegerType.get()),
+ Types.NestedField.required(7, "deleted_data_files_count", Types.IntegerType.get()),
+ Types.NestedField.required(8, "partition_summaries", Types.ListType.ofRequired(9, Types.StructType.of(
+ Types.NestedField.required(10, "contains_null", Types.BooleanType.get()),
+ Types.NestedField.optional(11, "lower_bound", Types.StringType.get()),
+ Types.NestedField.optional(12, "upper_bound", Types.StringType.get())
+ )))
+ );
+
+ private final TableOperations ops;
+ private final Table table;
+ private final PartitionSpec spec;
+
+ ManifestsTable(TableOperations ops, Table table) {
+ this.ops = ops;
+ this.table = table;
+ this.spec = table.spec();
+ }
+
+ @Override
+ Table table() {
+ return table;
+ }
+
+ @Override
+ String metadataTableName() {
+ return "manifests";
+ }
+
+ @Override
+ public TableScan newScan() {
+ return new ManifestsTableScan();
+ }
+
+ @Override
+ public String location() {
+ return ops.current().file().location();
+ }
+
+ @Override
+ public Schema schema() {
+ return MANIFEST_SCHEMA;
+ }
+
+ protected DataTask task(TableScan scan) {
+ return StaticDataTask.of(
+ ops.io().newInputFile(scan.snapshot().manifestListLocation()),
+ scan.snapshot().manifests(),
+ this::manifestFileToRow);
+ }
+
+ private class ManifestsTableScan extends StaticTableScan {
+ ManifestsTableScan() {
+ super(ops, table, MANIFEST_SCHEMA, ManifestsTable.this::task);
+ }
+ }
+
+ private StaticDataTask.Row manifestFileToRow(ManifestFile manifest) {
+ return StaticDataTask.Row.of(
+ manifest.path(),
+ manifest.length(),
+ manifest.partitionSpecId(),
+ manifest.snapshotId(),
+ manifest.addedFilesCount(),
+ manifest.existingFilesCount(),
+ manifest.deletedFilesCount(),
+ partitionSummariesToRows(manifest.partitions())
+ );
+ }
+
+ private List<StaticDataTask.Row> partitionSummariesToRows(List<ManifestFile.PartitionFieldSummary> summaries) {
+ List<StaticDataTask.Row> rows = Lists.newArrayList();
+
+ for (int i = 0; i < spec.fields().size(); i += 1) {
+ ManifestFile.PartitionFieldSummary summary = summaries.get(i);
+ rows.add(StaticDataTask.Row.of(
+ summary.containsNull(),
+ spec.fields().get(i).transform().toHumanString(
+ Conversions.fromByteBuffer(spec.partitionType().fields().get(i).type(), summary.lowerBound())),
+ spec.fields().get(i).transform().toHumanString(
+ Conversions.fromByteBuffer(spec.partitionType().fields().get(i).type(), summary.upperBound()))
+ ));
+ }
+
+ return rows;
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
new file mode 100644
index 0000000..0ac4377
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.types.Types;
+
+/**
+ * A {@link Table} implementation that exposes a table's known snapshots as rows.
+ * <p>
+ * This does not include snapshots that have been expired using {@link ExpireSnapshots}.
+ */
+class SnapshotsTable extends BaseMetadataTable {
+ private static final Schema SNAPSHOT_SCHEMA = new Schema(
+ Types.NestedField.required(1, "committed_at", Types.TimestampType.withZone()),
+ Types.NestedField.required(2, "snapshot_id", Types.LongType.get()),
+ Types.NestedField.optional(3, "parent_id", Types.LongType.get()),
+ Types.NestedField.optional(4, "operation", Types.StringType.get()),
+ Types.NestedField.optional(5, "manifest_list", Types.StringType.get()),
+ Types.NestedField.optional(6, "summary",
+ Types.MapType.ofRequired(7, 8, Types.StringType.get(), Types.StringType.get()))
+ );
+
+ private final TableOperations ops;
+ private final Table table;
+
+ SnapshotsTable(TableOperations ops, Table table) {
+ this.ops = ops;
+ this.table = table;
+ }
+
+ @Override
+ Table table() {
+ return table;
+ }
+
+ @Override
+ String metadataTableName() {
+ return "snapshots";
+ }
+
+ @Override
+ public TableScan newScan() {
+ return new SnapshotsTableScan();
+ }
+
+ @Override
+ public String location() {
+ return ops.current().file().location();
+ }
+
+ @Override
+ public Schema schema() {
+ return SNAPSHOT_SCHEMA;
+ }
+
+ private DataTask task(BaseTableScan scan) {
+ return StaticDataTask.of(ops.current().file(), ops.current().snapshots(), SnapshotsTable::snapshotToRow);
+ }
+
+ private class SnapshotsTableScan extends StaticTableScan {
+ SnapshotsTableScan() {
+ super(ops, table, SNAPSHOT_SCHEMA, SnapshotsTable.this::task);
+ }
+ }
+
+ private static StaticDataTask.Row snapshotToRow(Snapshot snap) {
+ return StaticDataTask.Row.of(
+ snap.timestampMillis() * 1000,
+ snap.snapshotId(),
+ snap.parentId(),
+ snap.operation(),
+ snap.manifestListLocation(),
+ snap.summary()
+ );
+ }
+}
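
One detail worth noting in snapshotToRow (and in HistoryTable above): Snapshot.timestampMillis() is milliseconds, while Iceberg's TimestampType carries microsecond precision, so the value is scaled:

    long committedAtMicros = snap.timestampMillis() * 1000; // ms -> µs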
diff --git a/core/src/main/java/org/apache/iceberg/StaticDataTask.java b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
new file mode 100644
index 0000000..9daecd2
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.function.Function;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+
+class StaticDataTask implements DataTask {
+
+ static <T> DataTask of(InputFile metadata, Iterable<T> values, Function<T, Row> transform) {
+ return new StaticDataTask(metadata,
+ Lists.newArrayList(Iterables.transform(values, transform::apply)).toArray(new Row[0]));
+ }
+
+ private final DataFile metadataFile;
+ private final StructLike[] rows;
+
+ private StaticDataTask(InputFile metadata, StructLike[] rows) {
+ this.metadataFile = DataFiles.builder()
+ .withInputFile(metadata)
+ .withRecordCount(rows.length)
+ .withFormat(FileFormat.METADATA)
+ .build();
+ this.rows = rows;
+ }
+
+ @Override
+ public CloseableIterable<StructLike> rows() {
+ return CloseableIterable.withNoopClose(Arrays.asList(rows));
+ }
+
+ @Override
+ public DataFile file() {
+ return metadataFile;
+ }
+
+ @Override
+ public PartitionSpec spec() {
+ return PartitionSpec.unpartitioned();
+ }
+
+ @Override
+ public long start() {
+ return 0;
+ }
+
+ @Override
+ public long length() {
+ return metadataFile.fileSizeInBytes();
+ }
+
+ @Override
+ public Expression residual() {
+ return Expressions.alwaysTrue();
+ }
+
+ @Override
+ public Iterable<FileScanTask> split(long splitSize) {
+ return ImmutableList.of(this);
+ }
+
+ /**
+ * Implements {@link StructLike#get} for passing static rows.
+ */
+ static class Row implements StructLike, Serializable {
+ public static Row of(Object... values) {
+ return new Row(values);
+ }
+
+ private final Object[] values;
+
+ private Row(Object... values) {
+ this.values = values;
+ }
+
+ @Override
+ public int size() {
+ return values.length;
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ return javaClass.cast(values[pos]);
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException("Setting values is not supported");
+ }
+ }
+}
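
StaticDataTask is the piece that lets purely in-memory metadata flow through the scan API; the metadata file is used only to report a location and size for the task. A sketch mirroring SnapshotsTable.task:

    DataTask task = StaticDataTask.of(
        ops.current().file(),       // table metadata file backing the task
        ops.current().snapshots(),  // values to expose as rows
        snap -> StaticDataTask.Row.of(snap.snapshotId(), snap.operation()));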
diff --git a/core/src/main/java/org/apache/iceberg/StaticTableScan.java b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
new file mode 100644
index 0000000..c29c14b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Collection;
+import java.util.function.Function;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+
+class StaticTableScan extends BaseTableScan {
+ private static final long TARGET_SPLIT_SIZE = 32 * 1024 * 1024; // 32 MB
+
+ private final Function<StaticTableScan, DataTask> buildTask;
+
+ StaticTableScan(TableOperations ops, Table table, Schema schema, Function<StaticTableScan, DataTask> buildTask) {
+ super(ops, table, schema);
+ this.buildTask = buildTask;
+ }
+
+ private StaticTableScan(
+ TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ Function<StaticTableScan, DataTask> buildTask) {
+ super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ this.buildTask = buildTask;
+ }
+
+ @Override
+ protected long targetSplitSize(TableOperations ops) {
+ return TARGET_SPLIT_SIZE;
+ }
+
+ @Override
+ protected TableScan newRefinedScan(
+ TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+ return new StaticTableScan(
+ ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, buildTask);
+ }
+
+ @Override
+ protected CloseableIterable<FileScanTask> planFiles(
+ TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+ return CloseableIterable.withNoopClose(buildTask.apply(this));
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 342fb5a..ef289b4 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -81,7 +81,7 @@ public class TableMetadata {
ImmutableMap.copyOf(properties), -1, ImmutableList.of(), ImmutableList.of());
}
- public static class SnapshotLogEntry {
+ public static class SnapshotLogEntry implements HistoryEntry {
private final long timestampMillis;
private final long snapshotId;
@@ -90,10 +90,12 @@ public class TableMetadata {
this.snapshotId = snapshotId;
}
+ @Override
public long timestampMillis() {
return timestampMillis;
}
+ @Override
public long snapshotId() {
return snapshotId;
}
@@ -140,7 +142,7 @@ public class TableMetadata {
private final List<Snapshot> snapshots;
private final Map<Long, Snapshot> snapshotsById;
private final Map<Integer, PartitionSpec> specsById;
- private final List<SnapshotLogEntry> snapshotLog;
+ private final List<HistoryEntry> snapshotLog;
TableMetadata(TableOperations ops,
InputFile file,
@@ -154,7 +156,7 @@ public class TableMetadata {
Map<String, String> properties,
long currentSnapshotId,
List<Snapshot> snapshots,
- List<SnapshotLogEntry> snapshotLog) {
+ List<HistoryEntry> snapshotLog) {
this.ops = ops;
this.file = file;
this.uuid = uuid;
@@ -172,8 +174,8 @@ public class TableMetadata {
this.snapshotsById = indexSnapshots(snapshots);
this.specsById = indexSpecs(specs);
- SnapshotLogEntry last = null;
- for (SnapshotLogEntry logEntry : snapshotLog) {
+ HistoryEntry last = null;
+ for (HistoryEntry logEntry : snapshotLog) {
if (last != null) {
Preconditions.checkArgument(
(logEntry.timestampMillis() - last.timestampMillis()) >= 0,
@@ -259,7 +261,7 @@ public class TableMetadata {
return snapshots;
}
- public List<SnapshotLogEntry> snapshotLog() {
+ public List<HistoryEntry> snapshotLog() {
return snapshotLog;
}
@@ -324,7 +326,7 @@ public class TableMetadata {
.addAll(snapshots)
.add(snapshot)
.build();
- List<SnapshotLogEntry> newSnapshotLog = ImmutableList.<SnapshotLogEntry>builder()
+ List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
.addAll(snapshotLog)
.add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
.build();
@@ -344,8 +346,8 @@ public class TableMetadata {
// update the snapshot log
Set<Long> validIds = Sets.newHashSet(Iterables.transform(filtered, Snapshot::snapshotId));
- List<SnapshotLogEntry> newSnapshotLog = Lists.newArrayList();
- for (SnapshotLogEntry logEntry : snapshotLog) {
+ List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
+ for (HistoryEntry logEntry : snapshotLog) {
if (validIds.contains(logEntry.snapshotId())) {
// copy the log entries that are still valid
newSnapshotLog.add(logEntry);
@@ -369,7 +371,7 @@ public class TableMetadata {
"Cannot set current snapshot to unknown: %s", snapshot.snapshotId());
long nowMillis = System.currentTimeMillis();
- List<SnapshotLogEntry> newSnapshotLog = ImmutableList.<SnapshotLogEntry>builder()
+ List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
.addAll(snapshotLog)
.add(new SnapshotLogEntry(nowMillis, snapshot.snapshotId()))
.build();
@@ -387,8 +389,8 @@ public class TableMetadata {
}
public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
- List<SnapshotLogEntry> newSnapshotLog = Lists.newArrayList();
- for (SnapshotLogEntry logEntry : snapshotLog) {
+ List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
+ for (HistoryEntry logEntry : snapshotLog) {
if (!snapshotIds.contains(logEntry.snapshotId())) {
// copy the log entries that are still valid
newSnapshotLog.add(logEntry);
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 2df52a0..99bc1d7 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -168,7 +168,7 @@ public class TableMetadataParser {
generator.writeEndArray();
generator.writeArrayFieldStart(SNAPSHOT_LOG);
- for (SnapshotLogEntry logEntry : metadata.snapshotLog()) {
+ for (HistoryEntry logEntry : metadata.snapshotLog()) {
generator.writeStartObject();
generator.writeNumberField(TIMESTAMP_MS, logEntry.timestampMillis());
generator.writeNumberField(SNAPSHOT_ID, logEntry.snapshotId());
diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
new file mode 100644
index 0000000..ac38a86
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
+public class SnapshotUtil {
+ private SnapshotUtil() {
+ }
+
+ /**
+ * Return the snapshot IDs for the ancestors of the current table state.
+ * <p>
+ * Ancestor IDs are ordered by commit time, descending. The first ID is the current snapshot, followed by its parent,
+ * and so on.
+ *
+ * @param table a {@link Table}
+ * @return a list of snapshot IDs of the known ancestor snapshots, including the current ID
+ */
+ public static List<Long> currentAncestors(Table table) {
+ return ancestorIds(table.currentSnapshot(), table::snapshot);
+ }
+
+ public static List<Long> ancestorIds(Snapshot snapshot, Function<Long, Snapshot> lookup) {
+ List<Long> ancestorIds = Lists.newArrayList();
+ Snapshot current = snapshot;
+ while (current != null) {
+ ancestorIds.add(current.snapshotId());
+ if (current.parentId() != null) {
+ current = lookup.apply(current.parentId());
+ } else {
+ current = null;
+ }
+ }
+ return ancestorIds;
+ }
+}
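
As a worked example: for a lineage S1 <- S2 <- S3 where S3 is current, currentAncestors returns [3, 2, 1]; a snapshot abandoned by a rollback never appears, because the walk only follows parentId links from the current snapshot.

    Set<Long> ancestorIds = Sets.newHashSet(SnapshotUtil.currentAncestors(table));
    // exactly the set HistoryTable uses to compute is_current_ancestor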
diff --git a/core/src/test/java/org/apache/iceberg/TestBaseTableScan.java b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
similarity index 98%
rename from core/src/test/java/org/apache/iceberg/TestBaseTableScan.java
rename to core/src/test/java/org/apache/iceberg/TestDataTableScan.java
index dd90467..5c3dc90 100644
--- a/core/src/test/java/org/apache/iceberg/TestBaseTableScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
@@ -31,7 +31,7 @@ import org.junit.rules.TemporaryFolder;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.junit.Assert.assertEquals;
-public class TestBaseTableScan {
+public class TestDataTableScan {
@Rule
public TemporaryFolder temp = new TemporaryFolder();
diff --git a/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java b/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java
new file mode 100644
index 0000000..7db931c
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestEntriesMetadataTable extends TableTestBase {
+
+ @Test
+ public void testEntriesTable() {
+ table.newAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+
+ Table entriesTable = new ManifestEntriesTable(table.ops(), table);
+
+ Schema expectedSchema = ManifestEntry.getSchema(table.spec().partitionType());
+
+ assertEquals("A tableScan.select() should prune the schema",
+ expectedSchema.asStruct(),
+ entriesTable.schema().asStruct());
+ }
+
+ @Test
+ public void testEntriesTableScan() {
+ table.newAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+
+ Table entriesTable = new ManifestEntriesTable(table.ops(), table);
+ TableScan scan = entriesTable.newScan();
+
+ Schema expectedSchema = ManifestEntry.getSchema(table.spec().partitionType());
+
+ assertEquals("A tableScan.select() should prune the schema",
+ expectedSchema.asStruct(),
+ scan.schema().asStruct());
+
+ FileScanTask file = Iterables.getOnlyElement(scan.planFiles());
+ Assert.assertEquals("Data file should be the table's manifest",
+ Iterables.getOnlyElement(table.currentSnapshot().manifests()).path(), file.file().path());
+ Assert.assertEquals("Should contain 2 data file records", 2, file.file().recordCount());
+ }
+
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadataJson.java b/core/src/test/java/org/apache/iceberg/TestTableMetadataJson.java
index 1823122..d58bfe6 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadataJson.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadataJson.java
@@ -76,7 +76,7 @@ public class TestTableMetadataJson {
ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
- List<SnapshotLogEntry> snapshotLog = ImmutableList.<SnapshotLogEntry>builder()
+ List<HistoryEntry> snapshotLog = ImmutableList.<HistoryEntry>builder()
.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()))
.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()))
.build();
@@ -140,7 +140,7 @@ public class TestTableMetadataJson {
ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
- List<SnapshotLogEntry> reversedSnapshotLog = Lists.newArrayList();
+ List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
TableMetadata expected = new TableMetadata(ops, null, UUID.randomUUID().toString(), "s3://bucket/test/location",
System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 837c589..b55521c 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -51,7 +51,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
@Override
public org.apache.iceberg.Table loadTable(TableIdentifier identifier) {
- Preconditions.checkArgument(identifier.namespace().levels().length == 1,
+ Preconditions.checkArgument(identifier.namespace().levels().length >= 1,
"Missing database in table identifier: %s", identifier);
return super.loadTable(identifier);
}
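
The relaxed check allows identifiers with an extra trailing level, so the catalog can resolve names like db.table.history to the corresponding metadata table (files, history, snapshots, manifests, entries). An illustrative Spark read, with db.table standing in for a real Hive-tracked table:

    Dataset<Row> history = spark.read()
        .format("iceberg")
        .load("db.table.history");
    history.show();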
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 943db87..e0b2851 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
@@ -415,25 +416,31 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
}
private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
- InputFile location = inputFiles.get(task.file().path().toString());
- Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
CloseableIterable<InternalRow> iter;
- switch (task.file().format()) {
- case PARQUET:
- iter = newParquetIterable(location, task, readSchema);
- break;
-
- case AVRO:
- iter = newAvroIterable(location, task, readSchema);
- break;
-
- case ORC:
- iter = newOrcIterable(location, task, readSchema);
- break;
-
- default:
- throw new UnsupportedOperationException(
- "Cannot read unknown format: " + task.file().format());
+ if (task.isDataTask()) {
+ iter = newDataIterable(task.asDataTask(), readSchema);
+
+ } else {
+ InputFile location = inputFiles.get(task.file().path().toString());
+ Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
+
+ switch (task.file().format()) {
+ case PARQUET:
+ iter = newParquetIterable(location, task, readSchema);
+ break;
+
+ case AVRO:
+ iter = newAvroIterable(location, task, readSchema);
+ break;
+
+ case ORC:
+ iter = newOrcIterable(location, task, readSchema);
+ break;
+
+ default:
+ throw new UnsupportedOperationException(
+ "Cannot read unknown format: " + task.file().format());
+ }
}
this.currentCloseable = iter;
@@ -496,6 +503,13 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
.caseSensitive(caseSensitive)
.build();
}
+
+ private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
+ StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
+ Iterable<InternalRow> asSparkRows = Iterables.transform(task.asDataTask().rows(), row::setStruct);
+ return CloseableIterable.withNoopClose(
+ Iterables.transform(asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke));
+ }
}
private static class PartitionRowConverter implements Function<StructLike, InternalRow> {
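The key change in open() is the dispatch on task.isDataTask(): metadata tables produce DataTask instances
whose rows come straight from table metadata, so none of the Parquet/Avro/ORC readers are involved. A
minimal sketch of consuming one, with process() standing in as a hypothetical consumer:

    // Sketch: DataTask.rows() yields StructLike rows materialized in memory;
    // Reader wraps them in StructInternalRow (below) instead of decoding files.
    if (task.isDataTask()) {
      for (StructLike row : task.asDataTask().rows()) {
        process(row); // hypothetical consumer
      }
    }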
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
new file mode 100644
index 0000000..1ed5a78
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+class StructInternalRow extends InternalRow {
+ private final Types.StructType type;
+ private StructLike struct;
+
+ StructInternalRow(Types.StructType type) {
+ this.type = type;
+ }
+
+ private StructInternalRow(Types.StructType type, StructLike struct) {
+ this.type = type;
+ this.struct = struct;
+ }
+
+ public StructInternalRow setStruct(StructLike newStruct) {
+ this.struct = newStruct;
+ return this;
+ }
+
+ @Override
+ public int numFields() {
+ return struct.size();
+ }
+
+ @Override
+ public void setNullAt(int i) {
+ throw new UnsupportedOperationException("StructInternalRow is read-only");
+ }
+
+ @Override
+ public void update(int i, Object value) {
+ throw new UnsupportedOperationException("StructInternalRow is read-only");
+ }
+
+ @Override
+ public InternalRow copy() {
+ return this;
+ }
+
+ @Override
+ public boolean isNullAt(int ordinal) {
+ return struct.get(ordinal, Object.class) == null;
+ }
+
+ @Override
+ public boolean getBoolean(int ordinal) {
+ return struct.get(ordinal, Boolean.class);
+ }
+
+ @Override
+ public byte getByte(int ordinal) {
+ return (byte) (int) struct.get(ordinal, Integer.class);
+ }
+
+ @Override
+ public short getShort(int ordinal) {
+ return (short) (int) struct.get(ordinal, Integer.class);
+ }
+
+ @Override
+ public int getInt(int ordinal) {
+ return struct.get(ordinal, Integer.class);
+ }
+
+ @Override
+ public long getLong(int ordinal) {
+ return struct.get(ordinal, Long.class);
+ }
+
+ @Override
+ public float getFloat(int ordinal) {
+ return struct.get(ordinal, Float.class);
+ }
+
+ @Override
+ public double getDouble(int ordinal) {
+ return struct.get(ordinal, Double.class);
+ }
+
+ @Override
+ public Decimal getDecimal(int ordinal, int precision, int scale) {
+ return Decimal.apply(struct.get(ordinal, BigDecimal.class));
+ }
+
+ @Override
+ public UTF8String getUTF8String(int ordinal) {
+ CharSequence seq = struct.get(ordinal, CharSequence.class);
+ return UTF8String.fromString(seq.toString());
+ }
+
+ @Override
+ public byte[] getBinary(int ordinal) {
+ ByteBuffer bytes = struct.get(ordinal, ByteBuffer.class);
+ return ByteBuffers.toByteArray(bytes);
+ }
+
+ @Override
+ public CalendarInterval getInterval(int ordinal) {
+ throw new UnsupportedOperationException("Unsupported type: interval");
+ }
+
+ @Override
+ public InternalRow getStruct(int ordinal, int numFields) {
+ return new StructInternalRow(
+ type.fields().get(ordinal).type().asStructType(),
+ struct.get(ordinal, StructLike.class));
+ }
+
+ @Override
+ public ArrayData getArray(int ordinal) {
+ return collectionToArrayData(
+ type.fields().get(ordinal).type().asListType().elementType(),
+ struct.get(ordinal, Collection.class));
+ }
+
+ @Override
+ public MapData getMap(int ordinal) {
+ return mapToMapData(type.fields().get(ordinal).type().asMapType(), struct.get(ordinal, Map.class));
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public Object get(int ordinal, DataType dataType) {
+ if (dataType instanceof IntegerType) {
+ return getInt(ordinal);
+ } else if (dataType instanceof LongType) {
+ return getLong(ordinal);
+ } else if (dataType instanceof StringType) {
+ return getUTF8String(ordinal);
+ } else if (dataType instanceof FloatType) {
+ return getFloat(ordinal);
+ } else if (dataType instanceof DoubleType) {
+ return getDouble(ordinal);
+ } else if (dataType instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) dataType;
+ return getDecimal(ordinal, decimalType.precision(), decimalType.scale());
+ } else if (dataType instanceof BinaryType) {
+ return getBinary(ordinal);
+ } else if (dataType instanceof StructType) {
+ return getStruct(ordinal, ((StructType) dataType).size());
+ } else if (dataType instanceof ArrayType) {
+ return getArray(ordinal);
+ } else if (dataType instanceof MapType) {
+ return getMap(ordinal);
+ } else if (dataType instanceof BooleanType) {
+ return getBoolean(ordinal);
+ } else if (dataType instanceof ByteType) {
+ return getByte(ordinal);
+ } else if (dataType instanceof ShortType) {
+ return getShort(ordinal);
+ }
+ return null;
+ }
+
+ private MapData mapToMapData(Types.MapType mapType, Map<?, ?> map) {
+ // make a defensive copy to ensure entries do not change
+ List<Map.Entry<?, ?>> entries = ImmutableList.copyOf(map.entrySet());
+ return new ArrayBasedMapData(
+ collectionToArrayData(mapType.keyType(), Lists.transform(entries, Map.Entry::getKey)),
+ collectionToArrayData(mapType.valueType(), Lists.transform(entries, Map.Entry::getValue)));
+ }
+
+ private ArrayData collectionToArrayData(Type elementType, Collection<?> values) {
+ switch (elementType.typeId()) {
+ case BOOLEAN:
+ return fillArray(values, array -> (BiConsumer<Integer, Boolean>) array::setBoolean);
+ case INTEGER:
+ case DATE:
+ case TIME:
+ return fillArray(values, array -> (BiConsumer<Integer, Integer>) array::setInt);
+ case LONG:
+ case TIMESTAMP:
+ return fillArray(values, array -> (BiConsumer<Integer, Long>) array::setLong);
+ case FLOAT:
+ return fillArray(values, array -> (BiConsumer<Integer, Float>) array::setFloat);
+ case DOUBLE:
+ return fillArray(values, array -> (BiConsumer<Integer, Double>) array::setDouble);
+ case STRING:
+ return fillArray(values, array ->
+ (BiConsumer<Integer, CharSequence>) (pos, seq) -> array.update(pos, UTF8String.fromString(seq.toString())));
+ case FIXED:
+ case BINARY:
+ return fillArray(values, array ->
+ (BiConsumer<Integer, ByteBuffer>) (pos, buf) -> array.update(pos, ByteBuffers.toByteArray(buf)));
+ case DECIMAL:
+ return fillArray(values, array ->
+ (BiConsumer<Integer, BigDecimal>) (pos, dec) -> array.update(pos, Decimal.apply(dec)));
+ case STRUCT:
+ return fillArray(values, array -> (BiConsumer<Integer, StructLike>) (pos, tuple) ->
+ array.update(pos, new StructInternalRow(elementType.asStructType(), tuple)));
+ case LIST:
+ return fillArray(values, array -> (BiConsumer<Integer, Collection<?>>) (pos, list) ->
+ array.update(pos, collectionToArrayData(elementType.asListType(), list)));
+ case MAP:
+ return fillArray(values, array -> (BiConsumer<Integer, Map<?, ?>>) (pos, map) ->
+ array.update(pos, mapToMapData(elementType.asMapType(), map)));
+ default:
+ throw new UnsupportedOperationException("Unsupported array element type: " + elementType);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> GenericArrayData fillArray(Collection<?> values, Function<ArrayData, BiConsumer<Integer, T>> makeSetter) {
+ GenericArrayData array = new GenericArrayData(new Object[values.size()]);
+ BiConsumer<Integer, T> setter = makeSetter.apply(array);
+
+ int index = 0;
+ for (Object value : values) {
+ if (value == null) {
+ array.setNullAt(index);
+ } else {
+ setter.accept(index, (T) value);
+ }
+
+ index += 1;
+ }
+
+ return array;
+ }
+}
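StructInternalRow is a read-only adapter from Iceberg's StructLike rows to Spark's InternalRow. Because
setStruct returns this, a single wrapper can be reused across an entire scan, which is how newDataIterable
above avoids a per-row allocation. A minimal usage sketch, where schema and task are assumptions standing
in for the reader's Iceberg Schema and a DataTask:

    // Hypothetical: wrap each metadata row for Spark without copying it.
    StructInternalRow wrapper = new StructInternalRow(schema.asStruct());
    Iterable<InternalRow> sparkRows = Iterables.transform(task.rows(), wrapper::setStruct);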
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index fd901c3..c20bc67 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -144,7 +144,7 @@ public class TestHelpers {
break;
case STRING:
Assert.assertTrue("Should be a String", actual instanceof String);
- Assert.assertEquals("Strings should be equal", expected, actual);
+ Assert.assertEquals("Strings should be equal", String.valueOf(expected), actual);
break;
case UUID:
Assert.assertTrue("Should expect a UUID", expected instanceof UUID);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index 72fd1b2..5a50731 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -19,17 +19,30 @@
package org.apache.iceberg.spark.source;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import java.io.IOException;
import java.util.HashMap;
import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -114,4 +127,296 @@ public class TestIcebergSourceHiveTables {
metastoreClient.dropTable(DB_NAME, TABLE_NAME);
}
}
+
+ @Test
+ public void testHiveEntriesTable() throws TException, IOException {
+ try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
+ Table table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA, PartitionSpec.unpartitioned());
+ Table entriesTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "entries"));
+
+ List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
+
+ Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
+ inputDf.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(TABLE_IDENTIFIER.toString());
+
+ table.refresh();
+
+ List<Row> actual = spark.read()
+ .format("iceberg")
+ .load(DB_NAME + "." + TABLE_NAME + ".entries")
+ .collectAsList();
+
+ Assert.assertEquals("Should only contain one manifest", 1, table.currentSnapshot().manifests().size());
+ InputFile manifest = table.io().newInputFile(table.currentSnapshot().manifests().get(0).path());
+ List<GenericData.Record> expected;
+ try (CloseableIterable<GenericData.Record> rows = Avro.read(manifest).project(entriesTable.schema()).build()) {
+ expected = Lists.newArrayList(rows);
+ }
+
+ Assert.assertEquals("Entries table should have one row", 1, expected.size());
+ Assert.assertEquals("Actual results should have one row", 1, actual.size());
+ TestHelpers.assertEqualsSafe(entriesTable.schema().asStruct(), expected.get(0), actual.get(0));
+
+ } finally {
+ metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+ }
+ }
+
+ @Test
+ public void testHiveFilesTable() throws TException, IOException {
+ try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
+ Table table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA,
+ PartitionSpec.builderFor(SCHEMA).identity("id").build());
+ Table entriesTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "entries"));
+ Table filesTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "files"));
+
+ Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+ Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+
+ df1.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(TABLE_IDENTIFIER.toString());
+
+ // add a second file
+ df2.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(TABLE_IDENTIFIER.toString());
+
+ // delete the first file to test that only live files are listed
+ table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
+
+ List<Row> actual = spark.read()
+ .format("iceberg")
+ .load(DB_NAME + "." + TABLE_NAME + ".files")
+ .collectAsList();
+
+ List<GenericData.Record> expected = Lists.newArrayList();
+ for (ManifestFile manifest : table.currentSnapshot().manifests()) {
+ InputFile in = table.io().newInputFile(manifest.path());
+ try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
+ for (GenericData.Record record : rows) {
+ if ((Integer) record.get("status") < 2 /* added or existing */) {
+ expected.add((GenericData.Record) record.get("data_file"));
+ }
+ }
+ }
+ }
+
+ Assert.assertEquals("Files table should have one row", 1, expected.size());
+ Assert.assertEquals("Actual results should have one row", 1, actual.size());
+ TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+
+ } finally {
+ metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+ }
+ }
+
+ @Test
+ public void testHiveHistoryTable() throws TException {
+ try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
+ Table table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA, PartitionSpec.unpartitioned());
+ Table historyTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "history"));
+
+ List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
+ Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
+
+ inputDf.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(TABLE_IDENTIFIER.toString());
+
+ table.refresh();
+ long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
+ long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+ inputDf.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(TABLE_IDENTIFIER.toString());
+
+ table.refresh();
+ long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();
+ long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+ // rollback the table state to the first snapshot
+ table.rollback().toSnapshotId(firstSnapshotId).commit();
+ long rollbackTimestamp = Iterables.getLast(table.history()).timestampMillis();
+
+ inputDf.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(TABLE_IDENTIFIER.toString());
+
+ table.refresh();
+ long thirdSnapshotTimestamp = table.currentSnapshot().timestampMillis();
+ long thirdSnapshotId = table.currentSnapshot().snapshotId();
+
+ List<Row> actual = spark.read()
+ .format("iceberg")
+ .load(DB_NAME + "." + TABLE_NAME + ".history")
+ .collectAsList();
+
+ GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(historyTable.schema(), "history"));
+ List<GenericData.Record> expected = Lists.newArrayList(
+ builder.set("made_current_at", firstSnapshotTimestamp * 1000)
+ .set("snapshot_id", firstSnapshotId)
+ .set("parent_id", null)
+ .set("is_current_ancestor", true)
+ .build(),
+ builder.set("made_current_at", secondSnapshotTimestamp * 1000)
+ .set("snapshot_id", secondSnapshotId)
+ .set("parent_id", firstSnapshotId)
+ .set("is_current_ancestor", false) // commit rolled back, not an ancestor of the current table state
+ .build(),
+ builder.set("made_current_at", rollbackTimestamp * 1000)
+ .set("snapshot_id", firstSnapshotId)
+ .set("parent_id", null)
+ .set("is_current_ancestor", true)
+ .build(),
+ builder.set("made_current_at", thirdSnapshotTimestamp * 1000)
+ .set("snapshot_id", thirdSnapshotId)
+ .set("parent_id", firstSnapshotId)
+ .set("is_current_ancestor", true)
+ .build()
+ );
+
+ Assert.assertEquals("History table should have a row for each commit", 4, actual.size());
+ TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(1), actual.get(1));
+ TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(2), actual.get(2));
+ TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(3), actual.get(3));
+
+ } finally {
+ metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+ }
+ }
+
+ @Test
+ public void testHiveSnapshotsTable() throws TException {
+ try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
+ Table table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA, PartitionSpec.unpartitioned());
+ Table snapTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "snapshots"));
+
+ List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
+ Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
+
+ inputDf.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(TABLE_IDENTIFIER.toString());
+
+ table.refresh();
+ long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
+ long firstSnapshotId = table.currentSnapshot().snapshotId();
+ String firstManifestList = table.currentSnapshot().manifestListLocation();
+
+ table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+
+ long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();
+ long secondSnapshotId = table.currentSnapshot().snapshotId();
+ String secondManifestList = table.currentSnapshot().manifestListLocation();
+
+ // rollback the table state to the first snapshot
+ table.rollback().toSnapshotId(firstSnapshotId).commit();
+
+ List<Row> actual = spark.read()
+ .format("iceberg")
+ .load(DB_NAME + "." + TABLE_NAME + ".snapshots")
+ .collectAsList();
+
+ GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(snapTable.schema(), "snapshots"));
+ List<GenericData.Record> expected = Lists.newArrayList(
+ builder.set("committed_at", firstSnapshotTimestamp * 1000)
+ .set("snapshot_id", firstSnapshotId)
+ .set("parent_id", null)
+ .set("operation", "append")
+ .set("manifest_list", firstManifestList)
+ .set("summary", ImmutableMap.of(
+ "added-records", "1",
+ "added-data-files", "1",
+ "changed-partition-count", "1",
+ "total-data-files", "1",
+ "total-records", "1"
+ ))
+ .build(),
+ builder.set("committed_at", secondSnapshotTimestamp * 1000)
+ .set("snapshot_id", secondSnapshotId)
+ .set("parent_id", firstSnapshotId)
+ .set("operation", "delete")
+ .set("manifest_list", secondManifestList)
+ .set("summary", ImmutableMap.of(
+ "deleted-records", "1",
+ "deleted-data-files", "1",
+ "changed-partition-count", "1",
+ "total-records", "0",
+ "total-data-files", "0"
+ ))
+ .build()
+ );
+
+ Assert.assertEquals("Snapshots table should have a row for each snapshot", 2, actual.size());
+ TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(1), actual.get(1));
+
+ } finally {
+ metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+ }
+ }
+
+ @Test
+ public void testHiveManifestsTable() throws TException {
+ try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
+ Table table = catalog.createTable(
+ TABLE_IDENTIFIER,
+ SCHEMA,
+ PartitionSpec.builderFor(SCHEMA).identity("id").build());
+ Table manifestTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "manifests"));
+
+ Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+
+ df1.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(TABLE_IDENTIFIER.toString());
+
+ List<Row> actual = spark.read()
+ .format("iceberg")
+ .load(DB_NAME + "." + TABLE_NAME + ".manifests")
+ .collectAsList();
+
+ table.refresh();
+
+ GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
+ manifestTable.schema(), "manifests"));
+ GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
+ manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
+ List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().manifests(), manifest ->
+ builder.set("path", manifest.path())
+ .set("length", manifest.length())
+ .set("partition_spec_id", manifest.partitionSpecId())
+ .set("added_snapshot_id", manifest.snapshotId())
+ .set("added_data_files_count", manifest.addedFilesCount())
+ .set("existing_data_files_count", manifest.existingFilesCount())
+ .set("deleted_data_files_count", manifest.deletedFilesCount())
+ .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
+ summaryBuilder
+ .set("contains_null", false)
+ .set("lower_bound", "1")
+ .set("upper_bound", "1")
+ .build()
+ ))
+ .build()
+ );
+
+ Assert.assertEquals("Manifests table should have one manifest row", 1, actual.size());
+ TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(0), actual.get(0));
+
+ } finally {
+ metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+ }
+ }
}
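Taken together, the tests above exercise each of the new metadata tables through the same load path. A
condensed sketch of that pattern (metadata table names as used in the tests; DB_NAME and TABLE_NAME as
defined in this test class):

    // Hypothetical loop over the five metadata tables added by this commit.
    for (String metaTable : new String[] {"history", "snapshots", "manifests", "files", "entries"}) {
      spark.read()
          .format("iceberg")
          .load(DB_NAME + "." + TABLE_NAME + "." + metaTable)
          .show();
    }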