You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/12 19:59:05 UTC

[incubator-iceberg] branch master updated: Support multiple partitions derived from the same field (#203)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 649cbdd  Support multiple partitions derived from the same field (#203)
649cbdd is described below

commit 649cbdde83693ebda8e8dc6e75857426d25414ec
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Wed Jun 12 12:59:00 2019 -0700

    Support multiple partitions derived from the same field (#203)
---
 .../java/org/apache/iceberg/PartitionField.java    |   5 +
 .../java/org/apache/iceberg/PartitionSpec.java     |  52 +++++---
 .../apache/iceberg/expressions/Projections.java    |  50 ++++---
 .../iceberg/expressions/ResidualEvaluator.java     |  35 +++--
 .../org/apache/iceberg/TestPartitionPaths.java     |   4 +-
 .../iceberg/TestPartitionSpecValidation.java       | 148 +++++++++++++++++++++
 .../transforms/TestTransformSerialization.java     |  72 +++++-----
 .../org/apache/iceberg/spark/source/Writer.java    |   3 +-
 8 files changed, 284 insertions(+), 85 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/PartitionField.java b/api/src/main/java/org/apache/iceberg/PartitionField.java
index 4332181..ceb4db9 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionField.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionField.java
@@ -59,6 +59,11 @@ public class PartitionField implements Serializable {
   }
 
   @Override
+  public String toString() {
+    return name + ": " + transform + "(" + sourceId + ")";
+  }
+
+  @Override
   public boolean equals(Object other) {
     if (this == other) {
       return true;
diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index 4d757dd..3d7475b 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -22,7 +22,10 @@ package org.apache.iceberg;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimaps;
 import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
@@ -52,7 +55,7 @@ public class PartitionSpec implements Serializable {
   // this is ordered so that DataFile has a consistent schema
   private final int specId;
   private final PartitionField[] fields;
-  private transient Map<Integer, PartitionField> fieldsBySourceId = null;
+  private transient ListMultimap<Integer, PartitionField> fieldsBySourceId = null;
   private transient Map<String, PartitionField> fieldsByName = null;
   private transient Class<?>[] lazyJavaClasses = null;
   private transient List<PartitionField> fieldList = null;
@@ -91,7 +94,7 @@ public class PartitionSpec implements Serializable {
    * @param fieldId a field id from the source schema
    * @return the {@link PartitionField field} that partitions the given source field
    */
-  public PartitionField getFieldBySourceId(int fieldId) {
+  public List<PartitionField> getFieldsBySourceId(int fieldId) {
     return lazyFieldsBySourceId().get(fieldId);
   }
 
@@ -223,13 +226,13 @@ public class PartitionSpec implements Serializable {
     return fieldsByName;
   }
 
-  private Map<Integer, PartitionField> lazyFieldsBySourceId() {
+  private ListMultimap<Integer, PartitionField> lazyFieldsBySourceId() {
     if (fieldsBySourceId == null) {
-      ImmutableMap.Builder<Integer, PartitionField> byIdBuilder = ImmutableMap.builder();
+      this.fieldsBySourceId = Multimaps
+          .newListMultimap(Maps.newHashMap(), () -> Lists.newArrayListWithCapacity(fields.length));
       for (PartitionField field : fields) {
-        byIdBuilder.put(field.sourceId(), field);
+        fieldsBySourceId.put(field.sourceId(), field);
       }
-      this.fieldsBySourceId = byIdBuilder.build();
     }
 
     return fieldsBySourceId;
@@ -257,8 +260,7 @@ public class PartitionSpec implements Serializable {
     sb.append("[");
     for (PartitionField field : fields) {
       sb.append("\n");
-      sb.append("  ").append(field.name()).append(": ").append(field.transform())
-          .append("(").append(field.sourceId()).append(")");
+      sb.append("  ").append(field);
     }
     if (fields.length > 0) {
       sb.append("\n");
@@ -298,6 +300,7 @@ public class PartitionSpec implements Serializable {
     private final Schema schema;
     private final List<PartitionField> fields = Lists.newArrayList();
     private final Set<String> partitionNames = Sets.newHashSet();
+    private Map<Integer, PartitionField> timeFields = Maps.newHashMap();
     private int specId = 0;
 
     private Builder(Schema schema) {
@@ -312,6 +315,13 @@ public class PartitionSpec implements Serializable {
       partitionNames.add(name);
     }
 
+    private void checkForRedundantPartitions(PartitionField field) {
+      PartitionField timeField = timeFields.get(field.sourceId());
+      Preconditions.checkArgument(timeField == null,
+          "Cannot add redundant partition: %s conflicts with %s", timeField, field);
+      timeFields.put(field.sourceId(), field);
+    }
+
     public Builder withSpecId(int newSpecId) {
       this.specId = newSpecId;
       return this;
@@ -319,7 +329,7 @@ public class PartitionSpec implements Serializable {
 
     private Types.NestedField findSourceColumn(String sourceName) {
       Types.NestedField sourceColumn = schema.findField(sourceName);
-      Preconditions.checkNotNull(sourceColumn, "Cannot find source column: %s", sourceName);
+      Preconditions.checkArgument(sourceColumn != null, "Cannot find source column: %s", sourceName);
       return sourceColumn;
     }
 
@@ -335,8 +345,10 @@ public class PartitionSpec implements Serializable {
       String name = sourceName + "_year";
       checkAndAddPartitionName(name);
       Types.NestedField sourceColumn = findSourceColumn(sourceName);
-      fields.add(new PartitionField(
-          sourceColumn.fieldId(), name, Transforms.year(sourceColumn.type())));
+      PartitionField field = new PartitionField(
+          sourceColumn.fieldId(), name, Transforms.year(sourceColumn.type()));
+      checkForRedundantPartitions(field);
+      fields.add(field);
       return this;
     }
 
@@ -344,8 +356,10 @@ public class PartitionSpec implements Serializable {
       String name = sourceName + "_month";
       checkAndAddPartitionName(name);
       Types.NestedField sourceColumn = findSourceColumn(sourceName);
-      fields.add(new PartitionField(
-          sourceColumn.fieldId(), name, Transforms.month(sourceColumn.type())));
+      PartitionField field = new PartitionField(
+          sourceColumn.fieldId(), name, Transforms.month(sourceColumn.type()));
+      checkForRedundantPartitions(field);
+      fields.add(field);
       return this;
     }
 
@@ -353,8 +367,10 @@ public class PartitionSpec implements Serializable {
       String name = sourceName + "_day";
       checkAndAddPartitionName(name);
       Types.NestedField sourceColumn = findSourceColumn(sourceName);
-      fields.add(new PartitionField(
-          sourceColumn.fieldId(), name, Transforms.day(sourceColumn.type())));
+      PartitionField field = new PartitionField(
+          sourceColumn.fieldId(), name, Transforms.day(sourceColumn.type()));
+      checkForRedundantPartitions(field);
+      fields.add(field);
       return this;
     }
 
@@ -362,8 +378,10 @@ public class PartitionSpec implements Serializable {
       String name = sourceName + "_hour";
       checkAndAddPartitionName(name);
       Types.NestedField sourceColumn = findSourceColumn(sourceName);
-      fields.add(new PartitionField(
-          sourceColumn.fieldId(), name, Transforms.hour(sourceColumn.type())));
+      PartitionField field = new PartitionField(
+          sourceColumn.fieldId(), name, Transforms.hour(sourceColumn.type()));
+      checkForRedundantPartitions(field);
+      fields.add(field);
       return this;
     }
 
diff --git a/api/src/main/java/org/apache/iceberg/expressions/Projections.java b/api/src/main/java/org/apache/iceberg/expressions/Projections.java
index d27f75c..50d0693 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/Projections.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/Projections.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.expressions;
 
+import java.util.Collection;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.expressions.ExpressionVisitors.ExpressionVisitor;
@@ -205,20 +206,27 @@ public class Projections {
     @Override
     @SuppressWarnings("unchecked")
     public <T> Expression predicate(BoundPredicate<T> pred) {
-      PartitionField part = spec().getFieldBySourceId(pred.ref().fieldId());
-      if (part == null) {
+      Collection<PartitionField> parts = spec().getFieldsBySourceId(pred.ref().fieldId());
+      if (parts == null) {
         // the predicate has no partition column
-        return alwaysTrue();
+        return Expressions.alwaysTrue();
       }
 
-      UnboundPredicate<?> result = ((Transform<T, ?>) part.transform()).project(part.name(), pred);
-
-      if (result != null) {
-        return result;
+      Expression result = Expressions.alwaysTrue();
+      for (PartitionField part : parts) {
+        // consider (d = 2019-01-01) with bucket(7, d) and bucket(5, d)
+        // projections: b1 = bucket(7, '2019-01-01') = 5, b2 = bucket(5, '2019-01-01') = 0
+        // any value with b1 != 5 or b2 != 0 cannot be '2019-01-01', so the projections are and-ed
+        //
+        // similarly, if partitioning by day(ts) and hour(ts), the more restrictive
+        // projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and
+        // hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02.
+        result = Expressions.and(
+            result,
+            ((Transform<T, ?>) part.transform()).project(part.name(), pred));
       }
 
-      // if the predicate could not be projected, it always matches
-      return alwaysTrue();
+      return result;
     }
   }
 
@@ -230,21 +238,25 @@ public class Projections {
     @Override
     @SuppressWarnings("unchecked")
     public <T> Expression predicate(BoundPredicate<T> pred) {
-      PartitionField part = spec().getFieldBySourceId(pred.ref().fieldId());
-      if (part == null) {
+      Collection<PartitionField> parts = spec().getFieldsBySourceId(pred.ref().fieldId());
+      if (parts == null) {
         // the predicate has no partition column
-        return alwaysFalse();
+        return Expressions.alwaysFalse();
       }
 
-      UnboundPredicate<?> result = ((Transform<T, ?>) part.transform())
-          .projectStrict(part.name(), pred);
-
-      if (result != null) {
-        return result;
+      Expression result = Expressions.alwaysFalse();
+      for (PartitionField part : parts) {
+        // consider (ts > 2019-01-01T01:00:00) with day(ts) and hour(ts)
+        // projections: d >= 2019-01-02 and h >= 2019-01-01-02 (note the inclusive bounds).
+        // any timestamp where either projection predicate is true must match the original
+        // predicate. For example, ts = 2019-01-01T03:00:00 matches the hour projection but not
+        // the day, but does match the original predicate.
+        result = Expressions.or(
+            result,
+            ((Transform<T, ?>) part.transform()).projectStrict(part.name(), pred));
       }
 
-      // if the predicate could not be projected, it never matches
-      return alwaysFalse();
+      return result;
     }
   }
 }
diff --git a/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
index 867c7a8..4ffcc2f 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
@@ -19,8 +19,12 @@
 
 package org.apache.iceberg.expressions;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
@@ -163,25 +167,36 @@ public class ResidualEvaluator implements Serializable {
       // strict projection evaluates to true.
       //
       // If there is no strict projection or if it evaluates to false, then return the predicate.
-      PartitionField part = spec.getFieldBySourceId(pred.ref().fieldId());
-      if (part == null) {
+      List<PartitionField> parts = spec.getFieldsBySourceId(pred.ref().fieldId());
+      if (parts == null) {
         return pred; // not associated inclusive a partition field, can't be evaluated
       }
 
-      UnboundPredicate<?> strictProjection = ((Transform<T, ?>) part.transform())
-          .projectStrict(part.name(), pred);
+      List<UnboundPredicate<?>> strictProjections = Lists.transform(parts,
+          part -> ((Transform<T, ?>) part.transform()).projectStrict(part.name(), pred));
+
+      if (Iterables.all(strictProjections, Objects::isNull)) {
+        // if there are no strict projections, the predicate must be in the residual
+        return pred;
+      }
+
+      Expression result = Expressions.alwaysFalse();
+      for (UnboundPredicate<?> strictProjection : strictProjections) {
+        if (strictProjection == null) {
+          continue;
+        }
 
-      if (strictProjection != null) {
         Expression bound = strictProjection.bind(spec.partitionType(), caseSensitive);
         if (bound instanceof BoundPredicate) {
-          // the predicate methods will evaluate and return alwaysTrue or alwaysFalse
-          return super.predicate((BoundPredicate<?>) bound);
+          // evaluate the bound predicate, which will return alwaysTrue or alwaysFalse
+          result = Expressions.or(result, super.predicate((BoundPredicate<?>) bound));
+        } else {
+          // update the result expression with the non-predicate residual (e.g. alwaysTrue)
+          result = Expressions.or(result, bound);
         }
-        return bound; // use the non-predicate residual (e.g. alwaysTrue)
       }
 
-      // if the predicate could not be projected, it must be in the residual
-      return pred;
+      return result;
     }
 
     @Override
diff --git a/api/src/test/java/org/apache/iceberg/TestPartitionPaths.java b/api/src/test/java/org/apache/iceberg/TestPartitionPaths.java
index 8cb0e0f..8f2dedc 100644
--- a/api/src/test/java/org/apache/iceberg/TestPartitionPaths.java
+++ b/api/src/test/java/org/apache/iceberg/TestPartitionPaths.java
@@ -41,8 +41,8 @@ public class TestPartitionPaths {
         .bucket("id", 10)
         .build();
 
-    Transform hour = spec.getFieldBySourceId(3).transform();
-    Transform bucket = spec.getFieldBySourceId(1).transform();
+    Transform hour = spec.getFieldsBySourceId(3).get(0).transform();
+    Transform bucket = spec.getFieldsBySourceId(1).get(0).transform();
 
     Literal<Long> ts = Literal.of("2017-12-01T10:12:55.038194").to(Types.TimestampType.withoutZone());
     Object tsHour = hour.apply(ts.value());
diff --git a/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java b/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java
new file mode 100644
index 0000000..24a5716
--- /dev/null
+++ b/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.Test;
+
+public class TestPartitionSpecValidation {
+  private static final Schema SCHEMA = new Schema(
+      NestedField.required(1, "id", Types.LongType.get()),
+      NestedField.required(2, "ts", Types.TimestampType.withZone()),
+      NestedField.required(3, "another_ts", Types.TimestampType.withZone()),
+      NestedField.required(4, "d", Types.TimestampType.withZone()),
+      NestedField.required(5, "another_d", Types.TimestampType.withZone())
+  );
+
+  @Test
+  public void testMultipleTimestampPartitions() {
+    TestHelpers.assertThrows("Should not allow year(ts) and year(ts)",
+        IllegalArgumentException.class, "Cannot use partition name more than once",
+        () -> PartitionSpec.builderFor(SCHEMA).year("ts").year("ts").build());
+    TestHelpers.assertThrows("Should not allow year(ts) and month(ts)",
+        IllegalArgumentException.class, "Cannot add redundant partition",
+        () -> PartitionSpec.builderFor(SCHEMA).year("ts").month("ts").build());
+    TestHelpers.assertThrows("Should not allow year(ts) and day(ts)",
+        IllegalArgumentException.class, "Cannot add redundant partition",
+        () -> PartitionSpec.builderFor(SCHEMA).year("ts").day("ts").build());
+    TestHelpers.assertThrows("Should not allow year(ts) and hour(ts)",
+        IllegalArgumentException.class, "Cannot add redundant partition",
+        () -> PartitionSpec.builderFor(SCHEMA).year("ts").hour("ts").build());
+
+    TestHelpers.assertThrows("Should not allow month(ts) and month(ts)",
+        IllegalArgumentException.class, "Cannot use partition name more than once",
+        () -> PartitionSpec.builderFor(SCHEMA).month("ts").month("ts").build());
+    TestHelpers.assertThrows("Should not allow month(ts) and day(ts)",
+        IllegalArgumentException.class, "Cannot add redundant partition",
+        () -> PartitionSpec.builderFor(SCHEMA).month("ts").day("ts").build());
+    TestHelpers.assertThrows("Should not allow month(ts) and hour(ts)",
+        IllegalArgumentException.class, "Cannot add redundant partition",
+        () -> PartitionSpec.builderFor(SCHEMA).month("ts").hour("ts").build());
+
+    TestHelpers.assertThrows("Should not allow day(ts) and day(ts)",
+        IllegalArgumentException.class, "Cannot use partition name more than once",
+        () -> PartitionSpec.builderFor(SCHEMA).day("ts").day("ts").build());
+    TestHelpers.assertThrows("Should not allow day(ts) and hour(ts)",
+        IllegalArgumentException.class, "Cannot add redundant partition",
+        () -> PartitionSpec.builderFor(SCHEMA).day("ts").hour("ts").build());
+
+    TestHelpers.assertThrows("Should not allow hour(ts) and hour(ts)",
+        IllegalArgumentException.class, "Cannot use partition name more than once",
+        () -> PartitionSpec.builderFor(SCHEMA).hour("ts").hour("ts").build());
+  }
+
+  @Test
+  public void testMultipleDatePartitions() {
+    TestHelpers.assertThrows("Should not allow year(d) and year(d)",
+        IllegalArgumentException.class, "Cannot use partition name more than once",
+        () -> PartitionSpec.builderFor(SCHEMA).year("d").year("d").build());
+    TestHelpers.assertThrows("Should not allow year(d) and month(d)",
+        IllegalArgumentException.class, "Cannot add redundant partition",
+        () -> PartitionSpec.builderFor(SCHEMA).year("d").month("d").build());
+    TestHelpers.assertThrows("Should not allow year(d) and day(d)",
+        IllegalArgumentException.class, "Cannot add redundant partition",
+        () -> PartitionSpec.builderFor(SCHEMA).year("d").day("d").build());
+
+    TestHelpers.assertThrows("Should not allow month(d) and month(d)",
+        IllegalArgumentException.class, "Cannot use partition name more than once",
+        () -> PartitionSpec.builderFor(SCHEMA).month("d").month("d").build());
+    TestHelpers.assertThrows("Should not allow month(d) and day(d)",
+        IllegalArgumentException.class, "Cannot add redundant partition",
+        () -> PartitionSpec.builderFor(SCHEMA).month("d").day("d").build());
+
+    TestHelpers.assertThrows("Should not allow day(d) and day(d)",
+        IllegalArgumentException.class, "Cannot use partition name more than once",
+        () -> PartitionSpec.builderFor(SCHEMA).day("d").day("d").build());
+  }
+
+  @Test
+  public void testMultipleTimestampPartitionsWithDifferentSourceColumns() {
+    PartitionSpec.builderFor(SCHEMA).year("ts").year("another_ts").build();
+    PartitionSpec.builderFor(SCHEMA).year("ts").month("another_ts").build();
+    PartitionSpec.builderFor(SCHEMA).year("ts").day("another_ts").build();
+    PartitionSpec.builderFor(SCHEMA).year("ts").hour("another_ts").build();
+    PartitionSpec.builderFor(SCHEMA).month("ts").month("another_ts").build();
+    PartitionSpec.builderFor(SCHEMA).month("ts").day("another_ts").build();
+    PartitionSpec.builderFor(SCHEMA).month("ts").hour("another_ts").build();
+    PartitionSpec.builderFor(SCHEMA).day("ts").day("another_ts").build();
+    PartitionSpec.builderFor(SCHEMA).day("ts").hour("another_ts").build();
+    PartitionSpec.builderFor(SCHEMA).hour("ts").hour("another_ts").build();
+  }
+
+  @Test
+  public void testMultipleDatePartitionsWithDifferentSourceColumns() {
+    PartitionSpec.builderFor(SCHEMA).year("d").year("another_d").build();
+    PartitionSpec.builderFor(SCHEMA).year("d").month("another_d").build();
+    PartitionSpec.builderFor(SCHEMA).year("d").day("another_d").build();
+    PartitionSpec.builderFor(SCHEMA).year("d").hour("another_d").build();
+    PartitionSpec.builderFor(SCHEMA).month("d").month("another_d").build();
+    PartitionSpec.builderFor(SCHEMA).month("d").day("another_d").build();
+    PartitionSpec.builderFor(SCHEMA).month("d").hour("another_d").build();
+    PartitionSpec.builderFor(SCHEMA).day("d").day("another_d").build();
+    PartitionSpec.builderFor(SCHEMA).day("d").hour("another_d").build();
+    PartitionSpec.builderFor(SCHEMA).hour("d").hour("another_d").build();
+  }
+
+  @Test
+  public void testMissingSourceColumn() {
+    TestHelpers.assertThrows("Should detect missing source column",
+        IllegalArgumentException.class, "Cannot find source column",
+        () -> PartitionSpec.builderFor(SCHEMA).year("missing").build());
+    TestHelpers.assertThrows("Should detect missing source column",
+        IllegalArgumentException.class, "Cannot find source column",
+        () -> PartitionSpec.builderFor(SCHEMA).month("missing").build());
+    TestHelpers.assertThrows("Should detect missing source column",
+        IllegalArgumentException.class, "Cannot find source column",
+        () -> PartitionSpec.builderFor(SCHEMA).day("missing").build());
+    TestHelpers.assertThrows("Should detect missing source column",
+        IllegalArgumentException.class, "Cannot find source column",
+        () -> PartitionSpec.builderFor(SCHEMA).hour("missing").build());
+    TestHelpers.assertThrows("Should detect missing source column",
+        IllegalArgumentException.class, "Cannot find source column",
+        () -> PartitionSpec.builderFor(SCHEMA).bucket("missing", 4).build());
+    TestHelpers.assertThrows("Should detect missing source column",
+        IllegalArgumentException.class, "Cannot find source column",
+        () -> PartitionSpec.builderFor(SCHEMA).truncate("missing", 5).build());
+    TestHelpers.assertThrows("Should detect missing source column",
+        IllegalArgumentException.class, "Cannot find source column",
+        () -> PartitionSpec.builderFor(SCHEMA).identity("missing").build());
+  }
+}
diff --git a/api/src/test/java/org/apache/iceberg/transforms/TestTransformSerialization.java b/api/src/test/java/org/apache/iceberg/transforms/TestTransformSerialization.java
index c5561a3..0b2a945 100644
--- a/api/src/test/java/org/apache/iceberg/transforms/TestTransformSerialization.java
+++ b/api/src/test/java/org/apache/iceberg/transforms/TestTransformSerialization.java
@@ -43,41 +43,43 @@ public class TestTransformSerialization {
     );
 
     // a spec with all of the allowed transform/type pairs
-    PartitionSpec spec = PartitionSpec.builderFor(schema)
-        .identity("i")
-        .identity("l")
-        .identity("d")
-        .identity("t")
-        .identity("ts")
-        .identity("dec")
-        .identity("s")
-        .identity("u")
-        .identity("f")
-        .identity("b")
-        .bucket("i", 128)
-        .bucket("l", 128)
-        .bucket("d", 128)
-        .bucket("t", 128)
-        .bucket("ts", 128)
-        .bucket("dec", 128)
-        .bucket("s", 128)
-        .bucket("u", 128)
-        .bucket("f", 128)
-        .bucket("b", 128)
-        .year("d")
-        .month("d")
-        .day("d")
-        .year("ts")
-        .month("ts")
-        .day("ts")
-        .hour("ts")
-        .truncate("i", 10)
-        .truncate("l", 10)
-        .truncate("dec", 10)
-        .truncate("s", 10)
-        .build();
+    PartitionSpec[] specs = new PartitionSpec[] {
+        PartitionSpec.builderFor(schema).identity("i").build(),
+        PartitionSpec.builderFor(schema).identity("l").build(),
+        PartitionSpec.builderFor(schema).identity("d").build(),
+        PartitionSpec.builderFor(schema).identity("t").build(),
+        PartitionSpec.builderFor(schema).identity("ts").build(),
+        PartitionSpec.builderFor(schema).identity("dec").build(),
+        PartitionSpec.builderFor(schema).identity("s").build(),
+        PartitionSpec.builderFor(schema).identity("u").build(),
+        PartitionSpec.builderFor(schema).identity("f").build(),
+        PartitionSpec.builderFor(schema).identity("b").build(),
+        PartitionSpec.builderFor(schema).bucket("i", 128).build(),
+        PartitionSpec.builderFor(schema).bucket("l", 128).build(),
+        PartitionSpec.builderFor(schema).bucket("d", 128).build(),
+        PartitionSpec.builderFor(schema).bucket("t", 128).build(),
+        PartitionSpec.builderFor(schema).bucket("ts", 128).build(),
+        PartitionSpec.builderFor(schema).bucket("dec", 128).build(),
+        PartitionSpec.builderFor(schema).bucket("s", 128).build(),
+        PartitionSpec.builderFor(schema).bucket("u", 128).build(),
+        PartitionSpec.builderFor(schema).bucket("f", 128).build(),
+        PartitionSpec.builderFor(schema).bucket("b", 128).build(),
+        PartitionSpec.builderFor(schema).year("d").build(),
+        PartitionSpec.builderFor(schema).month("d").build(),
+        PartitionSpec.builderFor(schema).day("d").build(),
+        PartitionSpec.builderFor(schema).year("ts").build(),
+        PartitionSpec.builderFor(schema).month("ts").build(),
+        PartitionSpec.builderFor(schema).day("ts").build(),
+        PartitionSpec.builderFor(schema).hour("ts").build(),
+        PartitionSpec.builderFor(schema).truncate("i", 10).build(),
+        PartitionSpec.builderFor(schema).truncate("l", 10).build(),
+        PartitionSpec.builderFor(schema).truncate("dec", 10).build(),
+        PartitionSpec.builderFor(schema).truncate("s", 10).build(),
+    };
 
-    Assert.assertEquals("Deserialization should produce equal partition spec",
-        spec, TestHelpers.roundTripSerialize(spec));
+    for (PartitionSpec spec : specs) {
+      Assert.assertEquals("Deserialization should produce equal partition spec",
+          spec, TestHelpers.roundTripSerialize(spec));
+    }
   }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index c202671..d5c8b3c 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -145,8 +145,7 @@ class Writer implements DataSourceWriter {
 
   @Override
   public String toString() {
-    return String.format("IcebergWrite(table=%s, type=%s, format=%s)",
-        table, table.schema().asStruct(), format);
+    return String.format("IcebergWrite(table=%s, format=%s)", table, format);
   }