Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/04/04 16:25:32 UTC

[incubator-iceberg] branch master updated: Use the current schema in all partition specs in scan planning. (#108)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a419df  Use the current schema in all partition specs in scan planning. (#108)
9a419df is described below

commit 9a419df715bc206c4b3cef4a4df2bc65f2512370
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Apr 4 09:25:24 2019 -0700

    Use the current schema in all partition specs in scan planning. (#108)
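    
    Manifest files store the partition spec and schema they were written
    with, so after a column rename, a scan filter that used the new name
    could no longer be resolved against the stale schema embedded in
    older manifests. With this change, manifest readers look up specs by
    ID from the current table metadata, and updateSchema rebuilds every
    spec against the new schema, so renamed partition source columns
    keep pruning manifests correctly.
    
    A minimal sketch of the fixed behavior, condensed from the new
    TestScansAndSchemaEvolution test at the end of this diff (TestTables
    and the createDataFile helper are utilities from this repository's
    test code):
    
        Table table = TestTables.create(location, "test", SCHEMA, SPEC);
    
        table.newAppend()
            .appendFile(createDataFile(dataLocation, "one")) // partition part=one
            .commit();
    
        table.updateSchema()
            .renameColumn("part", "p") // rename the partition source column
            .commit();
    
        // planning binds the filter to specs rebuilt for the current schema,
        // so the file written under the old column name is still found
        List<FileScanTask> tasks = Lists.newArrayList(
            table.newScan().filter(Expressions.equal("p", "one")).planFiles());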
---
 .../main/java/org/apache/iceberg/BaseSnapshot.java |   4 +-
 .../java/org/apache/iceberg/BaseTableScan.java     |  18 ++--
 .../java/org/apache/iceberg/ManifestGroup.java     |   9 +-
 .../java/org/apache/iceberg/ManifestReader.java    |  36 ++++---
 .../org/apache/iceberg/MergingSnapshotUpdate.java  |   6 +-
 .../java/org/apache/iceberg/RemoveSnapshots.java   |   3 +-
 .../java/org/apache/iceberg/SnapshotUpdate.java    |   3 +-
 .../java/org/apache/iceberg/TableMetadata.java     |  17 +++-
 .../iceberg/TestScansAndSchemaEvolution.java       | 113 +++++++++++++++++++++
 9 files changed, 176 insertions(+), 33 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 6b43032..425e3ae 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -158,7 +158,9 @@ class BaseSnapshot implements Snapshot {
     // accumulate adds and deletes from all manifests.
     // because manifests can be reused in newer snapshots, filter the changes by snapshot id.
     for (String manifest : Iterables.transform(manifests(), ManifestFile::path)) {
-      try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest))) {
+      try (ManifestReader reader = ManifestReader.read(
+          ops.io().newInputFile(manifest),
+          ops.current()::spec)) {
         for (ManifestEntry add : reader.addedFiles()) {
           if (add.snapshotId() == snapshotId) {
             adds.add(add.file().copy());
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 74e7e28..2e580d5 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.InclusiveManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.TypeUtil;
@@ -147,7 +148,7 @@ class BaseTableScan implements TableScan {
                              caseSensitive, selectedColumns);
   }
 
-  private final LoadingCache<Integer, InclusiveManifestEvaluator> EVAL_CACHE = CacheBuilder
+  private final LoadingCache<Integer, InclusiveManifestEvaluator> evalCache = CacheBuilder
       .newBuilder()
       .build(new CacheLoader<Integer, InclusiveManifestEvaluator>() {
         @Override
@@ -172,19 +173,20 @@ class BaseTableScan implements TableScan {
           new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema()));
 
       Iterable<ManifestFile> matchingManifests = Iterables.filter(snapshot.manifests(),
-          manifest -> EVAL_CACHE.getUnchecked(manifest.partitionSpecId()).eval(manifest));
+          manifest -> evalCache.getUnchecked(manifest.partitionSpecId()).eval(manifest));
 
       ConcurrentLinkedQueue<Closeable> toClose = new ConcurrentLinkedQueue<>();
       Iterable<Iterable<FileScanTask>> readers = Iterables.transform(
           matchingManifests,
           manifest -> {
-            ManifestReader reader = ManifestReader
-                .read(ops.io().newInputFile(manifest.path()))
-                .caseSensitive(caseSensitive);
+            ManifestReader reader = ManifestReader
+                .read(ops.io().newInputFile(manifest.path()), ops.current()::spec)
+                .caseSensitive(caseSensitive);
+            PartitionSpec spec = ops.current().spec(manifest.partitionSpecId());
             toClose.add(reader);
-            String schemaString = SchemaParser.toJson(reader.spec().schema());
-            String specString = PartitionSpecParser.toJson(reader.spec());
-            ResidualEvaluator residuals = new ResidualEvaluator(reader.spec(), rowFilter, caseSensitive);
+            String schemaString = SchemaParser.toJson(spec.schema());
+            String specString = PartitionSpecParser.toJson(spec);
+            ResidualEvaluator residuals = new ResidualEvaluator(spec, rowFilter, caseSensitive);
             return Iterables.transform(
                 reader.filterRows(rowFilter).select(SNAPSHOT_COLUMNS),
                 file -> new BaseFileScanTask(file, schemaString, specString, residuals)
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 5bc2bab..3ca6b54 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.InclusiveManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
 
@@ -47,7 +48,7 @@ class ManifestGroup {
   private final boolean ignoreDeleted;
   private final List<String> columns;
 
-  private final LoadingCache<Integer, InclusiveManifestEvaluator> EVAL_CACHE = CacheBuilder
+  private final LoadingCache<Integer, InclusiveManifestEvaluator> evalCache = CacheBuilder
       .newBuilder()
       .build(new CacheLoader<Integer, InclusiveManifestEvaluator>() {
         @Override
@@ -109,7 +110,7 @@ class ManifestGroup {
     List<Closeable> toClose = Lists.newArrayList();
 
     Iterable<ManifestFile> matchingManifests = Iterables.filter(manifests,
-        manifest -> EVAL_CACHE.getUnchecked(manifest.partitionSpecId()).eval(manifest));
+        manifest -> evalCache.getUnchecked(manifest.partitionSpecId()).eval(manifest));
 
     if (ignoreDeleted) {
       // remove any manifests that don't have any existing or added files. if either the added or
@@ -122,7 +123,9 @@ class ManifestGroup {
     Iterable<Iterable<ManifestEntry>> readers = Iterables.transform(
         matchingManifests,
         manifest -> {
-          ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()));
+          ManifestReader reader = ManifestReader.read(
+              ops.io().newInputFile(manifest.path()),
+              ops.current()::spec);
           FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
           toClose.add(reader);
           return Iterables.filter(
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 6b428f0..32fba63 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroIterable;
 import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -45,7 +46,7 @@ import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
 /**
  * Reader for manifest files.
  * <p>
- * Readers are created using the builder from {@link #read(InputFile)}.
+ * Readers are created using the builder from {@link #read(InputFile, Function)}.
  */
 public class ManifestReader extends CloseableGroup implements Filterable<FilteredManifest> {
   private static final Logger LOG = LoggerFactory.getLogger(ManifestReader.class);
@@ -54,25 +55,20 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
   private static final List<String> CHANGE_COLUNNS = Lists.newArrayList(
       "file_path", "file_format", "partition", "record_count", "file_size_in_bytes");
 
-  /**
-   * Returns a new {@link ManifestReader} for an {@link InputFile}.
-   *
-   * @param file an InputFile
-   * @return a manifest reader
-   */
-  public static ManifestReader read(InputFile file) {
-    return new ManifestReader(file, true);
+  // Visible for testing
+  static ManifestReader read(InputFile file) {
+    return new ManifestReader(file, null);
   }
 
   /**
    * Returns a new {@link ManifestReader} for an {@link InputFile}.
    *
    * @param file an InputFile
-   * @param caseSensitive whether column name matching should have case sensitivity
+   * @param specLookup a function to look up the manifest's partition spec by ID
    * @return a manifest reader
    */
-  public static ManifestReader read(InputFile file, boolean caseSensitive) {
-    return new ManifestReader(file, caseSensitive);
+  public static ManifestReader read(InputFile file, Function<Integer, PartitionSpec> specLookup) {
+    return new ManifestReader(file, specLookup);
   }
 
   private final InputFile file;
@@ -85,9 +81,8 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
   private List<ManifestEntry> adds = null;
   private List<ManifestEntry> deletes = null;
 
-  private ManifestReader(InputFile file, boolean caseSensitive) {
+  private ManifestReader(InputFile file, Function<Integer, PartitionSpec> specLookup) {
     this.file = file;
-    this.caseSensitive = caseSensitive;
 
     try {
       try (AvroIterable<ManifestEntry> headerReader = Avro.read(file)
@@ -98,13 +93,22 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
     } catch (IOException e) {
       throw new RuntimeIOException(e);
     }
-    this.schema = SchemaParser.fromJson(metadata.get("schema"));
+
     int specId = TableMetadata.INITIAL_SPEC_ID;
     String specProperty = metadata.get("partition-spec-id");
     if (specProperty != null) {
       specId = Integer.parseInt(specProperty);
     }
-    this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
+
+    if (specLookup != null) {
+      this.spec = specLookup.apply(specId);
+      this.schema = spec.schema();
+    } else {
+      this.schema = SchemaParser.fromJson(metadata.get("schema"));
+      this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
+    }
+
+    this.caseSensitive = true;
   }
 
   private ManifestReader(InputFile file, Map<String, String> metadata,
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
index af873c7..bec57c8 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
@@ -334,7 +334,8 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
       return manifest;
     }
 
-    try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
+    try (ManifestReader reader = ManifestReader.read(
+        ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
       Expression inclusiveExpr = Projections
           .inclusive(reader.spec())
           .project(deleteExpression);
@@ -487,7 +488,8 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
     try {
 
       for (ManifestFile manifest : bin) {
-        try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
+        try (ManifestReader reader = ManifestReader.read(
+            ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
           for (ManifestEntry entry : reader.entries()) {
             if (entry.status() == Status.DELETED) {
               // suppress deletes from previous snapshots. only files deleted by this snapshot
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index e4ca10e..1accb7d 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -174,7 +174,8 @@ class RemoveSnapshots implements ExpireSnapshots {
           }
 
           // the manifest has deletes, scan it to find files to delete
-          try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
+          try (ManifestReader reader = ManifestReader.read(
+              ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
             for (ManifestEntry entry : reader.entries()) {
               // if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
               if (entry.status() == ManifestEntry.Status.DELETED &&
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/core/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index df5105b..fda30a3 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -280,7 +280,8 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
   }
 
   private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
-    try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
+    try (ManifestReader reader = ManifestReader.read(
+        ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
       PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId()));
       int addedFiles = 0;
       int existingFiles = 0;
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index efae379..14d246b 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -258,8 +258,11 @@ public class TableMetadata {
 
   public TableMetadata updateSchema(Schema schema, int lastColumnId) {
     PartitionSpec.checkCompatibility(spec(), schema);
+    // rebuild all of the partition specs for the new current schema
+    List<PartitionSpec> updatedSpecs = Lists.transform(specs,
+        spec -> updateSpecSchema(schema, spec));
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, updatedSpecs, properties,
         currentSnapshotId, snapshots, snapshotLog);
   }
 
@@ -423,6 +426,18 @@ public class TableMetadata {
         currentSnapshotId, snapshots, snapshotLog);
   }
 
+  private static PartitionSpec updateSpecSchema(Schema schema, PartitionSpec partitionSpec) {
+    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
+        .withSpecId(partitionSpec.specId());
+
+    // add all of the fields to the builder. IDs should not change.
+    for (PartitionField field : partitionSpec.fields()) {
+      specBuilder.add(field.sourceId(), field.name(), field.transform().toString());
+    }
+
+    return specBuilder.build();
+  }
+
   private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
     PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
         .withSpecId(specId);
diff --git a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
new file mode 100644
index 0000000..098c8e1
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.RandomAvroData;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestScansAndSchemaEvolution {
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      required(2, "data", Types.StringType.get()),
+      required(3, "part", Types.StringType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+      .identity("part")
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private DataFile createDataFile(File dataPath, String partValue) throws IOException {
+    List<GenericData.Record> expected = RandomAvroData.generate(SCHEMA, 100, 0L);
+
+    File dataFile = new File(dataPath, FileFormat.AVRO.addExtension(UUID.randomUUID().toString()));
+    try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(dataFile))
+        .schema(SCHEMA)
+        .named("test")
+        .build()) {
+      for (GenericData.Record rec : expected) {
+        rec.put("part", partValue); // create just one partition
+        writer.add(rec);
+      }
+    }
+
+    PartitionData partition = new PartitionData(SPEC.partitionType());
+    partition.set(0, partValue);
+    return DataFiles.builder(SPEC)
+        .withInputFile(Files.localInput(dataFile))
+        .withPartition(partition)
+        .withRecordCount(100)
+        .build();
+  }
+
+  @After
+  public void cleanupTables() {
+    TestTables.clearTables();
+  }
+
+  @Test
+  public void testPartitionSourceRename() throws IOException {
+    File location = temp.newFolder();
+    File dataLocation = new File(location, "data");
+    Assert.assertTrue(location.delete()); // should be created by table create
+
+    Table table = TestTables.create(location, "test", SCHEMA, SPEC);
+
+    DataFile fileOne = createDataFile(dataLocation, "one");
+    DataFile fileTwo = createDataFile(dataLocation, "two");
+
+    table.newAppend()
+        .appendFile(fileOne)
+        .appendFile(fileTwo)
+        .commit();
+
+    List<FileScanTask> tasks = Lists.newArrayList(
+        table.newScan().filter(Expressions.equal("part", "one")).planFiles());
+
+    Assert.assertEquals("Should produce 1 matching file task", 1, tasks.size());
+
+    table.updateSchema()
+        .renameColumn("part", "p")
+        .commit();
+
+    // plan the scan using the new name in a filter
+    tasks = Lists.newArrayList(
+        table.newScan().filter(Expressions.equal("p", "one")).planFiles());
+
+    Assert.assertEquals("Should produce 1 matching file task", 1, tasks.size());
+  }
+}