Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/04/04 16:25:32 UTC
[incubator-iceberg] branch master updated: Use the current schema in all partition specs in scan planning. (#108)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9a419df Use the current schema in all partition specs in scan planning. (#108)
9a419df is described below
commit 9a419df715bc206c4b3cef4a4df2bc65f2512370
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Apr 4 09:25:24 2019 -0700
Use the current schema in all partition specs in scan planning. (#108)
---
.../main/java/org/apache/iceberg/BaseSnapshot.java | 4 +-
.../java/org/apache/iceberg/BaseTableScan.java | 18 ++--
.../java/org/apache/iceberg/ManifestGroup.java | 9 +-
.../java/org/apache/iceberg/ManifestReader.java | 36 ++++---
.../org/apache/iceberg/MergingSnapshotUpdate.java | 6 +-
.../java/org/apache/iceberg/RemoveSnapshots.java | 3 +-
.../java/org/apache/iceberg/SnapshotUpdate.java | 3 +-
.../java/org/apache/iceberg/TableMetadata.java | 17 +++-
.../iceberg/TestScansAndSchemaEvolution.java | 113 +++++++++++++++++++++
9 files changed, 176 insertions(+), 33 deletions(-)
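
The core of the change is a new ManifestReader.read overload that takes a function from partition spec ID to PartitionSpec, so readers resolve their spec (and its schema) from the table's current metadata instead of the schema JSON written into each manifest. A minimal sketch of the new call pattern, assuming a TableOperations instance named ops and a ManifestFile named manifest:

    // Sketch only: the second argument supplies the PartitionSpec for the
    // manifest's partition-spec-id from the table's current metadata.
    ManifestReader reader = ManifestReader.read(
        ops.io().newInputFile(manifest.path()),
        ops.current()::spec);
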
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 6b43032..425e3ae 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -158,7 +158,9 @@ class BaseSnapshot implements Snapshot {
// accumulate adds and deletes from all manifests.
// because manifests can be reused in newer snapshots, filter the changes by snapshot id.
for (String manifest : Iterables.transform(manifests(), ManifestFile::path)) {
- try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest))) {
+ try (ManifestReader reader = ManifestReader.read(
+ ops.io().newInputFile(manifest),
+ ops.current()::spec)) {
for (ManifestEntry add : reader.addedFiles()) {
if (add.snapshotId() == snapshotId) {
adds.add(add.file().copy());
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 74e7e28..2e580d5 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.InclusiveManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.TypeUtil;
@@ -147,7 +148,7 @@ class BaseTableScan implements TableScan {
caseSensitive, selectedColumns);
}
- private final LoadingCache<Integer, InclusiveManifestEvaluator> EVAL_CACHE = CacheBuilder
+ private final LoadingCache<Integer, InclusiveManifestEvaluator> evalCache = CacheBuilder
.newBuilder()
.build(new CacheLoader<Integer, InclusiveManifestEvaluator>() {
@Override
@@ -172,19 +173,20 @@ class BaseTableScan implements TableScan {
new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema()));
Iterable<ManifestFile> matchingManifests = Iterables.filter(snapshot.manifests(),
- manifest -> EVAL_CACHE.getUnchecked(manifest.partitionSpecId()).eval(manifest));
+ manifest -> evalCache.getUnchecked(manifest.partitionSpecId()).eval(manifest));
ConcurrentLinkedQueue<Closeable> toClose = new ConcurrentLinkedQueue<>();
Iterable<Iterable<FileScanTask>> readers = Iterables.transform(
matchingManifests,
manifest -> {
- ManifestReader reader = ManifestReader
- .read(ops.io().newInputFile(manifest.path()))
- .caseSensitive(caseSensitive);
+ ManifestReader reader = ManifestReader
+ .read(ops.io().newInputFile(manifest.path()), ops.current()::spec)
+ .caseSensitive(caseSensitive);
+ PartitionSpec spec = ops.current().spec(manifest.partitionSpecId());
toClose.add(reader);
- String schemaString = SchemaParser.toJson(reader.spec().schema());
- String specString = PartitionSpecParser.toJson(reader.spec());
- ResidualEvaluator residuals = new ResidualEvaluator(reader.spec(), rowFilter, caseSensitive);
+ String schemaString = SchemaParser.toJson(spec.schema());
+ String specString = PartitionSpecParser.toJson(spec);
+ ResidualEvaluator residuals = new ResidualEvaluator(spec, rowFilter, caseSensitive);
return Iterables.transform(
reader.filterRows(rowFilter).select(SNAPSHOT_COLUMNS),
file -> new BaseFileScanTask(file, schemaString, specString, residuals)
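
Because the schema and spec JSON attached to each BaseFileScanTask are now produced from ops.current(), a consumer that re-parses them sees the table's current column names. An illustrative round trip (not part of this commit), reusing the schemaString and specString variables from the hunk above:

    // Hypothetical consumer-side parse; SchemaParser and PartitionSpecParser
    // are the same classes used above to serialize the current schema and spec.
    Schema taskSchema = SchemaParser.fromJson(schemaString);
    PartitionSpec taskSpec = PartitionSpecParser.fromJson(taskSchema, specString);
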
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 5bc2bab..3ca6b54 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.InclusiveManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
@@ -47,7 +48,7 @@ class ManifestGroup {
private final boolean ignoreDeleted;
private final List<String> columns;
- private final LoadingCache<Integer, InclusiveManifestEvaluator> EVAL_CACHE = CacheBuilder
+ private final LoadingCache<Integer, InclusiveManifestEvaluator> evalCache = CacheBuilder
.newBuilder()
.build(new CacheLoader<Integer, InclusiveManifestEvaluator>() {
@Override
@@ -109,7 +110,7 @@ class ManifestGroup {
List<Closeable> toClose = Lists.newArrayList();
Iterable<ManifestFile> matchingManifests = Iterables.filter(manifests,
- manifest -> EVAL_CACHE.getUnchecked(manifest.partitionSpecId()).eval(manifest));
+ manifest -> evalCache.getUnchecked(manifest.partitionSpecId()).eval(manifest));
if (ignoreDeleted) {
// remove any manifests that don't have any existing or added files. if either the added or
@@ -122,7 +123,9 @@ class ManifestGroup {
Iterable<Iterable<ManifestEntry>> readers = Iterables.transform(
matchingManifests,
manifest -> {
- ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()));
+ ManifestReader reader = ManifestReader.read(
+ ops.io().newInputFile(manifest.path()),
+ ops.current()::spec);
FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
toClose.add(reader);
return Iterables.filter(
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 6b428f0..32fba63 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -45,7 +46,7 @@ import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
/**
* Reader for manifest files.
* <p>
- * Readers are created using the builder from {@link #read(InputFile)}.
+ * Readers are created using the builder from {@link #read(InputFile, Function)}.
*/
public class ManifestReader extends CloseableGroup implements Filterable<FilteredManifest> {
private static final Logger LOG = LoggerFactory.getLogger(ManifestReader.class);
@@ -54,25 +55,20 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
private static final List<String> CHANGE_COLUNNS = Lists.newArrayList(
"file_path", "file_format", "partition", "record_count", "file_size_in_bytes");
- /**
- * Returns a new {@link ManifestReader} for an {@link InputFile}.
- *
- * @param file an InputFile
- * @return a manifest reader
- */
- public static ManifestReader read(InputFile file) {
- return new ManifestReader(file, true);
+ // Visible for testing
+ static ManifestReader read(InputFile file) {
+ return new ManifestReader(file, null);
}
/**
* Returns a new {@link ManifestReader} for an {@link InputFile}.
*
* @param file an InputFile
- * @param caseSensitive whether column name matching should have case sensitivity
+ * @param specLookup a function to look up the manifest's partition spec by ID
* @return a manifest reader
*/
- public static ManifestReader read(InputFile file, boolean caseSensitive) {
- return new ManifestReader(file, caseSensitive);
+ public static ManifestReader read(InputFile file, Function<Integer, PartitionSpec> specLookup) {
+ return new ManifestReader(file, specLookup);
}
private final InputFile file;
@@ -85,9 +81,8 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
private List<ManifestEntry> adds = null;
private List<ManifestEntry> deletes = null;
- private ManifestReader(InputFile file, boolean caseSensitive) {
+ private ManifestReader(InputFile file, Function<Integer, PartitionSpec> specLookup) {
this.file = file;
- this.caseSensitive = caseSensitive;
try {
try (AvroIterable<ManifestEntry> headerReader = Avro.read(file)
@@ -98,13 +93,22 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
} catch (IOException e) {
throw new RuntimeIOException(e);
}
- this.schema = SchemaParser.fromJson(metadata.get("schema"));
+
int specId = TableMetadata.INITIAL_SPEC_ID;
String specProperty = metadata.get("partition-spec-id");
if (specProperty != null) {
specId = Integer.parseInt(specProperty);
}
- this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
+
+ if (specLookup != null) {
+ this.spec = specLookup.apply(specId);
+ this.schema = spec.schema();
+ } else {
+ this.schema = SchemaParser.fromJson(metadata.get("schema"));
+ this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
+ }
+
+ this.caseSensitive = true;
}
private ManifestReader(InputFile file, Map<String, String> metadata,
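
Conceptually, the constructor now prefers the spec supplied by the lookup and falls back to the schema and spec JSON embedded in the manifest's Avro key-value metadata only when no lookup is given (the package-private, test-only path). A rough sketch of that resolution order, with metadata standing in for the manifest's key-value metadata map:

    // Sketch of the resolution implemented above (not the exact reader code).
    PartitionSpec spec;
    if (specLookup != null) {
      spec = specLookup.apply(specId);                   // current table metadata
    } else {
      Schema fileSchema = SchemaParser.fromJson(metadata.get("schema"));
      spec = PartitionSpecParser.fromJsonFields(         // possibly stale schema
          fileSchema, specId, metadata.get("partition-spec"));
    }
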
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
index af873c7..bec57c8 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
@@ -334,7 +334,8 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
return manifest;
}
- try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
+ try (ManifestReader reader = ManifestReader.read(
+ ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
Expression inclusiveExpr = Projections
.inclusive(reader.spec())
.project(deleteExpression);
@@ -487,7 +488,8 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
try {
for (ManifestFile manifest : bin) {
- try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
+ try (ManifestReader reader = ManifestReader.read(
+ ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
for (ManifestEntry entry : reader.entries()) {
if (entry.status() == Status.DELETED) {
// suppress deletes from previous snapshots. only files deleted by this snapshot
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index e4ca10e..1accb7d 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -174,7 +174,8 @@ class RemoveSnapshots implements ExpireSnapshots {
}
// the manifest has deletes, scan it to find files to delete
- try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
+ try (ManifestReader reader = ManifestReader.read(
+ ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
for (ManifestEntry entry : reader.entries()) {
// if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
if (entry.status() == ManifestEntry.Status.DELETED &&
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/core/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index df5105b..fda30a3 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -280,7 +280,8 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
}
private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
- try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
+ try (ManifestReader reader = ManifestReader.read(
+ ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId()));
int addedFiles = 0;
int existingFiles = 0;
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index efae379..14d246b 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -258,8 +258,11 @@ public class TableMetadata {
public TableMetadata updateSchema(Schema schema, int lastColumnId) {
PartitionSpec.checkCompatibility(spec(), schema);
+ // rebuild all of the partition specs for the new current schema
+ List<PartitionSpec> updatedSpecs = Lists.transform(specs,
+ spec -> updateSpecSchema(schema, spec));
return new TableMetadata(ops, null, location,
- System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, updatedSpecs, properties,
currentSnapshotId, snapshots, snapshotLog);
}
@@ -423,6 +426,18 @@ public class TableMetadata {
currentSnapshotId, snapshots, snapshotLog);
}
+ private static PartitionSpec updateSpecSchema(Schema schema, PartitionSpec partitionSpec) {
+ PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
+ .withSpecId(partitionSpec.specId());
+
+ // add all of the fields to the builder. IDs should not change.
+ for (PartitionField field : partitionSpec.fields()) {
+ specBuilder.add(field.sourceId(), field.name(), field.transform().toString());
+ }
+
+ return specBuilder.build();
+ }
+
private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
.withSpecId(specId);
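
With updateSchema rebuilding every stored spec against the new schema, partition fields keep their source IDs, so renaming a source column is reflected when the spec's schema is consulted. A small sketch of the observable effect through the public API (names are illustrative and mirror the test added below):

    // Sketch: after renaming the partition source column, the current spec is
    // bound to the current schema, so its source field resolves to the new name.
    table.updateSchema().renameColumn("part", "p").commit();
    PartitionSpec spec = table.spec();
    int sourceId = spec.fields().get(0).sourceId();                 // ID unchanged
    String sourceName = spec.schema().findField(sourceId).name();   // now "p"
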
diff --git a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
new file mode 100644
index 0000000..098c8e1
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Lists;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.RandomAvroData;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestScansAndSchemaEvolution {
+ private static final Schema SCHEMA = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "data", Types.StringType.get()),
+ required(3, "part", Types.StringType.get()));
+
+ private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+ .identity("part")
+ .build();
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ private DataFile createDataFile(File dataPath, String partValue) throws IOException {
+ List<GenericData.Record> expected = RandomAvroData.generate(SCHEMA, 100, 0L);
+
+ File dataFile = new File(dataPath, FileFormat.AVRO.addExtension(UUID.randomUUID().toString()));
+ try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(dataFile))
+ .schema(SCHEMA)
+ .named("test")
+ .build()) {
+ for (GenericData.Record rec : expected) {
+ rec.put("part", partValue); // create just one partition
+ writer.add(rec);
+ }
+ }
+
+ PartitionData partition = new PartitionData(SPEC.partitionType());
+ partition.set(0, partValue);
+ return DataFiles.builder(SPEC)
+ .withInputFile(Files.localInput(dataFile))
+ .withPartition(partition)
+ .withRecordCount(100)
+ .build();
+ }
+
+ @After
+ public void cleanupTables() {
+ TestTables.clearTables();
+ }
+
+ @Test
+ public void testPartitionSourceRename() throws IOException {
+ File location = temp.newFolder();
+ File dataLocation = new File(location, "data");
+ Assert.assertTrue(location.delete()); // should be created by table create
+
+ Table table = TestTables.create(location, "test", SCHEMA, SPEC);
+
+ DataFile fileOne = createDataFile(dataLocation, "one");
+ DataFile fileTwo = createDataFile(dataLocation, "two");
+
+ table.newAppend()
+ .appendFile(fileOne)
+ .appendFile(fileTwo)
+ .commit();
+
+ List<FileScanTask> tasks = Lists.newArrayList(
+ table.newScan().filter(Expressions.equal("part", "one")).planFiles());
+
+ Assert.assertEquals("Should produce 1 matching file task", 1, tasks.size());
+
+ table.updateSchema()
+ .renameColumn("part", "p")
+ .commit();
+
+ // plan the scan using the new name in a filter
+ tasks = Lists.newArrayList(
+ table.newScan().filter(Expressions.equal("p", "one")).planFiles());
+
+ Assert.assertEquals("Should produce 1 matching file task", 1, tasks.size());
+ }
+}