You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2019/08/01 23:12:12 UTC

[incubator-iceberg] branch vectorized-read updated: [Vectorization] Batch sizing (#344)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch vectorized-read
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/vectorized-read by this push:
     new 189ab45  [Vectorization] Batch sizing (#344)
189ab45 is described below

commit 189ab456492dd972bd29b3089637b728515b5dca
Author: Gautam <ga...@gmail.com>
AuthorDate: Thu Aug 1 16:12:07 2019 -0700

    [Vectorization] Batch sizing (#344)
    
    * Add argument validation to HadoopTables#create (#298)
    
    * Install source JAR when running install target (#310)
    
    * Add projectStrict for Dates and Timestamps (#283)
    
    * Correctly publish artifacts on JitPack (#321)
    
    The Gradle install target produces invalid POM files that are missing
    the dependencyManagement section and versions for some dependencies.
    Instead, we directly tell JitPack to run the correct Gradle target.
    
    * Add build info to README.md (#304)
    
    * Convert Iceberg time type to Hive string type (#325)
    
    * Add overwrite option to write builders (#318)
    
    * Fix out of order Pig partition fields (#326)
    
    * Add mapping to Iceberg for external name-based schemas (#338)
    
    * Site: Fix broken link to Iceberg API (#333)
    
    * Add forTable method for Avro WriteBuilder (#322)
    
    * Remove multiple literal strings check rule for scala (#335)
    
    * Fix invalid javadoc url in README.md (#336)
    
    * Use UnicodeUtil.truncateString for Truncate transform. (#340)
    
    This truncates by unicode codepoint instead of Java chars.
    
    * Refactor metrics tests for reuse (#331)
    
    * Spark: Add support for write-audit-publish workflows (#342)
    
    * Avoid write failures if metrics mode is invalid (#301)
    
    * Fix truncateStringMax in UnicodeUtil (#334)
    
    Fixes #328, fixes #329.
    
    Index to codePointAt should be the offset calculated by code points
    
    * [Vectorization] Added batch sizing, switched to BufferAllocator, other minor style fixes.
---
 README.md                                          |   7 +-
 .../java/org/apache/iceberg/SnapshotUpdate.java    |   7 +
 .../apache/iceberg/expressions/Projections.java    |  14 +-
 .../iceberg/expressions/ResidualEvaluator.java     |  71 +++--
 .../java/org/apache/iceberg/transforms/Dates.java  |   7 +-
 .../apache/iceberg/transforms/ProjectionUtil.java  |  84 ++++++
 .../org/apache/iceberg/transforms/Timestamps.java  |   7 +-
 .../org/apache/iceberg/transforms/Truncate.java    |   3 +-
 .../java/org/apache/iceberg/util/UnicodeUtil.java  |   6 +-
 .../iceberg/transforms/TestDatesProjection.java    | 217 +++++++++++++++
 .../transforms/TestTimestampsProjection.java       | 293 +++++++++++++++++++++
 build.gradle                                       |  26 +-
 .../org/apache/iceberg/ManifestListWriter.java     |   1 +
 .../java/org/apache/iceberg/ManifestWriter.java    |   1 +
 .../java/org/apache/iceberg/MetricsConfig.java     |  29 +-
 .../main/java/org/apache/iceberg/SchemaUpdate.java |  32 ++-
 .../java/org/apache/iceberg/SnapshotProducer.java  |  15 +-
 .../java/org/apache/iceberg/TableMetadata.java     |  10 +
 .../java/org/apache/iceberg/TableProperties.java   |   6 +
 .../main/java/org/apache/iceberg/avro/Avro.java    |  19 +-
 .../org/apache/iceberg/avro/AvroFileAppender.java  |   6 +-
 .../org/apache/iceberg/mapping/MappedField.java    |  96 +++++++
 .../org/apache/iceberg/mapping/MappedFields.java   | 110 ++++++++
 .../org/apache/iceberg/mapping/MappingUtil.java    | 261 ++++++++++++++++++
 .../org/apache/iceberg/mapping/NameMapping.java    |  66 +++++
 .../apache/iceberg/mapping/NameMappingParser.java  | 144 ++++++++++
 .../java/org/apache/iceberg/util/JsonUtil.java     |  10 +
 .../java/org/apache/iceberg/TableTestBase.java     |   6 +-
 .../test/java/org/apache/iceberg/TestMetrics.java  |  36 +--
 .../java/org/apache/iceberg/TestMetricsModes.java  |  24 ++
 .../org/apache/iceberg/TestMetricsTruncation.java  |  32 ++-
 .../test/java/org/apache/iceberg/TestTables.java   |   8 +-
 .../apache/iceberg/mapping/TestMappingUpdates.java | 249 +++++++++++++++++
 .../apache/iceberg/mapping/TestNameMapping.java    | 250 ++++++++++++++++++
 .../org/apache/iceberg/hive/HiveTypeConverter.java |   2 +-
 jitpack.yml                                        |   2 +
 orc/src/main/java/org/apache/iceberg/orc/ORC.java  |   9 +
 .../java/org/apache/iceberg/parquet/Parquet.java   |  16 +-
 .../iceberg/parquet/ParquetValueReaders.java       |   2 +-
 .../org/apache/iceberg/parquet/ParquetWriter.java  |   5 +-
 ...itingTest.java => ParquetWritingTestUtils.java} |  15 +-
 .../org/apache/iceberg/parquet/TestParquet.java    |  10 +-
 .../apache/iceberg/parquet/TestParquetMetrics.java |  49 ++++
 .../apache/iceberg/pig/IcebergPigInputFormat.java  |   8 +-
 project/scalastyle_config.xml                      |   6 -
 site/docs/spark.md                                 |   2 +-
 ...cebergSourceFlatParquetDataFilterBenchmark.java |  47 +++-
 .../IcebergSourceFlatParquetDataReadBenchmark.java |  86 +++++-
 .../data/vector/VectorizedParquetValueReaders.java | 110 +++++---
 .../data/vector/VectorizedSparkParquetReaders.java |  67 +++--
 .../apache/iceberg/spark/source/IcebergSource.java |   3 +-
 .../org/apache/iceberg/spark/source/Reader.java    |  24 +-
 .../org/apache/iceberg/spark/source/Writer.java    |  20 ++
 .../data/TestSparkParquetVectorizedReader.java     |   2 +-
 54 files changed, 2437 insertions(+), 201 deletions(-)

diff --git a/README.md b/README.md
index 38bb709..7551daf 100644
--- a/README.md
+++ b/README.md
@@ -35,9 +35,9 @@ The core Java library that tracks table snapshots and metadata is complete, but
 
 The [Iceberg format specification][iceberg-spec] is being actively updated and is open for comment. Until the specification is complete and released, it carries no compatibility guarantees. The spec is currently evolving as the Java reference implementation changes.
 
-[Java API javadocs][iceberg-javadocs] are available for the 0.6.0 tag.
+[Java API javadocs][iceberg-javadocs] are available for the master.
 
-[iceberg-javadocs]: https://iceberg.apache.org/javadoc/0.6.0/index.html?com/netflix/iceberg/package-summary.html
+[iceberg-javadocs]: https://iceberg.apache.org/javadoc/master
 [iceberg-spec]: https://iceberg.apache.org/spec
 
 
@@ -54,6 +54,9 @@ Community discussions happen primarily on the [dev mailing list][dev-list] or on
 
 Iceberg is built using Gradle 5.2.1.
 
+* To invoke a build and run tests: `./gradlew build`
+* To skip tests: `./gradlew build -x test`
+
 Iceberg table support is organized in library modules:
 
 * `iceberg-common` contains utility classes used in other modules
diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index 5cabc02..fdd1a63 100644
--- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -45,4 +45,11 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
    */
   ThisT deleteWith(Consumer<String> deleteFunc);
 
+  /**
+   * Called to stage a snapshot in table metadata, but not update the current snapshot id.
+   *
+   * @return this for method chaining
+   */
+  ThisT stageOnly();
+
 }
diff --git a/api/src/main/java/org/apache/iceberg/expressions/Projections.java b/api/src/main/java/org/apache/iceberg/expressions/Projections.java
index 50d0693..f800b35 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/Projections.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/Projections.java
@@ -221,9 +221,10 @@ public class Projections {
         // similarly, if partitioning by day(ts) and hour(ts), the more restrictive
         // projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and
         // hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02.
-        result = Expressions.and(
-            result,
-            ((Transform<T, ?>) part.transform()).project(part.name(), pred));
+        UnboundPredicate<?> inclusiveProjection = ((Transform<T, ?>) part.transform()).project(part.name(), pred);
+        if (inclusiveProjection != null) {
+          result = Expressions.and(result, inclusiveProjection);
+        }
       }
 
       return result;
@@ -251,9 +252,10 @@ public class Projections {
         // any timestamp where either projection predicate is true must match the original
         // predicate. For example, ts = 2019-01-01T03:00:00 matches the hour projection but not
         // the day, but does match the original predicate.
-        result = Expressions.or(
-            result,
-            ((Transform<T, ?>) part.transform()).projectStrict(part.name(), pred));
+        UnboundPredicate<?> strictProjection = ((Transform<T, ?>) part.transform()).projectStrict(part.name(), pred);
+        if (strictProjection != null) {
+          result = Expressions.or(result, strictProjection);
+        }
       }
 
       return result;
diff --git a/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
index 0fae40d..dd3a0b8 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
@@ -19,12 +19,9 @@
 
 package org.apache.iceberg.expressions;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Objects;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
@@ -200,10 +197,15 @@ public class ResidualEvaluator implements Serializable {
     @Override
     @SuppressWarnings("unchecked")
     public <T> Expression predicate(BoundPredicate<T> pred) {
-      // Get the strict projection of this predicate in partition data, then use it to determine
-      // whether to return the original predicate. The strict projection returns true iff the
-      // original predicate would have returned true, so the predicate can be eliminated if the
-      // strict projection evaluates to true.
+      /**
+       * Get the strict projection and inclusive projection of this predicate in partition data,
+       * then use them to determine whether to return the original predicate. The strict projection
+       * returns true iff the original predicate would have returned true, so the predicate can be
+       * eliminated if the strict projection evaluates to true. Similarly the inclusive projection
+       * returns false iff the original predicate would have returned false, so the predicate can
+       * also be eliminated if the inclusive projection evaluates to false.
+       */
+
       //
       // If there is no strict projection or if it evaluates to false, then return the predicate.
       List<PartitionField> parts = spec.getFieldsBySourceId(pred.ref().fieldId());
@@ -211,31 +213,50 @@ public class ResidualEvaluator implements Serializable {
         return pred; // not associated inclusive a partition field, can't be evaluated
       }
 
-      List<UnboundPredicate<?>> strictProjections = Lists.transform(parts,
-          part -> ((Transform<T, ?>) part.transform()).projectStrict(part.name(), pred));
+      for (PartitionField part : parts) {
 
-      if (Iterables.all(strictProjections, Objects::isNull)) {
-        // if there are no strict projections, the predicate must be in the residual
-        return pred;
-      }
+        // checking the strict projection
+        UnboundPredicate<?> strictProjection = ((Transform<T, ?>) part.transform()).projectStrict(part.name(), pred);
+        Expression strictResult = null;
+
+        if (strictProjection != null) {
+          Expression bound = strictProjection.bind(spec.partitionType(), caseSensitive);
+          if (bound instanceof BoundPredicate) {
+            strictResult = super.predicate((BoundPredicate<?>) bound);
+          } else {
+            // if the result is not a predicate, then it must be a constant like alwaysTrue or alwaysFalse
+            strictResult = bound;
+          }
+        }
 
-      Expression result = Expressions.alwaysFalse();
-      for (UnboundPredicate<?> strictProjection : strictProjections) {
-        if (strictProjection == null) {
-          continue;
+        if (strictResult != null && strictResult.op() == Expression.Operation.TRUE) {
+          // If strict is true, returning true
+          return Expressions.alwaysTrue();
         }
 
-        Expression bound = strictProjection.bind(spec.partitionType(), caseSensitive);
-        if (bound instanceof BoundPredicate) {
-          // evaluate the bound predicate, which will return alwaysTrue or alwaysFalse
-          result = Expressions.or(result, super.predicate((BoundPredicate<?>) bound));
-        } else {
-          // update the result expression with the non-predicate residual (e.g. alwaysTrue)
-          result = Expressions.or(result, bound);
+        // checking the inclusive projection
+        UnboundPredicate<?> inclusiveProjection = ((Transform<T, ?>) part.transform()).project(part.name(), pred);
+        Expression inclusiveResult = null;
+        if (inclusiveProjection != null) {
+          Expression boundInclusive = inclusiveProjection.bind(spec.partitionType(), caseSensitive);
+          if (boundInclusive instanceof BoundPredicate) {
+            // using predicate method specific to inclusive
+            inclusiveResult = super.predicate((BoundPredicate<?>) boundInclusive);
+          } else {
+            // if the result is not a predicate, then it must be a constant like alwaysTrue or alwaysFalse
+            inclusiveResult = boundInclusive;
+          }
         }
+
+        if (inclusiveResult != null && inclusiveResult.op() == Expression.Operation.FALSE) {
+          // If inclusive is false, returning false
+          return Expressions.alwaysFalse();
+        }
+
       }
 
-      return result;
+      // neither strict not inclusive predicate was conclusive, returning the original pred
+      return pred;
     }
 
     @Override
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Dates.java b/api/src/main/java/org/apache/iceberg/transforms/Dates.java
index a57d6d9..94714d2 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Dates.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Dates.java
@@ -73,8 +73,11 @@ enum Dates implements Transform<Integer, Integer> {
   }
 
   @Override
-  public UnboundPredicate<Integer> projectStrict(String fieldName, BoundPredicate<Integer> predicate) {
-    return null;
+  public UnboundPredicate<Integer> projectStrict(String fieldName, BoundPredicate<Integer> pred) {
+    if (pred.op() == NOT_NULL || pred.op() == IS_NULL) {
+      return Expressions.predicate(pred.op(), fieldName);
+    }
+    return ProjectionUtil.truncateIntegerStrict(fieldName, pred, this);
   }
 
   @Override
diff --git a/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java b/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java
index 04da036..ef1e0c3 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java
@@ -52,6 +52,90 @@ class ProjectionUtil {
     }
   }
 
+  static UnboundPredicate<Integer> truncateIntegerStrict(
+      String name, BoundPredicate<Integer> pred, Transform<Integer, Integer> transform) {
+    int boundary = pred.literal().value();
+    switch (pred.op()) {
+      case LT:
+        // predicate would be <= the previous partition
+        return predicate(Expression.Operation.LT_EQ, name, transform.apply(boundary) - 1);
+      case LT_EQ:
+        // Checking if the literal is at the upper partition boundary
+        if (transform.apply(boundary + 1).equals(transform.apply(boundary))) {
+          // Literal is not at upper boundary, for eg: 2019-07-02T02:12:34.0000
+          // the predicate can be < 2019-07-01
+          return predicate(Expression.Operation.LT_EQ, name, transform.apply(boundary) - 1);
+        } else {
+          // Literal is not at upper boundary, for eg: 2019-07-02T23:59:59.99999
+          // the predicate can be <= 2019-07-02
+          return predicate(Expression.Operation.LT_EQ, name, transform.apply(boundary));
+        }
+      case GT:
+        // predicate would be >= the next partition
+        return predicate(Expression.Operation.GT_EQ, name, transform.apply(boundary) + 1);
+      case GT_EQ:
+        // Checking if the literal is at the lower partition boundary
+        if (transform.apply(boundary - 1).equals(transform.apply(boundary))) {
+          // Literal is not at lower boundary, for eg: 2019-07-02T02:12:34.0000
+          // the predicate can be >= 2019-07-03
+          return predicate(Expression.Operation.GT_EQ, name, transform.apply(boundary) + 1);
+        } else {
+          // Literal was at the lower boundary, for eg: 2019-07-02T00:00:00.0000
+          // the predicate can be >= 2019-07-02
+          return predicate(Expression.Operation.GT_EQ, name, transform.apply(boundary));
+        }
+      case NOT_EQ:
+        return predicate(Expression.Operation.NOT_EQ, name, transform.apply(boundary));
+      case EQ:
+        // there is no predicate that guarantees equality because adjacent ints transform to the same value
+        return null;
+      default:
+        return null;
+    }
+  }
+
+  static UnboundPredicate<Integer> truncateLongStrict(
+      String name, BoundPredicate<Long> pred, Transform<Long, Integer> transform) {
+    long boundary = pred.literal().value();
+    switch (pred.op()) {
+      case LT:
+        // predicate would be <= the previous partition
+        return predicate(Expression.Operation.LT_EQ, name, transform.apply(boundary) - 1);
+      case LT_EQ:
+        // Checking if the literal is at the upper partition boundary
+        if (transform.apply(boundary + 1L).equals(transform.apply(boundary))) {
+          // Literal is not at upper boundary, for eg: 2019-07-02T02:12:34.0000
+          // the predicate can be <= 2019-07-01
+          return predicate(Expression.Operation.LT_EQ, name, transform.apply(boundary) - 1);
+        } else {
+          // Literal is not at upper boundary, for eg: 2019-07-02T23:59:59.99999
+          // the predicate can be <= 2019-07-02
+          return predicate(Expression.Operation.LT_EQ, name, transform.apply(boundary));
+        }
+      case GT:
+        // predicate would be >= the next partition
+        return predicate(Expression.Operation.GT_EQ, name, transform.apply(boundary) + 1);
+      case GT_EQ:
+        // Checking if the literal is at the lower partition boundary
+        if (transform.apply(boundary - 1L).equals(transform.apply(boundary))) {
+          // Literal is not at lower boundary, for eg: 2019-07-02T02:12:34.0000
+          // the predicate can be >= 2019-07-03
+          return predicate(Expression.Operation.GT_EQ, name, transform.apply(boundary) + 1);
+        } else {
+          // Literal was at the lower boundary, for eg: 2019-07-02T00:00:00.0000
+          // the predicate can be >= 2019-07-02
+          return predicate(Expression.Operation.GT_EQ, name, transform.apply(boundary));
+        }
+      case NOT_EQ:
+        return predicate(Expression.Operation.NOT_EQ, name, transform.apply(boundary));
+      case EQ:
+        // there is no predicate that guarantees equality because adjacent longs transform to the same value
+        return null;
+      default:
+        return null;
+    }
+  }
+
   static <T> UnboundPredicate<T> truncateLong(
       String name, BoundPredicate<Long> pred, Transform<Long, T> transform) {
     long boundary = pred.literal().value();
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java b/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java
index f01ea05..7259def 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java
@@ -76,8 +76,11 @@ enum Timestamps implements Transform<Long, Integer> {
   }
 
   @Override
-  public UnboundPredicate<Integer> projectStrict(String fieldName, BoundPredicate<Long> predicate) {
-    return null;
+  public UnboundPredicate<Integer> projectStrict(String fieldName, BoundPredicate<Long> pred) {
+    if (pred.op() == NOT_NULL || pred.op() == IS_NULL) {
+      return Expressions.predicate(pred.op(), fieldName);
+    }
+    return ProjectionUtil.truncateLongStrict(fieldName, pred, this);
   }
 
   @Override
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Truncate.java b/api/src/main/java/org/apache/iceberg/transforms/Truncate.java
index d65d646..2eacaa2 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Truncate.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Truncate.java
@@ -27,6 +27,7 @@ import org.apache.iceberg.expressions.BoundPredicate;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.UnboundPredicate;
 import org.apache.iceberg.types.Type;
+import org.apache.iceberg.util.UnicodeUtil;
 
 import static org.apache.iceberg.expressions.Expression.Operation.IS_NULL;
 import static org.apache.iceberg.expressions.Expression.Operation.LT;
@@ -233,7 +234,7 @@ abstract class Truncate<T> implements Transform<T, T> {
 
     @Override
     public CharSequence apply(CharSequence value) {
-      return value.subSequence(0, Math.min(value.length(), length));
+      return UnicodeUtil.truncateString(value, length);
     }
 
     @Override
diff --git a/api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java b/api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java
index 1eaed21..f76ec73 100644
--- a/api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java
@@ -79,11 +79,11 @@ public class UnicodeUtil {
 
     // Try incrementing the code points from the end
     for (int i = length - 1; i >= 0; i--) {
-      int nextCodePoint = truncatedStringBuffer.codePointAt(i) + 1;
+      // Get the offset in the truncated string buffer where the number of unicode characters = i
+      int offsetByCodePoint = truncatedStringBuffer.offsetByCodePoints(0, i);
+      int nextCodePoint = truncatedStringBuffer.codePointAt(offsetByCodePoint) + 1;
       // No overflow
       if (nextCodePoint != 0 && Character.isValidCodePoint(nextCodePoint)) {
-        // Get the offset in the truncated string buffer where the number of unicode characters = i
-        int offsetByCodePoint = truncatedStringBuffer.offsetByCodePoints(0, i);
         truncatedStringBuffer.setLength(offsetByCodePoint);
         // Append next code point to the truncated substring
         truncatedStringBuffer.appendCodePoint(nextCodePoint);
diff --git a/api/src/test/java/org/apache/iceberg/transforms/TestDatesProjection.java b/api/src/test/java/org/apache/iceberg/transforms/TestDatesProjection.java
new file mode 100644
index 0000000..7b4ee78
--- /dev/null
+++ b/api/src/test/java/org/apache/iceberg/transforms/TestDatesProjection.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.transforms;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.UnboundPredicate;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.TestHelpers.assertAndUnwrapUnbound;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.greaterThan;
+import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.lessThan;
+import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.notEqual;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestDatesProjection {
+  private static final Types.DateType TYPE = Types.DateType.get();
+  private static final Schema SCHEMA = new Schema(optional(1, "date", TYPE));
+
+  public void assertProjectionStrict(PartitionSpec spec, UnboundPredicate<?> filter,
+                                     Expression.Operation expectedOp, String expectedLiteral) {
+
+    Expression projection = Projections.strict(spec).project(filter);
+    UnboundPredicate<?> predicate = assertAndUnwrapUnbound(projection);
+
+    Assert.assertEquals(expectedOp, predicate.op());
+
+    Literal literal = predicate.literal();
+    Dates transform = (Dates) spec.getFieldsBySourceId(1).get(0).transform();
+    String output = transform.toHumanString((int) literal.value());
+    Assert.assertEquals(expectedLiteral, output);
+  }
+
+  public void assertProjectionStrictValue(PartitionSpec spec, UnboundPredicate<?> filter,
+                                          Expression.Operation expectedOp) {
+
+    Expression projection = Projections.strict(spec).project(filter);
+    Assert.assertEquals(projection.op(), expectedOp);
+  }
+
+  public void assertProjectionInclusiveValue(PartitionSpec spec, UnboundPredicate<?> filter,
+                                             Expression.Operation expectedOp) {
+
+    Expression projection = Projections.inclusive(spec).project(filter);
+    Assert.assertEquals(projection.op(), expectedOp);
+  }
+
+  public void assertProjectionInclusive(PartitionSpec spec, UnboundPredicate<?> filter,
+                                        Expression.Operation expectedOp, String expectedLiteral) {
+    Expression projection = Projections.inclusive(spec).project(filter);
+    UnboundPredicate<?> predicate = assertAndUnwrapUnbound(projection);
+
+    Assert.assertEquals(predicate.op(), expectedOp);
+
+    Literal literal = predicate.literal();
+    Dates transform = (Dates) spec.getFieldsBySourceId(1).get(0).transform();
+    String output = transform.toHumanString((int) literal.value());
+    Assert.assertEquals(expectedLiteral, output);
+  }
+
+  @Test
+  public void testMonthStrictLowerBound() {
+    Integer date = (Integer) Literal.of("2017-01-01").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).month("date").build();
+
+    assertProjectionStrict(spec, lessThan("date", date), Expression.Operation.LT_EQ, "2016-12");
+    assertProjectionStrict(spec, lessThanOrEqual("date", date), Expression.Operation.LT_EQ, "2016-12");
+    assertProjectionStrict(spec, greaterThan("date", date), Expression.Operation.GT_EQ, "2017-02");
+    assertProjectionStrict(spec, greaterThanOrEqual("date", date), Expression.Operation.GT_EQ, "2017-01");
+    assertProjectionStrict(spec, notEqual("date", date), Expression.Operation.NOT_EQ, "2017-01");
+    assertProjectionStrictValue(spec, equal("date", date), Expression.Operation.FALSE);
+  }
+
+  @Test
+  public void testMonthStrictUpperBound() {
+    Integer date = (Integer) Literal.of("2017-12-31").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).month("date").build();
+
+    assertProjectionStrict(spec, lessThan("date", date), Expression.Operation.LT_EQ, "2017-11");
+    assertProjectionStrict(spec, lessThanOrEqual("date", date), Expression.Operation.LT_EQ, "2017-12");
+    assertProjectionStrict(spec, greaterThan("date", date), Expression.Operation.GT_EQ, "2018-01");
+    assertProjectionStrict(spec, greaterThanOrEqual("date", date), Expression.Operation.GT_EQ, "2018-01");
+    assertProjectionStrict(spec, notEqual("date", date), Expression.Operation.NOT_EQ, "2017-12");
+    assertProjectionStrictValue(spec, equal("date", date), Expression.Operation.FALSE);
+  }
+
+  @Test
+  public void testMonthInclusiveLowerBound() {
+    Integer date = (Integer) Literal.of("2017-12-01").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).month("date").build();
+
+    assertProjectionInclusive(spec, lessThan("date", date), Expression.Operation.LT_EQ, "2017-11");
+    assertProjectionInclusive(spec, lessThanOrEqual("date", date), Expression.Operation.LT_EQ, "2017-12");
+    assertProjectionInclusive(spec, greaterThan("date", date), Expression.Operation.GT_EQ, "2017-12");
+    assertProjectionInclusive(spec, greaterThanOrEqual("date", date), Expression.Operation.GT_EQ, "2017-12");
+    assertProjectionInclusive(spec, equal("date", date), Expression.Operation.EQ, "2017-12");
+    assertProjectionInclusiveValue(spec, notEqual("date", date), Expression.Operation.TRUE);
+  }
+
+  @Test
+  public void testMonthInclusiveUpperBound() {
+    Integer date = (Integer) Literal.of("2017-12-31").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).month("date").build();
+
+    assertProjectionInclusive(spec, lessThan("date", date), Expression.Operation.LT_EQ, "2017-12");
+    assertProjectionInclusive(spec, lessThanOrEqual("date", date), Expression.Operation.LT_EQ, "2017-12");
+    assertProjectionInclusive(spec, greaterThan("date", date), Expression.Operation.GT_EQ, "2018-01");
+    assertProjectionInclusive(spec, greaterThanOrEqual("date", date), Expression.Operation.GT_EQ, "2017-12");
+    assertProjectionInclusive(spec, equal("date", date), Expression.Operation.EQ, "2017-12");
+    assertProjectionInclusiveValue(spec, notEqual("date", date), Expression.Operation.TRUE);
+  }
+
+  @Test
+  public void testDayStrict() {
+    Integer date = (Integer) Literal.of("2017-01-01").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).day("date").build();
+
+    assertProjectionStrict(spec, lessThan("date", date), Expression.Operation.LT_EQ, "2016-12-31");
+    // should be the same date for <=
+    assertProjectionStrict(spec, lessThanOrEqual("date", date), Expression.Operation.LT_EQ, "2017-01-01");
+    assertProjectionStrict(spec, greaterThan("date", date), Expression.Operation.GT_EQ, "2017-01-02");
+    // should be the same date for >=
+    assertProjectionStrict(spec, greaterThanOrEqual("date", date), Expression.Operation.GT_EQ, "2017-01-01");
+    assertProjectionStrict(spec, notEqual("date", date), Expression.Operation.NOT_EQ, "2017-01-01");
+    assertProjectionStrictValue(spec, equal("date", date), Expression.Operation.FALSE);
+  }
+
+  @Test
+  public void testDayInclusive() {
+    Integer date = (Integer) Literal.of("2017-01-01").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).day("date").build();
+
+    assertProjectionInclusive(spec, lessThan("date", date), Expression.Operation.LT_EQ, "2016-12-31");
+    assertProjectionInclusive(spec, lessThanOrEqual("date", date), Expression.Operation.LT_EQ, "2017-01-01");
+    assertProjectionInclusive(spec, greaterThan("date", date), Expression.Operation.GT_EQ, "2017-01-02");
+    assertProjectionInclusive(spec, greaterThanOrEqual("date", date), Expression.Operation.GT_EQ, "2017-01-01");
+    assertProjectionInclusive(spec, equal("date", date), Expression.Operation.EQ, "2017-01-01");
+    assertProjectionInclusiveValue(spec, notEqual("date", date), Expression.Operation.TRUE);
+  }
+
+  @Test
+  public void testYearStrictLowerBound() {
+    Integer date = (Integer) Literal.of("2017-01-01").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).year("date").build();
+
+    assertProjectionStrict(spec, lessThan("date", date), Expression.Operation.LT_EQ, "2016");
+    assertProjectionStrict(spec, lessThanOrEqual("date", date), Expression.Operation.LT_EQ, "2016");
+    assertProjectionStrict(spec, greaterThan("date", date), Expression.Operation.GT_EQ, "2018");
+    assertProjectionStrict(spec, greaterThanOrEqual("date", date), Expression.Operation.GT_EQ, "2017");
+    assertProjectionStrict(spec, notEqual("date", date), Expression.Operation.NOT_EQ, "2017");
+    assertProjectionStrictValue(spec, equal("date", date), Expression.Operation.FALSE);
+  }
+
+  @Test
+  public void testYearStrictUpperBound() {
+    Integer date = (Integer) Literal.of("2017-12-31").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).year("date").build();
+
+    assertProjectionStrict(spec, lessThan("date", date), Expression.Operation.LT_EQ, "2016");
+    assertProjectionStrict(spec, lessThanOrEqual("date", date), Expression.Operation.LT_EQ, "2017");
+    assertProjectionStrict(spec, greaterThan("date", date), Expression.Operation.GT_EQ, "2018");
+    assertProjectionStrict(spec, greaterThanOrEqual("date", date), Expression.Operation.GT_EQ, "2018");
+    assertProjectionStrict(spec, notEqual("date", date), Expression.Operation.NOT_EQ, "2017");
+    assertProjectionStrictValue(spec, equal("date", date), Expression.Operation.FALSE);
+  }
+
+  @Test
+  public void testYearInclusiveLowerBound() {
+    Integer date = (Integer) Literal.of("2017-01-01").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).year("date").build();
+
+    assertProjectionInclusive(spec, lessThan("date", date), Expression.Operation.LT_EQ, "2016");
+    assertProjectionInclusive(spec, lessThanOrEqual("date", date), Expression.Operation.LT_EQ, "2017");
+    assertProjectionInclusive(spec, greaterThan("date", date), Expression.Operation.GT_EQ, "2017");
+    assertProjectionInclusive(spec, greaterThanOrEqual("date", date), Expression.Operation.GT_EQ, "2017");
+    assertProjectionInclusive(spec, equal("date", date), Expression.Operation.EQ, "2017");
+    assertProjectionInclusiveValue(spec, notEqual("date", date), Expression.Operation.TRUE);
+  }
+
+  @Test
+  public void testYearInclusiveUpperBound() {
+    Integer date = (Integer) Literal.of("2017-12-31").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).year("date").build();
+
+    assertProjectionInclusive(spec, lessThan("date", date), Expression.Operation.LT_EQ, "2017");
+    assertProjectionInclusive(spec, lessThanOrEqual("date", date), Expression.Operation.LT_EQ, "2017");
+    assertProjectionInclusive(spec, greaterThan("date", date), Expression.Operation.GT_EQ, "2018");
+    assertProjectionInclusive(spec, greaterThanOrEqual("date", date), Expression.Operation.GT_EQ, "2017");
+    assertProjectionInclusive(spec, equal("date", date), Expression.Operation.EQ, "2017");
+    assertProjectionInclusiveValue(spec, notEqual("date", date), Expression.Operation.TRUE);
+  }
+}
diff --git a/api/src/test/java/org/apache/iceberg/transforms/TestTimestampsProjection.java b/api/src/test/java/org/apache/iceberg/transforms/TestTimestampsProjection.java
new file mode 100644
index 0000000..5d3c5b0
--- /dev/null
+++ b/api/src/test/java/org/apache/iceberg/transforms/TestTimestampsProjection.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.transforms;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.UnboundPredicate;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.TestHelpers.assertAndUnwrapUnbound;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.greaterThan;
+import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.lessThan;
+import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.notEqual;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestTimestampsProjection {
+  private static final Types.TimestampType TYPE = Types.TimestampType.withoutZone();
+  private static final Schema SCHEMA = new Schema(optional(1, "timestamp", TYPE));
+
+  public void assertProjectionStrict(PartitionSpec spec, UnboundPredicate<?> filter,
+                                     Expression.Operation expectedOp, String expectedLiteral) {
+
+    Expression projection = Projections.strict(spec).project(filter);
+    UnboundPredicate<?> predicate = assertAndUnwrapUnbound(projection);
+
+    Assert.assertEquals(expectedOp, predicate.op());
+
+    Literal literal = predicate.literal();
+    Timestamps transform = (Timestamps) spec.getFieldsBySourceId(1).get(0).transform();
+    String output = transform.toHumanString((int) literal.value());
+    Assert.assertEquals(expectedLiteral, output);
+  }
+
+  public void assertProjectionStrictValue(PartitionSpec spec, UnboundPredicate<?> filter,
+                                          Expression.Operation expectedOp) {
+
+    Expression projection = Projections.strict(spec).project(filter);
+    Assert.assertEquals(projection.op(), expectedOp);
+  }
+
+  public void assertProjectionInclusiveValue(PartitionSpec spec, UnboundPredicate<?> filter,
+                                             Expression.Operation expectedOp) {
+
+    Expression projection = Projections.inclusive(spec).project(filter);
+    Assert.assertEquals(projection.op(), expectedOp);
+  }
+
+  public void assertProjectionInclusive(PartitionSpec spec, UnboundPredicate<?> filter,
+                                        Expression.Operation expectedOp, String expectedLiteral) {
+    Expression projection = Projections.inclusive(spec).project(filter);
+    UnboundPredicate<?> predicate = assertAndUnwrapUnbound(projection);
+
+    Assert.assertEquals(predicate.op(), expectedOp);
+
+    Literal literal = predicate.literal();
+    Timestamps transform = (Timestamps) spec.getFieldsBySourceId(1).get(0).transform();
+    String output = transform.toHumanString((int) literal.value());
+    Assert.assertEquals(expectedLiteral, output);
+  }
+
+  @Test
+  public void testMonthStrictLowerBound() {
+    Long date = (long) Literal.of("2017-12-01T00:00:00.00000").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).month("timestamp").build();
+
+    assertProjectionStrict(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2017-11");
+    assertProjectionStrict(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017-11");
+    assertProjectionStrict(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2018-01");
+    assertProjectionStrict(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017-12");
+    assertProjectionStrict(spec, notEqual("timestamp", date), Expression.Operation.NOT_EQ, "2017-12");
+    assertProjectionStrictValue(spec, equal("timestamp", date), Expression.Operation.FALSE);
+  }
+
+  @Test
+  public void testMonthStrictUpperBound() {
+    Long date = (long) Literal.of("2017-12-31T23:59:59.999999").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).month("timestamp").build();
+
+    assertProjectionStrict(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2017-11");
+    assertProjectionStrict(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017-12");
+    assertProjectionStrict(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2018-01");
+    assertProjectionStrict(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2018-01");
+    assertProjectionStrict(spec, notEqual("timestamp", date), Expression.Operation.NOT_EQ, "2017-12");
+    assertProjectionStrictValue(spec, equal("timestamp", date), Expression.Operation.FALSE);
+  }
+
+  @Test
+  public void testMonthInclusiveLowerBound() {
+    Long date = (long) Literal.of("2017-12-01T00:00:00.00000").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).month("timestamp").build();
+
+    assertProjectionInclusive(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2017-11");
+    assertProjectionInclusive(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017-12");
+    assertProjectionInclusive(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2017-12");
+    assertProjectionInclusive(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017-12");
+    assertProjectionInclusive(spec, equal("timestamp", date), Expression.Operation.EQ, "2017-12");
+    assertProjectionInclusiveValue(spec, notEqual("timestamp", date), Expression.Operation.TRUE);
+  }
+
+  @Test
+  public void testMonthInclusiveUpperBound() {
+    Long date = (long) Literal.of("2017-12-01T23:59:59.999999").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).month("timestamp").build();
+
+    assertProjectionInclusive(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2017-12");
+    assertProjectionInclusive(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017-12");
+    assertProjectionInclusive(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2017-12");
+    assertProjectionInclusive(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017-12");
+    assertProjectionInclusive(spec, equal("timestamp", date), Expression.Operation.EQ, "2017-12");
+    assertProjectionInclusiveValue(spec, notEqual("timestamp", date), Expression.Operation.TRUE);
+  }
+
+  @Test
+  public void testDayStrictLowerBound() {
+    Long date = (long) Literal.of("2017-12-01T00:00:00.00000").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).day("timestamp").build();
+
+    assertProjectionStrict(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2017-11-30");
+    assertProjectionStrict(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017-11-30");
+    assertProjectionStrict(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2017-12-02");
+    assertProjectionStrict(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017-12-01");
+    assertProjectionStrict(spec, notEqual("timestamp", date), Expression.Operation.NOT_EQ, "2017-12-01");
+    assertProjectionStrictValue(spec, equal("timestamp", date), Expression.Operation.FALSE);
+  }
+
+  @Test
+  public void testDayStrictUpperBound() {
+    Long date = (long) Literal.of("2017-12-01T23:59:59.999999").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).day("timestamp").build();
+
+    assertProjectionStrict(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2017-11-30");
+    assertProjectionStrict(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017-12-01");
+    assertProjectionStrict(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2017-12-02");
+    assertProjectionStrict(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017-12-02");
+    assertProjectionStrict(spec, notEqual("timestamp", date), Expression.Operation.NOT_EQ, "2017-12-01");
+    assertProjectionStrictValue(spec, equal("timestamp", date), Expression.Operation.FALSE);
+  }
+
+  @Test
+  public void testDayInclusiveLowerBound() {
+    Long date = (long) Literal.of("2017-12-01T00:00:00.00000").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).day("timestamp").build();
+
+    assertProjectionInclusive(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2017-11-30");
+    assertProjectionInclusive(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017-12-01");
+    assertProjectionInclusive(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2017-12-01");
+    assertProjectionInclusive(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017-12-01");
+    assertProjectionInclusive(spec, equal("timestamp", date), Expression.Operation.EQ, "2017-12-01");
+    assertProjectionInclusiveValue(spec, notEqual("timestamp", date), Expression.Operation.TRUE);
+  }
+
+  @Test
+  public void testDayInclusiveUpperBound() {
+    Long date = (long) Literal.of("2017-12-01T23:59:59.999999").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).day("timestamp").build();
+
+    assertProjectionInclusive(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2017-12-01");
+    assertProjectionInclusive(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017-12-01");
+    assertProjectionInclusive(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2017-12-02");
+    assertProjectionInclusive(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017-12-01");
+    assertProjectionInclusive(spec, equal("timestamp", date), Expression.Operation.EQ, "2017-12-01");
+    assertProjectionInclusiveValue(spec, notEqual("timestamp", date), Expression.Operation.TRUE);
+  }
+
+  @Test
+  public void testYearStrictLowerBound() {
+    Long date = (long) Literal.of("2017-01-01T00:00:00.00000").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).year("timestamp").build();
+
+    assertProjectionStrict(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2016");
+    assertProjectionStrict(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2016");
+    assertProjectionStrict(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2018");
+    assertProjectionStrict(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017");
+    assertProjectionStrict(spec, notEqual("timestamp", date), Expression.Operation.NOT_EQ, "2017");
+    assertProjectionStrictValue(spec, equal("timestamp", date), Expression.Operation.FALSE);
+  }
+
+  @Test
+  public void testYearStrictUpperBound() {
+    Long date = (long) Literal.of("2017-12-31T23:59:59.999999").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).year("timestamp").build();
+
+    assertProjectionStrict(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2016");
+    assertProjectionStrict(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017");
+    assertProjectionStrict(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2018");
+    assertProjectionStrict(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2018");
+    assertProjectionStrict(spec, notEqual("timestamp", date), Expression.Operation.NOT_EQ, "2017");
+    assertProjectionStrictValue(spec, equal("timestamp", date), Expression.Operation.FALSE);
+  }
+
+  @Test
+  public void testYearInclusiveLowerBound() {
+    Long date = (long) Literal.of("2017-01-01T00:00:00.00000").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).year("timestamp").build();
+
+    assertProjectionInclusive(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2016");
+    assertProjectionInclusive(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017");
+    assertProjectionInclusive(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2017");
+    assertProjectionInclusive(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017");
+    assertProjectionInclusive(spec, equal("timestamp", date), Expression.Operation.EQ, "2017");
+    assertProjectionInclusiveValue(spec, notEqual("timestamp", date), Expression.Operation.TRUE);
+  }
+
+  @Test
+  public void testYearInclusiveUpperBound() {
+    Long date = (long) Literal.of("2017-12-31T23:59:59.999999").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).year("timestamp").build();
+
+    assertProjectionInclusive(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2017");
+    assertProjectionInclusive(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017");
+    assertProjectionInclusive(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2018");
+    assertProjectionInclusive(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017");
+    assertProjectionInclusive(spec, equal("timestamp", date), Expression.Operation.EQ, "2017");
+    assertProjectionInclusiveValue(spec, notEqual("timestamp", date), Expression.Operation.TRUE);
+  }
+
+  @Test
+  public void testHourStrictLowerBound() {
+    Long date = (long) Literal.of("2017-12-01T10:00:00.00000").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).hour("timestamp").build();
+
+    assertProjectionStrict(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2017-12-01-09");
+    assertProjectionStrict(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017-12-01-09");
+    assertProjectionStrict(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2017-12-01-11");
+    assertProjectionStrict(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017-12-01-10");
+    assertProjectionStrict(spec, notEqual("timestamp", date), Expression.Operation.NOT_EQ, "2017-12-01-10");
+    assertProjectionStrictValue(spec, equal("timestamp", date), Expression.Operation.FALSE);
+  }
+
+  @Test
+  public void testHourStrictUpperBound() {
+    Long date = (long) Literal.of("2017-12-01T10:59:59.999999").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).hour("timestamp").build();
+
+    assertProjectionStrict(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2017-12-01-09");
+    assertProjectionStrict(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017-12-01-10");
+    assertProjectionStrict(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2017-12-01-11");
+    assertProjectionStrict(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017-12-01-11");
+    assertProjectionStrict(spec, notEqual("timestamp", date), Expression.Operation.NOT_EQ, "2017-12-01-10");
+    assertProjectionStrictValue(spec, equal("timestamp", date), Expression.Operation.FALSE);
+  }
+
+  @Test
+  public void testHourInclusiveLowerBound() {
+    Long date = (long) Literal.of("2017-12-01T10:00:00.00000").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).hour("timestamp").build();
+
+    assertProjectionInclusive(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2017-12-01-09");
+    assertProjectionInclusive(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017-12-01-10");
+    assertProjectionInclusive(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2017-12-01-10");
+    assertProjectionInclusive(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017-12-01-10");
+    assertProjectionInclusive(spec, equal("timestamp", date), Expression.Operation.EQ, "2017-12-01-10");
+    assertProjectionInclusiveValue(spec, notEqual("timestamp", date), Expression.Operation.TRUE);
+  }
+
+  @Test
+  public void testHourInclusiveUpperBound() {
+    Long date = (long) Literal.of("2017-12-01T10:59:59.999999").to(TYPE).value();
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).hour("timestamp").build();
+
+    assertProjectionInclusive(spec, lessThan("timestamp", date), Expression.Operation.LT_EQ, "2017-12-01-10");
+    assertProjectionInclusive(spec, lessThanOrEqual("timestamp", date), Expression.Operation.LT_EQ, "2017-12-01-10");
+    assertProjectionInclusive(spec, greaterThan("timestamp", date), Expression.Operation.GT_EQ, "2017-12-01-11");
+    assertProjectionInclusive(spec, greaterThanOrEqual("timestamp", date), Expression.Operation.GT_EQ, "2017-12-01-10");
+    assertProjectionInclusive(spec, equal("timestamp", date), Expression.Operation.EQ, "2017-12-01-10");
+    assertProjectionInclusiveValue(spec, notEqual("timestamp", date), Expression.Operation.TRUE);
+  }
+}
diff --git a/build.gradle b/build.gradle
index 71e8065..b7a9904 100644
--- a/build.gradle
+++ b/build.gradle
@@ -81,6 +81,8 @@ subprojects {
     all {
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
     }
+
+    testArtifacts
   }
 
   ext {
@@ -99,6 +101,15 @@ subprojects {
     testCompile 'org.mockito:mockito-core'
   }
 
+  task testJar(type: Jar){
+    archiveClassifier = 'tests'
+    from sourceSets.test.output
+  }
+
+  artifacts {
+    testArtifacts testJar
+  }
+
   publishing {
     publications {
       nebula(MavenPublication) {
@@ -300,19 +311,6 @@ project(':iceberg-hive') {
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
     }
   }
-
-  task testJar(type: Jar){
-    archiveClassifier = 'tests'
-    from sourceSets.test.output
-  }
-
-  configurations {
-    testArtifacts
-  }
-
-  artifacts {
-    testArtifacts testJar
-  }
 }
 
 project(':iceberg-orc') {
@@ -349,6 +347,8 @@ project(':iceberg-parquet') {
     compileOnly("org.apache.hadoop:hadoop-client") {
       exclude group: 'org.apache.avro', module: 'avro'
     }
+
+    testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
   }
 }
 
diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
index 2d578e7..0271695 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -73,6 +73,7 @@ class ManifestListWriter implements FileAppender<ManifestFile> {
           .schema(ManifestFile.schema())
           .named("manifest_file")
           .meta(meta)
+          .overwrite()
           .build();
 
     } catch (IOException e) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 0fe7da3..eb53829 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -195,6 +195,7 @@ public class ManifestWriter implements FileAppender<DataFile> {
               .meta("schema", SchemaParser.toJson(spec.schema()))
               .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
               .meta("partition-spec-id", String.valueOf(spec.specId()))
+              .overwrite()
               .build();
         default:
           throw new IllegalArgumentException("Unsupported format: " + format);
diff --git a/core/src/main/java/org/apache/iceberg/MetricsConfig.java b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
index 864ec06..816f23e 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsConfig.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
@@ -22,13 +22,15 @@ package org.apache.iceberg;
 import com.google.common.collect.Maps;
 import java.util.Map;
 import org.apache.iceberg.MetricsModes.MetricsMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
 import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE_DEFAULT;
 
 public class MetricsConfig {
 
-  private static final String COLUMN_CONF_PREFIX = "write.metadata.metrics.column.";
+  private static final Logger LOG = LoggerFactory.getLogger(MetricsConfig.class);
 
   private Map<String, MetricsMode> columnModes = Maps.newHashMap();
   private MetricsMode defaultMode;
@@ -43,15 +45,30 @@ public class MetricsConfig {
 
   public static MetricsConfig fromProperties(Map<String, String> props) {
     MetricsConfig spec = new MetricsConfig();
+    String defaultModeAsString = props.getOrDefault(DEFAULT_WRITE_METRICS_MODE, DEFAULT_WRITE_METRICS_MODE_DEFAULT);
+    try {
+      spec.defaultMode = MetricsModes.fromString(defaultModeAsString);
+    } catch (IllegalArgumentException err) {
+      // Mode was invalid, log the error and use the default
+      LOG.warn("Ignoring invalid default metrics mode: {}", defaultModeAsString, err);
+      spec.defaultMode = MetricsModes.fromString(DEFAULT_WRITE_METRICS_MODE_DEFAULT);
+    }
+
     props.keySet().stream()
-        .filter(key -> key.startsWith(COLUMN_CONF_PREFIX))
+        .filter(key -> key.startsWith(TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX))
         .forEach(key -> {
-          MetricsMode mode = MetricsModes.fromString(props.get(key));
-          String columnAlias = key.replaceFirst(COLUMN_CONF_PREFIX, "");
+          String columnAlias = key.replaceFirst(TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX, "");
+          MetricsMode mode;
+          try {
+            mode = MetricsModes.fromString(props.get(key));
+          } catch (IllegalArgumentException err) {
+            // Mode was invalid, log the error and use the default
+            LOG.warn("Ignoring invalid metrics mode for column {}: {}", columnAlias, props.get(key), err);
+            mode = spec.defaultMode;
+          }
           spec.columnModes.put(columnAlias, mode);
         });
-    String defaultModeAsString = props.getOrDefault(DEFAULT_WRITE_METRICS_MODE, DEFAULT_WRITE_METRICS_MODE_DEFAULT);
-    spec.defaultMode = MetricsModes.fromString(defaultModeAsString);
+
     return spec;
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
index 48cbb04..d94f449 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
@@ -28,9 +28,14 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
@@ -39,6 +44,7 @@ import static org.apache.iceberg.types.Types.NestedField.required;
  * Schema evolution API implementation.
  */
 class SchemaUpdate implements UpdateSchema {
+  private static final Logger LOG = LoggerFactory.getLogger(SchemaUpdate.class);
   private static final int TABLE_ROOT_ID = -1;
 
   private final TableOperations ops;
@@ -200,7 +206,7 @@ class SchemaUpdate implements UpdateSchema {
 
   @Override
   public void commit() {
-    TableMetadata update = base.updateSchema(apply(), lastColumnId);
+    TableMetadata update = applyChangesToMapping(base.updateSchema(apply(), lastColumnId));
     ops.commit(base, update);
   }
 
@@ -210,6 +216,30 @@ class SchemaUpdate implements UpdateSchema {
     return next;
   }
 
+  private TableMetadata applyChangesToMapping(TableMetadata metadata) {
+    String mappingJson = metadata.property(TableProperties.DEFAULT_NAME_MAPPING, null);
+    if (mappingJson != null) {
+      try {
+        // parse and update the mapping
+        NameMapping mapping = NameMappingParser.fromJson(mappingJson);
+        NameMapping updated = MappingUtil.update(mapping, updates, adds);
+
+        // replace the table property
+        Map<String, String> updatedProperties = Maps.newHashMap();
+        updatedProperties.putAll(metadata.properties());
+        updatedProperties.put(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(updated));
+
+        return metadata.replaceProperties(updatedProperties);
+
+      } catch (RuntimeException e) {
+        // log the error, but do not fail the update
+        LOG.warn("Failed to update external schema mapping: {}", mappingJson, e);
+      }
+    }
+
+    return metadata;
+  }
+
   private static Schema applyChanges(Schema schema, List<Integer> deletes,
                                      Map<Integer, Types.NestedField> updates,
                                      Multimap<Integer, Types.NestedField> adds) {
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index e8cc29b..83f15b5 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -79,6 +79,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   private final List<String> manifestLists = Lists.newArrayList();
   private Long snapshotId = null;
   private TableMetadata base = null;
+  private boolean stageOnly = false;
   private Consumer<String> deleteFunc = defaultDelete;
 
   protected SnapshotProducer(TableOperations ops) {
@@ -97,6 +98,12 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   protected abstract ThisT self();
 
   @Override
+  public ThisT stageOnly() {
+    this.stageOnly = true;
+    return self();
+  }
+
+  @Override
   public ThisT deleteWith(Consumer<String> deleteCallback) {
     Preconditions.checkArgument(this.deleteFunc == defaultDelete, "Cannot set delete callback more than once");
     this.deleteFunc = deleteCallback;
@@ -230,7 +237,13 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
           .run(taskOps -> {
             Snapshot newSnapshot = apply();
             newSnapshotId.set(newSnapshot.snapshotId());
-            TableMetadata updated = base.replaceCurrentSnapshot(newSnapshot);
+            TableMetadata updated;
+            if (stageOnly) {
+              updated = base.addStagedSnapshot(newSnapshot);
+            } else {
+              updated = base.replaceCurrentSnapshot(newSnapshot);
+            }
+
             // if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
             // to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
             taskOps.commit(base, updated.withUUID());
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index ef289b4..ea015ba 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -321,6 +321,16 @@ public class TableMetadata {
         currentSnapshotId, snapshots, snapshotLog);
   }
 
+  public TableMetadata addStagedSnapshot(Snapshot snapshot) {
+    List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
+        .addAll(snapshots)
+        .add(snapshot)
+        .build();
+    return new TableMetadata(ops, null, uuid, location,
+        snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, newSnapshots, snapshotLog);
+  }
+
   public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
     List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
         .addAll(snapshots)
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index d426dd3..b1e4ab0 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -89,6 +89,12 @@ public class TableProperties {
   public static final String METADATA_COMPRESSION = "write.metadata.compression-codec";
   public static final String METADATA_COMPRESSION_DEFAULT = "none";
 
+  public static final String METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column.";
   public static final String DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default";
   public static final String DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)";
+
+  public static final String DEFAULT_NAME_MAPPING = "schema.name-mapping.default";
+
+  public static final String WRITE_AUDIT_PUBLISH_ENABLED = "write.wap.enabled";
+  public static final String WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT = "false";
 }
diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 65ba9d8..1276e23 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -34,6 +34,7 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificData;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
@@ -84,11 +85,18 @@ public class Avro {
     private Map<String, String> config = Maps.newHashMap();
     private Map<String, String> metadata = Maps.newLinkedHashMap();
     private Function<Schema, DatumWriter<?>> createWriterFunc = GenericAvroWriter::new;
+    private boolean overwrite;
 
     private WriteBuilder(OutputFile file) {
       this.file = file;
     }
 
+    public WriteBuilder forTable(Table table) {
+      schema(table.schema());
+      setAll(table.properties());
+      return this;
+    }
+
     public WriteBuilder schema(org.apache.iceberg.Schema newSchema) {
       this.schema = newSchema;
       return this;
@@ -124,6 +132,15 @@ public class Avro {
       return this;
     }
 
+    public WriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public WriteBuilder overwrite(boolean enabled) {
+      this.overwrite = enabled;
+      return this;
+    }
+
     private CodecFactory codec() {
       String codec = config.getOrDefault(AVRO_COMPRESSION, AVRO_COMPRESSION_DEFAULT);
       try {
@@ -141,7 +158,7 @@ public class Avro {
       meta("iceberg.schema", SchemaParser.toJson(schema));
 
       return new AvroFileAppender<>(
-          AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata);
+          AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata, overwrite);
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
index 8cc0712..a77b07d 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
@@ -39,8 +39,9 @@ class AvroFileAppender<D> implements FileAppender<D> {
 
   AvroFileAppender(Schema schema, OutputFile file,
                    Function<Schema, DatumWriter<?>> createWriterFunc,
-                   CodecFactory codec, Map<String, String> metadata) throws IOException {
-    this.stream = file.create();
+                   CodecFactory codec, Map<String, String> metadata,
+                   boolean overwrite) throws IOException {
+    this.stream = overwrite ? file.createOrOverwrite() : file.create();
     this.writer = newAvroWriter(schema, stream, createWriterFunc, codec, metadata);
   }
 
@@ -92,7 +93,6 @@ class AvroFileAppender<D> implements FileAppender<D> {
       writer.setMeta(entry.getKey(), entry.getValue());
     }
 
-    // TODO: support overwrite
     return writer.create(schema, stream);
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/mapping/MappedField.java b/core/src/main/java/org/apache/iceberg/mapping/MappedField.java
new file mode 100644
index 0000000..2a56afa
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/mapping/MappedField.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mapping;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * An immutable mapping between a field ID and a set of names.
+ */
+public class MappedField {
+
+  static MappedField of(Integer id, String name) {
+    return new MappedField(id, ImmutableSet.of(name), null);
+  }
+
+  static MappedField of(Integer id, Iterable<String> names) {
+    return new MappedField(id, names, null);
+  }
+
+  static MappedField of(Integer id, String name, MappedFields nestedMapping) {
+    return new MappedField(id, ImmutableSet.of(name), nestedMapping);
+  }
+
+  static MappedField of(Integer id, Iterable<String> names, MappedFields nestedMapping) {
+    return new MappedField(id, names, nestedMapping);
+  }
+
+  private final Set<String> names;
+  private Integer id;
+  private MappedFields nestedMapping;
+
+  private MappedField(Integer id, Iterable<String> names, MappedFields nested) {
+    this.id = id;
+    this.names = ImmutableSet.copyOf(names);
+    this.nestedMapping = nested;
+  }
+
+  public Integer id() {
+    return id;
+  }
+
+  public Set<String> names() {
+    return names;
+  }
+
+  public MappedFields nestedMapping() {
+    return nestedMapping;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+
+    MappedField that = (MappedField) other;
+    return names.equals(that.names) &&
+        Objects.equals(id, that.id) &&
+        Objects.equals(nestedMapping, that.nestedMapping);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(names, id, nestedMapping);
+  }
+
+  @Override
+  public String toString() {
+    return "([" + Joiner.on(", ").join(names) + "] -> " + (id != null ? id : "?") +
+        (nestedMapping != null ? ", " + nestedMapping + ")" : ")");
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/mapping/MappedFields.java b/core/src/main/java/org/apache/iceberg/mapping/MappedFields.java
new file mode 100644
index 0000000..824257b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/mapping/MappedFields.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mapping;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class MappedFields {
+
+  static MappedFields of(MappedField... fields) {
+    return new MappedFields(ImmutableList.copyOf(fields));
+  }
+
+  static MappedFields of(List<MappedField> fields) {
+    return new MappedFields(fields);
+  }
+
+  private final List<MappedField> fields;
+  private final Map<String, Integer> nameToId;
+  private final Map<Integer, MappedField> idToField;
+
+  private MappedFields(List<MappedField> fields) {
+    this.fields = ImmutableList.copyOf(fields);
+    this.nameToId = indexIds(fields);
+    this.idToField = indexFields(fields);
+  }
+
+  public MappedField field(int id) {
+    return idToField.get(id);
+  }
+
+  public Integer id(String name) {
+    return nameToId.get(name);
+  }
+
+  public int size() {
+    return fields.size();
+  }
+
+  private static Map<String, Integer> indexIds(List<MappedField> fields) {
+    ImmutableMap.Builder<String, Integer> builder = ImmutableMap.builder();
+    fields.forEach(field ->
+        field.names().forEach(name -> {
+          Integer id = field.id();
+          if (id != null) {
+            builder.put(name, id);
+          }
+        }));
+    return builder.build();
+  }
+
+  private static Map<Integer, MappedField> indexFields(List<MappedField> fields) {
+    ImmutableMap.Builder<Integer, MappedField> builder = ImmutableMap.builder();
+    fields.forEach(field -> {
+      Integer id = field.id();
+      if (id != null) {
+        builder.put(id, field);
+      }
+    });
+    return builder.build();
+  }
+
+  public List<MappedField> fields() {
+    return fields;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+
+    return fields.equals(((MappedFields) other).fields);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(fields);
+  }
+
+  @Override
+  public String toString() {
+    return "[ " + Joiner.on(", ").join(fields) + " ]";
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/mapping/MappingUtil.java b/core/src/main/java/org/apache/iceberg/mapping/MappingUtil.java
new file mode 100644
index 0000000..95d8b4e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/mapping/MappingUtil.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mapping;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+
+public class MappingUtil {
+  private static final Joiner DOT = Joiner.on('.');
+
+  private MappingUtil() {
+  }
+
+  /**
+   * Create a name-based mapping for a schema.
+   * <p>
+   * The mapping returned by this method will use the schema's name for each field.
+   *
+   * @param schema a {@link Schema}
+   * @return a {@link NameMapping} initialized with the schema's fields and names
+   */
+  public static NameMapping create(Schema schema) {
+    return new NameMapping(TypeUtil.visit(schema, CreateMapping.INSTANCE));
+  }
+
+  /**
+   * Update a name-based mapping using changes to a schema.
+   *
+   * @param mapping a name-based mapping
+   * @param updates a map from field ID to updated field definitions
+   * @param adds a map from parent field ID to nested fields to be added
+   * @return an updated mapping with names added to renamed fields and the mapping extended for new fields
+   */
+  public static NameMapping update(NameMapping mapping,
+                            Map<Integer, Types.NestedField> updates,
+                            Multimap<Integer, Types.NestedField> adds) {
+    return new NameMapping(visit(mapping, new UpdateMapping(updates, adds)));
+  }
+
+  static Map<Integer, MappedField> indexById(MappedFields mapping) {
+    return visit(mapping, new IndexById());
+  }
+
+  static Map<String, MappedField> indexByName(MappedFields mapping) {
+    return visit(mapping, IndexByName.INSTANCE);
+  }
+
+  private static class UpdateMapping implements Visitor<MappedFields, MappedField> {
+    private final Map<Integer, Types.NestedField> updates;
+    private final Multimap<Integer, Types.NestedField> adds;
+
+    private UpdateMapping(Map<Integer, Types.NestedField> updates, Multimap<Integer, Types.NestedField> adds) {
+      this.updates = updates;
+      this.adds = adds;
+    }
+
+    @Override
+    public MappedFields mapping(NameMapping mapping, MappedFields result) {
+      return addNewFields(result, -1 /* parent ID used to add top-level fields */);
+    }
+
+    @Override
+    public MappedFields fields(MappedFields fields, List<MappedField> fieldResults) {
+      return MappedFields.of(fieldResults);
+    }
+
+    @Override
+    public MappedField field(MappedField field, MappedFields fieldResult) {
+      // update this field's names
+      Set<String> fieldNames = Sets.newHashSet(field.names());
+      Types.NestedField update = updates.get(field.id());
+      if (update != null) {
+        fieldNames.add(update.name());
+      }
+
+      // add a new mapping for any new nested fields
+      MappedFields nestedMapping = addNewFields(fieldResult, field.id());
+      return MappedField.of(field.id(), fieldNames, nestedMapping);
+    }
+
+    private MappedFields addNewFields(MappedFields mapping, int parentId) {
+      Collection<Types.NestedField> fieldsToAdd = adds.get(parentId);
+      if (fieldsToAdd == null || fieldsToAdd.isEmpty()) {
+        return mapping;
+      }
+
+      List<MappedField> fields = Lists.newArrayList();
+      if (mapping != null) {
+        fields.addAll(mapping.fields());
+      }
+
+      for (Types.NestedField add : fieldsToAdd) {
+        MappedFields nestedMapping = TypeUtil.visit(add.type(), CreateMapping.INSTANCE);
+        fields.add(MappedField.of(add.fieldId(), add.name(), nestedMapping));
+      }
+
+      return MappedFields.of(fields);
+    }
+  }
+
+  private static class IndexByName implements Visitor<Map<String, MappedField>, Map<String, MappedField>> {
+    static final IndexByName INSTANCE = new IndexByName();
+
+    @Override
+    public Map<String, MappedField> mapping(NameMapping mapping, Map<String, MappedField> result) {
+      return result;
+    }
+
+    @Override
+    public Map<String, MappedField> fields(MappedFields fields, List<Map<String, MappedField>> fieldResults) {
+      // merge the results of each field
+      ImmutableMap.Builder<String, MappedField> builder = ImmutableMap.builder();
+      for (Map<String, MappedField> results : fieldResults) {
+        builder.putAll(results);
+      }
+      return builder.build();
+    }
+
+    @Override
+    public Map<String, MappedField> field(MappedField field, Map<String, MappedField> fieldResult) {
+      ImmutableMap.Builder<String, MappedField> builder = ImmutableMap.builder();
+
+      if (fieldResult != null) {
+        for (String name : field.names()) {
+          for (Map.Entry<String, MappedField> entry : fieldResult.entrySet()) {
+            String fullName = DOT.join(name, entry.getKey());
+            builder.put(fullName, entry.getValue());
+          }
+        }
+      }
+
+      for (String name : field.names()) {
+        builder.put(name, field);
+      }
+
+      return builder.build();
+    }
+  }
+
+  private static class IndexById implements Visitor<Map<Integer, MappedField>, Map<Integer, MappedField>> {
+    private final Map<Integer, MappedField> result = Maps.newHashMap();
+
+    @Override
+    public Map<Integer, MappedField> mapping(NameMapping mapping, Map<Integer, MappedField> fieldsResult) {
+      return fieldsResult;
+    }
+
+    @Override
+    public Map<Integer, MappedField> fields(MappedFields fields, List<Map<Integer, MappedField>> fieldResults) {
+      return result;
+    }
+
+    @Override
+    public Map<Integer, MappedField> field(MappedField field, Map<Integer, MappedField> fieldResult) {
+      Preconditions.checkState(!result.containsKey(field.id()), "Invalid mapping: ID %s is not unique", field.id());
+      result.put(field.id(), field);
+      return result;
+    }
+  }
+
+  private interface Visitor<S, T> {
+    S mapping(NameMapping mapping, S result);
+    S fields(MappedFields fields, List<T> fieldResults);
+    T field(MappedField field, S fieldResult);
+  }
+
+  private static <S, T> S visit(NameMapping mapping, Visitor<S, T> visitor) {
+    return visitor.mapping(mapping, visit(mapping.asMappedFields(), visitor));
+  }
+
+  private static <S, T> S visit(MappedFields mapping, Visitor<S, T> visitor) {
+    if (mapping == null) {
+      return null;
+    }
+
+    List<T> fieldResults = Lists.newArrayList();
+    for (MappedField field : mapping.fields()) {
+      fieldResults.add(visitor.field(field, visit(field.nestedMapping(), visitor)));
+    }
+
+    return visitor.fields(mapping, fieldResults);
+  }
+
+  private static class CreateMapping extends TypeUtil.SchemaVisitor<MappedFields> {
+    private static final CreateMapping INSTANCE = new CreateMapping();
+
+    private CreateMapping() {
+    }
+
+    @Override
+    public MappedFields schema(Schema schema, MappedFields structResult) {
+      return structResult;
+    }
+
+    @Override
+    public MappedFields struct(Types.StructType struct, List<MappedFields> fieldResults) {
+      List<MappedField> fields = Lists.newArrayListWithExpectedSize(fieldResults.size());
+
+      for (int i = 0; i < fieldResults.size(); i += 1) {
+        Types.NestedField field = struct.fields().get(i);
+        MappedFields result = fieldResults.get(i);
+        fields.add(MappedField.of(field.fieldId(), field.name(), result));
+      }
+
+      return MappedFields.of(fields);
+    }
+
+    @Override
+    public MappedFields field(Types.NestedField field, MappedFields fieldResult) {
+      return fieldResult;
+    }
+
+    @Override
+    public MappedFields list(Types.ListType list, MappedFields elementResult) {
+      return MappedFields.of(MappedField.of(list.elementId(), "element", elementResult));
+    }
+
+    @Override
+    public MappedFields map(Types.MapType map, MappedFields keyResult, MappedFields valueResult) {
+      return MappedFields.of(
+          MappedField.of(map.keyId(), "key", keyResult),
+          MappedField.of(map.valueId(), "value", valueResult)
+      );
+    }
+
+    @Override
+    public MappedFields primitive(Type.PrimitiveType primitive) {
+      return null; // no mapping because primitives have no nested fields
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/mapping/NameMapping.java b/core/src/main/java/org/apache/iceberg/mapping/NameMapping.java
new file mode 100644
index 0000000..2d8d4d8
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/mapping/NameMapping.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mapping;
+
+import com.google.common.base.Joiner;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents a mapping from external schema names to Iceberg type IDs.
+ */
+public class NameMapping {
+  private static final Joiner DOT = Joiner.on('.');
+
+  private final MappedFields mapping;
+  private final Map<Integer, MappedField> fieldsById;
+  private final Map<String, MappedField> fieldsByName;
+
+  NameMapping(MappedFields mapping) {
+    this.mapping = mapping;
+    this.fieldsById = MappingUtil.indexById(mapping);
+    this.fieldsByName = MappingUtil.indexByName(mapping);
+  }
+
+  public MappedField find(int id) {
+    return fieldsById.get(id);
+  }
+
+  public MappedField find(String... names) {
+    return fieldsByName.get(DOT.join(names));
+  }
+
+  public MappedField find(List<String> names) {
+    return fieldsByName.get(DOT.join(names));
+  }
+
+  public MappedFields asMappedFields() {
+    return mapping;
+  }
+
+  @Override
+  public String toString() {
+    if (mapping.fields().isEmpty()) {
+      return "[]";
+    } else {
+      return "[\n  " + Joiner.on("\n  ").join(mapping.fields()) + "\n]";
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/mapping/NameMappingParser.java b/core/src/main/java/org/apache/iceberg/mapping/NameMappingParser.java
new file mode 100644
index 0000000..453165c
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/mapping/NameMappingParser.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mapping;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.util.JsonUtil;
+
+/**
+ * Parses external name mappings from a JSON representation.
+ * <pre>
+ * [ { "field-id": 1, "names": ["id", "record_id"] },
+ *   { "field-id": 2, "names": ["data"] },
+ *   { "field-id": 3, "names": ["location"], "fields": [
+ *       { "field-id": 4, "names": ["latitude", "lat"] },
+ *       { "field-id": 5, "names": ["longitude", "long"] }
+ *     ] } ]
+ * </pre>
+ */
+public class NameMappingParser {
+
+  private NameMappingParser() {
+  }
+
+  private static final String FIELD_ID = "field-id";
+  private static final String NAMES = "names";
+  private static final String FIELDS = "fields";
+
+  public static String toJson(NameMapping mapping) {
+    try {
+      StringWriter writer = new StringWriter();
+      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+      generator.useDefaultPrettyPrinter();
+      toJson(mapping, generator);
+      generator.flush();
+      return writer.toString();
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to write json for: %s", mapping);
+    }
+  }
+
+  static void toJson(NameMapping nameMapping, JsonGenerator generator) throws IOException {
+    toJson(nameMapping.asMappedFields(), generator);
+  }
+
+  private static void toJson(MappedFields mapping, JsonGenerator generator) throws IOException {
+    generator.writeStartArray();
+
+    for (MappedField field : mapping.fields()) {
+      toJson(field, generator);
+    }
+
+    generator.writeEndArray();
+  }
+
+  private static void toJson(MappedField field, JsonGenerator generator) throws IOException {
+    generator.writeStartObject();
+
+    generator.writeNumberField(FIELD_ID, field.id());
+
+    generator.writeArrayFieldStart(NAMES);
+    for (String name : field.names()) {
+      generator.writeString(name);
+    }
+    generator.writeEndArray();
+
+    MappedFields nested = field.nestedMapping();
+    if (nested != null) {
+      generator.writeFieldName(FIELDS);
+      toJson(nested, generator);
+    }
+
+    generator.writeEndObject();
+  }
+
+  public static NameMapping fromJson(String json) {
+    try {
+      return fromJson(JsonUtil.mapper().readValue(json, JsonNode.class));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to convert version from json: %s", json);
+    }
+  }
+
+  static NameMapping fromJson(JsonNode node) {
+    return new NameMapping(fieldsFromJson(node));
+  }
+
+  private static MappedFields fieldsFromJson(JsonNode node) {
+    Preconditions.checkArgument(node.isArray(), "Cannot parse non-array mapping fields: %s", node);
+
+    List<MappedField> fields = Lists.newArrayList();
+    node.elements().forEachRemaining(fieldNode -> fields.add(fieldFromJson(fieldNode)));
+
+    return MappedFields.of(fields);
+  }
+
+  private static MappedField fieldFromJson(JsonNode node) {
+    Preconditions.checkArgument(node != null && !node.isNull() && node.isObject(),
+        "Cannot parse non-object mapping field: %s", node);
+
+    Integer id = JsonUtil.getIntOrNull(FIELD_ID, node);
+
+    Set<String> names;
+    if (node.has(NAMES)) {
+      names = ImmutableSet.copyOf(JsonUtil.getStringList(NAMES, node));
+    } else {
+      names = ImmutableSet.of();
+    }
+
+    MappedFields nested;
+    if (node.has(FIELDS)) {
+      nested = fieldsFromJson(node.get(FIELDS));
+    } else {
+      nested = null;
+    }
+
+    return MappedField.of(id, names, nested);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
index 09a5466..976a1cc 100644
--- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
@@ -52,6 +52,16 @@ public class JsonUtil {
     return pNode.asInt();
   }
 
+  public static Integer getIntOrNull(String property, JsonNode node) {
+    if (!node.has(property)) {
+      return null;
+    }
+    JsonNode pNode = node.get(property);
+    Preconditions.checkArgument(pNode != null && !pNode.isNull() && pNode.isIntegralNumber() && pNode.canConvertToInt(),
+        "Cannot parse %s from non-string value: %s", property, pNode);
+    return pNode.asInt();
+  }
+
   public static long getLong(String property, JsonNode node) {
     Preconditions.checkArgument(node.has(property), "Cannot parse missing int %s", property);
     JsonNode pNode = node.get(property);
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 6c4f682..c55c07d 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -40,7 +40,7 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 public class TableTestBase {
   // Schema passed to create tables
-  static final Schema SCHEMA = new Schema(
+  public static final Schema SCHEMA = new Schema(
       required(3, "id", Types.IntegerType.get()),
       required(4, "data", Types.StringType.get())
   );
@@ -80,7 +80,7 @@ public class TableTestBase {
 
   File tableDir = null;
   File metadataDir = null;
-  TestTables.TestTable table = null;
+  public TestTables.TestTable table = null;
 
   @Before
   public void setupTable() throws Exception {
@@ -117,7 +117,7 @@ public class TableTestBase {
     return TestTables.metadataVersion("test");
   }
 
-  TableMetadata readMetadata() {
+  public TableMetadata readMetadata() {
     return TestTables.readMetadata("test");
   }
 
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetUtil.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java
similarity index 92%
rename from parquet/src/test/java/org/apache/iceberg/parquet/TestParquetUtil.java
rename to core/src/test/java/org/apache/iceberg/TestMetrics.java
index e75c247..379d297 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.parquet;
+package org.apache.iceberg;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -32,9 +32,8 @@ import java.util.UUID;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericFixed;
-import org.apache.iceberg.Metrics;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types.BinaryType;
 import org.apache.iceberg.types.Types.BooleanType;
@@ -55,17 +54,24 @@ import org.apache.iceberg.types.Types.UUIDType;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.apache.iceberg.Files.localInput;
 import static org.apache.iceberg.types.Conversions.fromByteBuffer;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
-public class TestParquetUtil extends BaseParquetWritingTest {
+/**
+ * Tests for Metrics.
+ */
+public abstract class TestMetrics {
+
   private final UUID uuid = UUID.randomUUID();
   private final GenericFixed fixed = new GenericData.Fixed(
       org.apache.avro.Schema.createFixed("fixedCol", null, null, 4),
       "abcd".getBytes(StandardCharsets.UTF_8));
 
+  public abstract Metrics getMetrics(InputFile file);
+
+  public abstract File writeRecords(Schema schema, Record... records) throws IOException;
+
   @Test
   public void testMetricsForTopLevelFields() throws IOException {
     Schema schema = new Schema(
@@ -113,9 +119,9 @@ public class TestParquetUtil extends BaseParquetWritingTest {
     secondRecord.put("fixedCol", fixed);
     secondRecord.put("binaryCol", "W".getBytes());
 
-    File parquetFile = writeRecords(schema, firstRecord, secondRecord);
+    File recordsFile = writeRecords(schema, firstRecord, secondRecord);
 
-    Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
+    Metrics metrics = getMetrics(Files.localInput(recordsFile));
     Assert.assertEquals(2L, (long) metrics.recordCount());
     assertCounts(1, 2L, 0L, metrics);
     assertBounds(1, BooleanType.get(), false, true, metrics);
@@ -160,9 +166,9 @@ public class TestParquetUtil extends BaseParquetWritingTest {
     record.put("decimalAsInt64", new BigDecimal("4.75"));
     record.put("decimalAsFixed", new BigDecimal("5.80"));
 
-    File parquetFile = writeRecords(schema, record);
+    File recordsFile = writeRecords(schema, record);
 
-    Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
+    Metrics metrics = getMetrics(Files.localInput(recordsFile));
     Assert.assertEquals(1L, (long) metrics.recordCount());
     assertCounts(1, 1L, 0L, metrics);
     assertBounds(1, DecimalType.of(4, 2), new BigDecimal("2.55"), new BigDecimal("2.55"), metrics);
@@ -197,9 +203,9 @@ public class TestParquetUtil extends BaseParquetWritingTest {
     record.put("intCol", Integer.MAX_VALUE);
     record.put("nestedStructCol", nestedStruct);
 
-    File parquetFile = writeRecords(schema, record);
+    File recordsFile = writeRecords(schema, record);
 
-    Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
+    Metrics metrics = getMetrics(Files.localInput(recordsFile));
     Assert.assertEquals(1L, (long) metrics.recordCount());
     assertCounts(1, 1L, 0L, metrics);
     assertBounds(1, IntegerType.get(), Integer.MAX_VALUE, Integer.MAX_VALUE, metrics);
@@ -232,9 +238,9 @@ public class TestParquetUtil extends BaseParquetWritingTest {
     map.put("4", struct);
     record.put(1, map);
 
-    File parquetFile = writeRecords(schema, record);
+    File recordsFile = writeRecords(schema, record);
 
-    Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
+    Metrics metrics = getMetrics(Files.localInput(recordsFile));
     Assert.assertEquals(1L, (long) metrics.recordCount());
     assertCounts(1, 1, 0, metrics);
     assertBounds(1, IntegerType.get(), null, null, metrics);
@@ -256,9 +262,9 @@ public class TestParquetUtil extends BaseParquetWritingTest {
     Record secondRecord = new Record(AvroSchemaUtil.convert(schema.asStruct()));
     secondRecord.put("intCol", null);
 
-    File parquetFile = writeRecords(schema, firstRecord, secondRecord);
+    File recordsFile = writeRecords(schema, firstRecord, secondRecord);
 
-    Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
+    Metrics metrics = getMetrics(Files.localInput(recordsFile));
     Assert.assertEquals(2L, (long) metrics.recordCount());
     assertCounts(1, 2, 2, metrics);
     assertBounds(1, IntegerType.get(), null, null, metrics);
diff --git a/core/src/test/java/org/apache/iceberg/TestMetricsModes.java b/core/src/test/java/org/apache/iceberg/TestMetricsModes.java
index 03b6bb7..698f5c3 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetricsModes.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetricsModes.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg;
 
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
 import org.apache.iceberg.MetricsModes.Counts;
 import org.apache.iceberg.MetricsModes.Full;
 import org.apache.iceberg.MetricsModes.None;
@@ -51,4 +53,26 @@ public class TestMetricsModes {
     exceptionRule.expectMessage("length should be positive");
     MetricsModes.fromString("truncate(0)");
   }
+
+  @Test
+  public void testInvalidColumnModeValue() {
+    Map<String, String> properties = ImmutableMap.of(
+        TableProperties.DEFAULT_WRITE_METRICS_MODE, "full",
+        TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX + "col", "troncate(5)");
+
+    MetricsConfig config = MetricsConfig.fromProperties(properties);
+    Assert.assertEquals("Invalid mode should be defaulted to table default (full)",
+        MetricsModes.Full.get(), config.columnMode("col"));
+  }
+
+  @Test
+  public void testInvalidDefaultColumnModeValue() {
+    Map<String, String> properties = ImmutableMap.of(
+        TableProperties.DEFAULT_WRITE_METRICS_MODE, "fuull",
+        TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX + "col", "troncate(5)");
+
+    MetricsConfig config = MetricsConfig.fromProperties(properties);
+    Assert.assertEquals("Invalid mode should be defaulted to library default (truncate(16))",
+        MetricsModes.Truncate.withLength(16), config.columnMode("col"));
+  }
 }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetricsTruncation.java b/core/src/test/java/org/apache/iceberg/TestMetricsTruncation.java
similarity index 89%
rename from parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetricsTruncation.java
rename to core/src/test/java/org/apache/iceberg/TestMetricsTruncation.java
index a887e19..af304da 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetricsTruncation.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetricsTruncation.java
@@ -17,23 +17,23 @@
  * under the License.
  */
 
-package org.apache.iceberg.parquet;
+package org.apache.iceberg;
 
+import java.nio.ByteBuffer;
+import java.util.Comparator;
 import org.apache.iceberg.expressions.Literal;
 import org.junit.Assert;
 import org.junit.Test;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Comparator;
 
 import static org.apache.iceberg.util.BinaryUtil.truncateBinaryMax;
 import static org.apache.iceberg.util.BinaryUtil.truncateBinaryMin;
 import static org.apache.iceberg.util.UnicodeUtil.truncateStringMax;
 import static org.apache.iceberg.util.UnicodeUtil.truncateStringMin;
 
-public class TestParquetMetricsTruncation {
+@SuppressWarnings("checkstyle:LocalVariableName")
+public class TestMetricsTruncation {
   @Test
-  public void testTruncateBinaryMin() throws IOException {
+  public void testTruncateBinaryMin() {
     ByteBuffer test1 = ByteBuffer.wrap(new byte[] {1, 1, (byte) 0xFF, 2});
     // Output of test1 when truncated to 2 bytes
     ByteBuffer test1_2_expected = ByteBuffer.wrap(new byte[] {1, 1});
@@ -55,7 +55,7 @@ public class TestParquetMetricsTruncation {
   }
 
   @Test
-  public void testTruncateBinaryMax() throws IOException {
+  public void testTruncateBinaryMax() {
     ByteBuffer test1 = ByteBuffer.wrap(new byte[] {1, 1, 2});
     ByteBuffer test2 = ByteBuffer.wrap(new byte[] {1, 1, (byte) 0xFF, 2});
     ByteBuffer test3 = ByteBuffer.wrap(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 2});
@@ -83,8 +83,9 @@ public class TestParquetMetricsTruncation {
         cmp.compare(truncateBinaryMax(Literal.of(test4), 2).value(), expectedOutput) == 0);
   }
 
+  @SuppressWarnings("checkstyle:AvoidEscapedUnicodeCharacters")
   @Test
-  public void testTruncateStringMin() throws IOException {
+  public void testTruncateStringMin() {
     String test1 = "イロハニホヘト";
     // Output of test1 when truncated to 2 unicode characters
     String test1_2_expected = "イロ";
@@ -96,7 +97,6 @@ public class TestParquetMetricsTruncation {
     // test4 consists of 2 4 byte UTF-8 characters
     String test4 = "\uD800\uDC00\uD800\uDC00";
     String test4_1_expected = "\uD800\uDC00";
-
     Comparator<CharSequence> cmp = Literal.of(test1).comparator();
     Assert.assertTrue("Truncated lower bound should be lower than or equal to the actual lower bound",
         cmp.compare(truncateStringMin(Literal.of(test1), 3).value(), test1) <= 0);
@@ -120,8 +120,9 @@ public class TestParquetMetricsTruncation {
         cmp.compare(truncateStringMin(Literal.of(test4), 1).value(), test4_1_expected) == 0);
   }
 
+  @SuppressWarnings("checkstyle:AvoidEscapedUnicodeCharacters")
   @Test
-  public void testTruncateStringMax() throws IOException {
+  public void testTruncateStringMax() {
     String test1 = "イロハニホヘト";
     // Output of test1 when truncated to 2 unicode characters
     String test1_2_expected = "イヮ";
@@ -138,6 +139,9 @@ public class TestParquetMetricsTruncation {
     String test6 = "\uD800\uDFFF\uD800\uDFFF";
     // Increment the previous character
     String test6_2_expected = "\uD801\uDC00";
+    String test7 = "\uD83D\uDE02\uD83D\uDE02\uD83D\uDE02";
+    String test7_2_expected = "\uD83D\uDE02\uD83D\uDE03";
+    String test7_1_expected = "\uD83D\uDE03";
 
     Comparator<CharSequence> cmp = Literal.of(test1).comparator();
     Assert.assertTrue("Truncated upper bound should be greater than or equal to the actual upper bound",
@@ -175,5 +179,13 @@ public class TestParquetMetricsTruncation {
     Assert.assertTrue("Test 4 byte UTF-8 character increment. Output must have one character with " +
         "the first character incremented", cmp.compare(
         truncateStringMax(Literal.of(test6), 1).value(), test6_2_expected) == 0);
+    Assert.assertTrue("Truncated upper bound should be greater than or equal to the actual upper bound",
+        cmp.compare(truncateStringMax(Literal.of(test7), 2).value(), test7) >= 0);
+    Assert.assertTrue("Test input with multiple 4 byte UTF-8 character where the second unicode " +
+        "character should be incremented", cmp.compare(
+            truncateStringMax(Literal.of(test7), 2).value(), test7_2_expected) == 0);
+    Assert.assertTrue("Test input with multiple 4 byte UTF-8 character where the first unicode " +
+        "character should be incremented", cmp.compare(
+            truncateStringMax(Literal.of(test7), 1).value(), test7_1_expected) == 0);
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index 4275205..13e336e 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -38,7 +38,7 @@ public class TestTables {
 
   private TestTables() {}
 
-  static TestTable create(File temp, String name, Schema schema, PartitionSpec spec) {
+  public static TestTable create(File temp, String name, Schema schema, PartitionSpec spec) {
     TestTableOperations ops = new TestTableOperations(name, temp);
     if (ops.current() != null) {
       throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
@@ -47,7 +47,7 @@ public class TestTables {
     return new TestTable(ops, name);
   }
 
-  static Transaction beginCreate(File temp, String name, Schema schema, PartitionSpec spec) {
+  public static Transaction beginCreate(File temp, String name, Schema schema, PartitionSpec spec) {
     TableOperations ops = new TestTableOperations(name, temp);
     if (ops.current() != null) {
       throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
@@ -77,12 +77,12 @@ public class TestTables {
     }
   }
 
-  static TestTable load(File temp, String name) {
+  public static TestTable load(File temp, String name) {
     TestTableOperations ops = new TestTableOperations(name, temp);
     return new TestTable(ops, name);
   }
 
-  static class TestTable extends BaseTable {
+  public static class TestTable extends BaseTable {
     private final TestTableOperations ops;
 
     private TestTable(TestTableOperations ops, String name) {
diff --git a/core/src/test/java/org/apache/iceberg/mapping/TestMappingUpdates.java b/core/src/test/java/org/apache/iceberg/mapping/TestMappingUpdates.java
new file mode 100644
index 0000000..bd39cb0
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/mapping/TestMappingUpdates.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mapping;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestMappingUpdates extends TableTestBase {
+  @Test
+  public void testAddColumnMappingUpdate() {
+    NameMapping mapping = MappingUtil.create(table.schema());
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
+        .commit();
+
+    Assert.assertEquals(
+        MappedFields.of(
+            MappedField.of(1, "id"),
+            MappedField.of(2, "data")),
+        mapping.asMappedFields());
+
+    table.updateSchema()
+        .addColumn("ts", Types.TimestampType.withZone())
+        .commit();
+
+    NameMapping updated = NameMappingParser.fromJson(table.properties().get(TableProperties.DEFAULT_NAME_MAPPING));
+
+    Assert.assertEquals(
+        MappedFields.of(
+            MappedField.of(1, "id"),
+            MappedField.of(2, "data"),
+            MappedField.of(3, "ts")),
+        updated.asMappedFields());
+  }
+
+  @Test
+  public void testAddNestedColumnMappingUpdate() {
+    NameMapping mapping = MappingUtil.create(table.schema());
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
+        .commit();
+
+    Assert.assertEquals(
+        MappedFields.of(
+            MappedField.of(1, "id"),
+            MappedField.of(2, "data")),
+        mapping.asMappedFields());
+
+    table.updateSchema()
+        .addColumn("point", Types.StructType.of(
+            required(1, "x", Types.DoubleType.get()),
+            required(2, "y", Types.DoubleType.get())))
+        .commit();
+
+    NameMapping updated = NameMappingParser.fromJson(table.properties().get(TableProperties.DEFAULT_NAME_MAPPING));
+
+    Assert.assertEquals(
+        MappedFields.of(
+            MappedField.of(1, "id"),
+            MappedField.of(2, "data"),
+            MappedField.of(3, "point", MappedFields.of(
+                MappedField.of(4, "x"),
+                MappedField.of(5, "y")
+            ))),
+        updated.asMappedFields());
+
+    table.updateSchema()
+        .addColumn("point", "z", Types.DoubleType.get())
+        .commit();
+
+    NameMapping pointUpdated = NameMappingParser.fromJson(table.properties().get(TableProperties.DEFAULT_NAME_MAPPING));
+
+    Assert.assertEquals(
+        MappedFields.of(
+            MappedField.of(1, "id"),
+            MappedField.of(2, "data"),
+            MappedField.of(3, "point", MappedFields.of(
+                MappedField.of(4, "x"),
+                MappedField.of(5, "y"),
+                MappedField.of(6, "z")
+            ))),
+        pointUpdated.asMappedFields());
+  }
+
+  @Test
+  public void testRenameMappingUpdate() {
+    NameMapping mapping = MappingUtil.create(table.schema());
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
+        .commit();
+
+    Assert.assertEquals(
+        MappedFields.of(
+            MappedField.of(1, "id"),
+            MappedField.of(2, "data")),
+        mapping.asMappedFields());
+
+    table.updateSchema()
+        .renameColumn("id", "object_id")
+        .commit();
+
+    NameMapping updated = NameMappingParser.fromJson(table.properties().get(TableProperties.DEFAULT_NAME_MAPPING));
+
+    Assert.assertEquals(
+        MappedFields.of(
+            MappedField.of(1, ImmutableList.of("id", "object_id")),
+            MappedField.of(2, "data")),
+        updated.asMappedFields());
+  }
+
+  @Test
+  public void testRenameNestedFieldMappingUpdate() {
+    NameMapping mapping = MappingUtil.create(table.schema());
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
+        .commit();
+
+    table.updateSchema()
+        .addColumn("point", Types.StructType.of(
+            required(1, "x", Types.DoubleType.get()),
+            required(2, "y", Types.DoubleType.get())))
+        .commit();
+
+    NameMapping updated = NameMappingParser.fromJson(table.properties().get(TableProperties.DEFAULT_NAME_MAPPING));
+
+    Assert.assertEquals(
+        MappedFields.of(
+            MappedField.of(1, "id"),
+            MappedField.of(2, "data"),
+            MappedField.of(3, "point", MappedFields.of(
+                MappedField.of(4, "x"),
+                MappedField.of(5, "y")
+            ))),
+        updated.asMappedFields());
+
+    table.updateSchema()
+        .renameColumn("point.x", "X")
+        .renameColumn("point.y", "Y")
+        .commit();
+
+    NameMapping pointUpdated = NameMappingParser.fromJson(table.properties().get(TableProperties.DEFAULT_NAME_MAPPING));
+
+    Assert.assertEquals(
+        MappedFields.of(
+            MappedField.of(1, "id"),
+            MappedField.of(2, "data"),
+            MappedField.of(3, "point", MappedFields.of(
+                MappedField.of(4, ImmutableList.of("x", "X")),
+                MappedField.of(5, ImmutableList.of("y", "Y"))
+            ))),
+        pointUpdated.asMappedFields());
+  }
+
+
+  @Test
+  public void testRenameComplexFieldMappingUpdate() {
+    NameMapping mapping = MappingUtil.create(table.schema());
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
+        .commit();
+
+    table.updateSchema()
+        .addColumn("point", Types.StructType.of(
+            required(1, "x", Types.DoubleType.get()),
+            required(2, "y", Types.DoubleType.get())))
+        .commit();
+
+    NameMapping updated = NameMappingParser.fromJson(table.properties().get(TableProperties.DEFAULT_NAME_MAPPING));
+
+    Assert.assertEquals(
+        MappedFields.of(
+            MappedField.of(1, "id"),
+            MappedField.of(2, "data"),
+            MappedField.of(3, "point", MappedFields.of(
+                MappedField.of(4, "x"),
+                MappedField.of(5, "y")
+            ))),
+        updated.asMappedFields());
+
+    table.updateSchema()
+        .renameColumn("point", "p2")
+        .commit();
+
+    NameMapping pointUpdated = NameMappingParser.fromJson(table.properties().get(TableProperties.DEFAULT_NAME_MAPPING));
+
+    Assert.assertEquals(
+        MappedFields.of(
+            MappedField.of(1, "id"),
+            MappedField.of(2, "data"),
+            MappedField.of(3, ImmutableList.of("point", "p2"), MappedFields.of(
+                MappedField.of(4, "x"),
+                MappedField.of(5, "y")
+            ))),
+        pointUpdated.asMappedFields());
+  }
+
+  @Test
+  public void testMappingUpdateFailureSkipsMappingUpdate() {
+    NameMapping mapping = MappingUtil.create(table.schema());
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
+        .commit();
+
+    table.updateSchema()
+        .renameColumn("id", "object_id")
+        .commit();
+
+    String updatedJson = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+    NameMapping updated = NameMappingParser.fromJson(updatedJson);
+
+    Assert.assertEquals(
+        MappedFields.of(
+            MappedField.of(1, ImmutableList.of("id", "object_id")),
+            MappedField.of(2, "data")),
+        updated.asMappedFields());
+
+    // rename data to id, which conflicts in the mapping above
+    // this update should succeed, even though the mapping update fails
+    table.updateSchema()
+        .renameColumn("data", "id")
+        .commit();
+
+    Assert.assertEquals("Mapping JSON should not change",
+        updatedJson, table.properties().get(TableProperties.DEFAULT_NAME_MAPPING));
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/mapping/TestNameMapping.java b/core/src/test/java/org/apache/iceberg/mapping/TestNameMapping.java
new file mode 100644
index 0000000..7a49764
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/mapping/TestNameMapping.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mapping;
+
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestNameMapping {
+  @Test
+  public void testFlatSchemaToMapping() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()));
+
+    MappedFields expected = MappedFields.of(
+        MappedField.of(1, "id"),
+        MappedField.of(2, "data"));
+
+    NameMapping mapping = MappingUtil.create(schema);
+    Assert.assertEquals(expected, mapping.asMappedFields());
+  }
+
+  @Test
+  public void testNestedStructSchemaToMapping() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()),
+        required(3, "location", Types.StructType.of(
+            required(4, "latitude", Types.FloatType.get()),
+            required(5, "longitude", Types.FloatType.get())
+        )));
+
+    MappedFields expected = MappedFields.of(
+        MappedField.of(1, "id"),
+        MappedField.of(2, "data"),
+        MappedField.of(3, "location", MappedFields.of(
+            MappedField.of(4, "latitude"),
+            MappedField.of(5, "longitude")
+        )));
+
+    NameMapping mapping = MappingUtil.create(schema);
+    Assert.assertEquals(expected, mapping.asMappedFields());
+  }
+
+  @Test
+  public void testMapSchemaToMapping() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()),
+        required(3, "map", Types.MapType.ofRequired(4, 5,
+            Types.StringType.get(),
+            Types.DoubleType.get())));
+
+    MappedFields expected = MappedFields.of(
+        MappedField.of(1, "id"),
+        MappedField.of(2, "data"),
+        MappedField.of(3, "map", MappedFields.of(
+            MappedField.of(4, "key"),
+            MappedField.of(5, "value")
+        )));
+
+    NameMapping mapping = MappingUtil.create(schema);
+    Assert.assertEquals(expected, mapping.asMappedFields());
+  }
+
+  @Test
+  public void testComplexKeyMapSchemaToMapping() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()),
+        required(3, "map", Types.MapType.ofRequired(4, 5,
+            Types.StructType.of(
+                required(6, "x", Types.DoubleType.get()),
+                required(7, "y", Types.DoubleType.get())),
+            Types.DoubleType.get())));
+
+    MappedFields expected = MappedFields.of(
+        MappedField.of(1, "id"),
+        MappedField.of(2, "data"),
+        MappedField.of(3, "map", MappedFields.of(
+            MappedField.of(4, "key", MappedFields.of(
+                MappedField.of(6, "x"),
+                MappedField.of(7, "y")
+            )),
+            MappedField.of(5, "value")
+        )));
+
+    NameMapping mapping = MappingUtil.create(schema);
+    Assert.assertEquals(expected, mapping.asMappedFields());
+  }
+
+  @Test
+  public void testComplexValueMapSchemaToMapping() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()),
+        required(3, "map", Types.MapType.ofRequired(4, 5,
+            Types.DoubleType.get(),
+            Types.StructType.of(
+                required(6, "x", Types.DoubleType.get()),
+                required(7, "y", Types.DoubleType.get()))
+        )));
+
+    MappedFields expected = MappedFields.of(
+        MappedField.of(1, "id"),
+        MappedField.of(2, "data"),
+        MappedField.of(3, "map", MappedFields.of(
+            MappedField.of(4, "key"),
+            MappedField.of(5, "value", MappedFields.of(
+                MappedField.of(6, "x"),
+                MappedField.of(7, "y")
+            ))
+        )));
+
+    NameMapping mapping = MappingUtil.create(schema);
+    Assert.assertEquals(expected, mapping.asMappedFields());
+  }
+
+  @Test
+  public void testListSchemaToMapping() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()),
+        required(3, "list", Types.ListType.ofRequired(4, Types.StringType.get())));
+
+    MappedFields expected = MappedFields.of(
+        MappedField.of(1, "id"),
+        MappedField.of(2, "data"),
+        MappedField.of(3, "list", MappedFields.of(
+            MappedField.of(4, "element")
+        )));
+
+    NameMapping mapping = MappingUtil.create(schema);
+    Assert.assertEquals(expected, mapping.asMappedFields());
+  }
+
+  @Test
+  public void testFailsDuplicateId() {
+    // the schema can be created because ID indexing is lazy
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(1, "data", Types.StringType.get()));
+
+    AssertHelpers.assertThrows("Should fail if IDs are reused",
+        IllegalArgumentException.class, "Multiple entries with same key",
+        () -> MappingUtil.create(schema));
+  }
+
+  @Test
+  public void testFailsDuplicateName() {
+    AssertHelpers.assertThrows("Should fail if names are reused",
+        IllegalArgumentException.class, "Multiple entries with same key",
+        () -> new NameMapping(MappedFields.of(MappedField.of(1, "x"), MappedField.of(2, "x"))));
+  }
+
+  @Test
+  public void testAllowsDuplicateNamesInSeparateContexts() {
+    new NameMapping(MappedFields.of(
+        MappedField.of(1, "x", MappedFields.of(MappedField.of(3, "x"))),
+        MappedField.of(2, "y", MappedFields.of(MappedField.of(4, "x")))
+    ));
+  }
+
+  @Test
+  public void testMappingFindById() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()),
+        required(3, "map", Types.MapType.ofRequired(4, 5,
+            Types.DoubleType.get(),
+            Types.StructType.of(
+                required(6, "x", Types.DoubleType.get()),
+                required(7, "y", Types.DoubleType.get())))),
+        required(8, "list", Types.ListType.ofRequired(9,
+            Types.StringType.get())),
+        required(10, "location", Types.StructType.of(
+            required(11, "latitude", Types.FloatType.get()),
+            required(12, "longitude", Types.FloatType.get())
+        )));
+
+    NameMapping mapping = MappingUtil.create(schema);
+
+    Assert.assertNull("Should not return a field mapping for a missing ID", mapping.find(100));
+    Assert.assertEquals(MappedField.of(2, "data"), mapping.find(2));
+    Assert.assertEquals(MappedField.of(6, "x"), mapping.find(6));
+    Assert.assertEquals(MappedField.of(9, "element"), mapping.find(9));
+    Assert.assertEquals(MappedField.of(11, "latitude"), mapping.find(11));
+    Assert.assertEquals(
+        MappedField.of(10, "location", MappedFields.of(
+            MappedField.of(11, "latitude"),
+            MappedField.of(12, "longitude"))),
+        mapping.find(10));
+  }
+
+  @Test
+  public void testMappingFindByName() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()),
+        required(3, "map", Types.MapType.ofRequired(4, 5,
+            Types.DoubleType.get(),
+            Types.StructType.of(
+                required(6, "x", Types.DoubleType.get()),
+                required(7, "y", Types.DoubleType.get())))),
+        required(8, "list", Types.ListType.ofRequired(9,
+            Types.StringType.get())),
+        required(10, "location", Types.StructType.of(
+            required(11, "latitude", Types.FloatType.get()),
+            required(12, "longitude", Types.FloatType.get())
+        )));
+
+    NameMapping mapping = MappingUtil.create(schema);
+
+    Assert.assertNull("Should not return a field mapping for a nested name", mapping.find("element"));
+    Assert.assertNull("Should not return a field mapping for a nested name", mapping.find("x"));
+    Assert.assertNull("Should not return a field mapping for a nested name", mapping.find("key"));
+    Assert.assertNull("Should not return a field mapping for a nested name", mapping.find("value"));
+    Assert.assertEquals(MappedField.of(2, "data"), mapping.find("data"));
+    Assert.assertEquals(MappedField.of(6, "x"), mapping.find("map", "value", "x"));
+    Assert.assertEquals(MappedField.of(9, "element"), mapping.find("list", "element"));
+    Assert.assertEquals(MappedField.of(11, "latitude"), mapping.find("location", "latitude"));
+    Assert.assertEquals(
+        MappedField.of(10, "location", MappedFields.of(
+            MappedField.of(11, "latitude"),
+            MappedField.of(12, "longitude"))),
+        mapping.find("location"));
+  }
+}
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveTypeConverter.java b/hive/src/main/java/org/apache/iceberg/hive/HiveTypeConverter.java
index 31f3b73..dc50c7a 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveTypeConverter.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveTypeConverter.java
@@ -45,7 +45,7 @@ public final class HiveTypeConverter {
       case DATE:
         return "date";
       case TIME:
-        throw new UnsupportedOperationException("Hive does not support time fields");
+        return "string";
       case TIMESTAMP:
         return "timestamp";
       case STRING:
diff --git a/jitpack.yml b/jitpack.yml
new file mode 100644
index 0000000..abeb8ee
--- /dev/null
+++ b/jitpack.yml
@@ -0,0 +1,2 @@
+install:
+  - ./gradlew publishToMavenLocal
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index a014d2a..0f7ee46 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -89,6 +89,15 @@ public class ORC {
       return this;
     }
 
+    public WriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public WriteBuilder overwrite(boolean enabled) {
+      OrcConf.OVERWRITE_OUTPUT_FILE.setBoolean(conf, enabled);
+      return this;
+    }
+
     public <D> FileAppender<D> build() {
       Preconditions.checkNotNull(schema, "Schema is required");
       return new OrcFileAppender<>(TypeConversion.toOrc(schema, new ColumnIdMap()),
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 9ce8dff..4c34fab 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -86,6 +86,7 @@ public class Parquet {
     private Map<String, String> config = Maps.newLinkedHashMap();
     private Function<MessageType, ParquetValueWriter<?>> createWriterFunc = null;
     private MetricsConfig metricsConfig = MetricsConfig.getDefault();
+    private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE;
 
     private WriteBuilder(OutputFile file) {
       this.file = file;
@@ -139,6 +140,15 @@ public class Parquet {
       return this;
     }
 
+    public WriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public WriteBuilder overwrite(boolean enabled) {
+      this.writeMode = enabled ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
+      return this;
+    }
+
     @SuppressWarnings("unchecked")
     private <T> WriteSupport<T> getWriteSupport(MessageType type) {
       if (writeSupport != null) {
@@ -202,7 +212,7 @@ public class Parquet {
 
         return new org.apache.iceberg.parquet.ParquetWriter<>(
             conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec(),
-            parquetProperties, metricsConfig);
+            parquetProperties, metricsConfig, writeMode);
       } else {
         return new ParquetWriteAdapter<>(new ParquetWriteBuilder<D>(ParquetIO.file(file))
             .withWriterVersion(writerVersion)
@@ -211,7 +221,7 @@ public class Parquet {
             .setKeyValueMetadata(metadata)
             .setWriteSupport(getWriteSupport(type))
             .withCompressionCodec(codec())
-            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) // TODO: support modes
+            .withWriteMode(writeMode)
             .withRowGroupSize(rowGroupSize)
             .withPageSize(pageSize)
             .withDictionaryPageSize(dictionaryPageSize)
@@ -283,7 +293,7 @@ public class Parquet {
     private Map<String, String> properties = Maps.newHashMap();
     private boolean callInit = false;
     private boolean reuseContainers = false;
-    private int maxRecordsPerBatch = 1000;
+    private int maxRecordsPerBatch = 10000;
 
     private ReadBuilder(InputFile file) {
       this.file = file;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index 4529367..0e6e686 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -26,7 +26,6 @@ import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -590,6 +589,7 @@ public class ParquetValueReaders {
     private final TripleIterator<?> column;
     private final TripleIterator<?>[] columns;
     private final List<TripleIterator<?>> children;
+    private ColumnarBatch columnarBatch;
 
     @SuppressWarnings("unchecked")
     public ColumnarBatchReader(List<Type> types,
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index e8f95ea..5f89d54 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -83,7 +83,8 @@ public class ParquetWriter<T> implements FileAppender<T>, Closeable {
                 Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
                 CompressionCodecName codec,
                 ParquetProperties properties,
-                MetricsConfig metricsConfig) {
+                MetricsConfig metricsConfig,
+                ParquetFileWriter.Mode writeMode) {
     this.output = output;
     this.targetRowGroupSize = rowGroupSize;
     this.props = properties;
@@ -95,7 +96,7 @@ public class ParquetWriter<T> implements FileAppender<T>, Closeable {
 
     try {
       this.writer = new ParquetFileWriter(ParquetIO.file(output, conf), parquetSchema,
-          ParquetFileWriter.Mode.OVERWRITE, rowGroupSize, 0);
+         writeMode, rowGroupSize, 0);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to create Parquet file");
     }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/BaseParquetWritingTest.java b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
similarity index 83%
rename from parquet/src/test/java/org/apache/iceberg/parquet/BaseParquetWritingTest.java
rename to parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
index d587b64..f42bdbd 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/BaseParquetWritingTest.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
@@ -31,24 +31,23 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.parquet.schema.MessageType;
-import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 import static org.apache.iceberg.Files.localOutput;
 
 /**
- * Base utility test class for tests that need to write Parquet files
+ * Utilities for tests that need to write Parquet files.
  */
-public abstract class BaseParquetWritingTest {
+class ParquetWritingTestUtils {
 
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
+  private ParquetWritingTestUtils() {}
 
-  File writeRecords(Schema schema, GenericData.Record... records) throws IOException {
-    return writeRecords(schema, Collections.emptyMap(), null, records);
+  static File writeRecords(TemporaryFolder temp, Schema schema, GenericData.Record... records) throws IOException {
+    return writeRecords(temp, schema, Collections.emptyMap(), null, records);
   }
 
-  File writeRecords(
+  static File writeRecords(
+      TemporaryFolder temp,
       Schema schema, Map<String, String> properties,
       Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
       GenericData.Record... records) throws IOException {
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index 9429e45..f5433ea 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -32,13 +32,19 @@ import org.apache.iceberg.types.Types.IntegerType;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.schema.MessageType;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import static org.apache.iceberg.Files.localInput;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.parquet.ParquetWritingTestUtils.writeRecords;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
-public class TestParquet extends BaseParquetWritingTest {
+public class TestParquet {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
 
   @Test
   public void testRowGroupSizeConfigurable() throws IOException {
@@ -79,7 +85,7 @@ public class TestParquet extends BaseParquetWritingTest {
     // Force multiple row groups by making the byte size very small
     // Note there'a also minimumRowGroupRecordCount which cannot be configured so we have to write
     // at least that many records for a new row group to occur
-    return writeRecords(
+    return writeRecords(temp,
         schema,
         ImmutableMap.of(
             PARQUET_ROW_GROUP_SIZE_BYTES,
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
new file mode 100644
index 0000000..a838c92
--- /dev/null
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TestMetrics;
+import org.apache.iceberg.io.InputFile;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Test Metrics for Parquet.
+ */
+public class TestParquetMetrics extends TestMetrics {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Override
+  public Metrics getMetrics(InputFile file) {
+    return ParquetUtil.fileMetrics(file);
+  }
+
+  @Override
+  public File writeRecords(Schema schema, GenericData.Record... records) throws IOException {
+    return ParquetWritingTestUtils.writeRecords(temp, schema, records);
+  }
+}
diff --git a/pig/src/main/java/org/apache/iceberg/pig/IcebergPigInputFormat.java b/pig/src/main/java/org/apache/iceberg/pig/IcebergPigInputFormat.java
index ca2529b..e9b41ec 100644
--- a/pig/src/main/java/org/apache/iceberg/pig/IcebergPigInputFormat.java
+++ b/pig/src/main/java/org/apache/iceberg/pig/IcebergPigInputFormat.java
@@ -197,12 +197,16 @@ public class IcebergPigInputFormat<T> extends InputFormat<Void, T> {
           if (hasJoinedPartitionColumns) {
 
             Schema readSchema = TypeUtil.selectNot(projectedSchema, idColumns);
-            Schema partitionSchema = TypeUtil.select(tableSchema, idColumns);
             Schema projectedPartitionSchema = TypeUtil.select(projectedSchema, idColumns);
 
+            Map<String, Integer> partitionSpecFieldIndexMap = Maps.newHashMap();
+            for(int i=0; i<spec.fields().size(); i++) {
+              partitionSpecFieldIndexMap.put(spec.fields().get(i).name(), i);
+            }
+
             for (Types.NestedField field : projectedPartitionSchema.columns()) {
               int tupleIndex = projectedSchema.columns().indexOf(field);
-              int partitionIndex = partitionSchema.columns().indexOf(field);
+              int partitionIndex = partitionSpecFieldIndexMap.get(field.name());
 
               Object partitionValue = file.partition().get(partitionIndex, Object.class);
               partitionValueMap.put(tupleIndex, convertPartitionValue(field.type(), partitionValue));
diff --git a/project/scalastyle_config.xml b/project/scalastyle_config.xml
index ee3cac3..04333bb 100644
--- a/project/scalastyle_config.xml
+++ b/project/scalastyle_config.xml
@@ -136,12 +136,6 @@
     </check>
     <check level="error" class="org.scalastyle.scalariform.UnderscoreImportChecker" enabled="false"/>
     <check level="error" class="org.scalastyle.scalariform.LowercasePatternMatchChecker" enabled="true"/>
-    <check level="error" class="org.scalastyle.scalariform.MultipleStringLiteralsChecker" enabled="true">
-        <parameters>
-            <parameter name="allowed"><![CDATA[2]]></parameter>
-            <parameter name="ignoreRegex"><![CDATA[^""$]]></parameter>
-        </parameters>
-    </check>
     <check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
         <parameters>
             <parameter name="groups">all</parameter>
diff --git a/site/docs/spark.md b/site/docs/spark.md
index 9489f37..2230466 100644
--- a/site/docs/spark.md
+++ b/site/docs/spark.md
@@ -25,7 +25,7 @@ Iceberg uses Spark's DataSourceV2 API for data source and catalog implementation
 
 To use Iceberg in Spark 2.4, add the `iceberg-spark-runtime` Jar to Spark's `jars` folder.
 
-Spark 2.4 is limited to reading and writing existing Iceberg tables. Use the [Iceberg API](api) to create Iceberg tables.
+Spark 2.4 is limited to reading and writing existing Iceberg tables. Use the [Iceberg API](../api) to create Iceberg tables.
 
 
 ### Reading an Iceberg table
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
index 1c91645..273d6b1 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
@@ -76,7 +76,52 @@ public class IcebergSourceFlatParquetDataFilterBenchmark extends IcebergSourceFl
     tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
     withTableProperties(tableProperties, () -> {
       String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).filter(FILTER_COND);
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "10000")
+          .load(tableLocation).filter(FILTER_COND);
+      materialize(df);
+    });
+  }
+
+
+  @Benchmark
+  @Threads(1)
+  public void readWithFilterIceberg100k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "100000")
+          .load(tableLocation).filter(FILTER_COND);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithFilterIceberg10k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "10000")
+          .load(tableLocation).filter(FILTER_COND);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithFilterIceberg5k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "5000")
+          .load(tableLocation).filter(FILTER_COND);
       materialize(df);
     });
   }
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
index 37a6336..9e51081 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
@@ -66,12 +66,43 @@ public class IcebergSourceFlatParquetDataReadBenchmark extends IcebergSourceFlat
 
   @Benchmark
   @Threads(1)
-  public void readIceberg() {
+  public void readIcebergVectorized100k() {
     Map<String, String> tableProperties = Maps.newHashMap();
     tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
     withTableProperties(tableProperties, () -> {
       String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "100000")
+          .load(tableLocation);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readIcebergVectorized10k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "10000")
+          .load(tableLocation);
+      materialize(df);
+    });
+  }
+
+
+  @Benchmark
+  @Threads(1)
+  public void readIcebergVectorized5k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "5000")
+          .load(tableLocation);
       materialize(df);
     });
   }
@@ -102,12 +133,43 @@ public class IcebergSourceFlatParquetDataReadBenchmark extends IcebergSourceFlat
 
   @Benchmark
   @Threads(1)
-  public void readWithProjectionIceberg() {
+  public void readWithProjectionIcebergVectorized100k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "100000")
+          .load(tableLocation).select("longCol");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionIcebergVectorized10k() {
     Map<String, String> tableProperties = Maps.newHashMap();
     tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
     withTableProperties(tableProperties, () -> {
       String tableLocation = table().location();
-      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).select("longCol");
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "10000")
+          .load(tableLocation).select("longCol");
+      materialize(df);
+    });
+  }
+
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionIcebergVectorized5k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "5000")
+          .load(tableLocation).select("longCol");
       materialize(df);
     });
   }
@@ -136,6 +198,22 @@ public class IcebergSourceFlatParquetDataReadBenchmark extends IcebergSourceFlat
     });
   }
 
+
+  @Benchmark
+  @Threads(1)
+  public void readIcebergVectorized1k() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .option("iceberg.read.numrecordsperbatch", "1000")
+          .load(tableLocation);
+      materialize(df);
+    });
+  }
+
+
   private void appendData() {
     for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
       Dataset<Row> df = spark().range(NUM_ROWS)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java
index 3b67c96..e030fa4 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java
@@ -21,7 +21,7 @@ package org.apache.iceberg.spark.data.vector;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.DateDayVector;
@@ -66,16 +66,32 @@ public class VectorizedParquetValueReaders {
 
   public abstract static class VectorReader extends ParquetValueReaders.PrimitiveReader<FieldVector> {
 
+    public static final int DEFAULT_NUM_ROWS_IN_BATCH = 10000;
+    // private static final Logger LOG = LoggerFactory.getLogger(VectorReader.class);
+
     private FieldVector vec;
     private boolean isOptional;
+    private int rowsInBatch = DEFAULT_NUM_ROWS_IN_BATCH;
+    private ColumnDescriptor desc;
 
     VectorReader(ColumnDescriptor desc,
         Types.NestedField icebergField,
-        RootAllocator rootAlloc) {
+        BufferAllocator rootAlloc) {
 
       super(desc);
       this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
       this.isOptional = desc.getPrimitiveType().isRepetition(Type.Repetition.OPTIONAL);
+      this.desc = desc;
+    }
+
+    VectorReader(ColumnDescriptor desc,
+        Types.NestedField icebergField,
+        BufferAllocator rootAlloc,
+        int rowsInBatch) {
+
+      this(desc, icebergField, rootAlloc);
+      this.rowsInBatch = (rowsInBatch == 0) ? DEFAULT_NUM_ROWS_IN_BATCH : rowsInBatch;
+      // LOG.info("=> [VectorReader] rowsInBatch = " + this.rowsInBatch);
     }
 
     protected FieldVector getVector() {
@@ -92,19 +108,28 @@ public class VectorizedParquetValueReaders {
       vec.reset();
       int ordinal = 0;
 
-      while (column.hasNext()) {
-        // Todo: this check works for flat schemas only
-        // need to get max definition level to do proper check
-        if (isOptional && column.currentDefinitionLevel() == 0) {
-          // handle null
-          column.nextNull();
-          nextNullAt(ordinal);
+      for (; ordinal < rowsInBatch; ordinal++) {
+        if (column.hasNext()) {
+          // while (column.hasNext()) {
+          // Todo: this check works for flat schemas only
+          // need to get max definition level to do proper check
+          if (isOptional && column.currentDefinitionLevel() == 0) {
+            // handle null
+            column.nextNull();
+            nextNullAt(ordinal);
+          } else {
+            nextValueAt(ordinal);
+          }
         } else {
-          nextValueAt(ordinal);
+          // proceed to next rowgroup Or exit.
+          // LOG.info("**** No more in RowGroup. Exiting!");
+          break;
         }
-        ordinal++;
+        // }
       }
       vec.setValueCount(ordinal);
+      // LOG.info("=> Vector col:" + desc.getPrimitiveType().getPrimitiveTypeName() +
+      //     ", for setting batch size :" + rowsInBatch + ", with " + ordinal + " values");
       return vec;
     }
 
@@ -120,8 +145,9 @@ public class VectorizedParquetValueReaders {
 
   protected static class StringReader extends VectorReader {
 
-    StringReader(ColumnDescriptor desc, Types.NestedField icebergField, RootAllocator rootAlloc) {
-      super(desc, icebergField, rootAlloc);
+    StringReader(ColumnDescriptor desc, Types.NestedField icebergField,
+        BufferAllocator rootAlloc, int recordsPerBatch) {
+      super(desc, icebergField, rootAlloc, recordsPerBatch);
     }
 
     @Override
@@ -138,8 +164,8 @@ public class VectorizedParquetValueReaders {
         ((VarCharVector) getVector()).setNull(ordinal);
 
       } else {
-        String utf8Str = binary.toStringUsingUTF8();
-        ((VarCharVector) getVector()).setSafe(ordinal, utf8Str.getBytes());
+        // String utf8Str = binary.toStringUsingUTF8();
+        ((VarCharVector) getVector()).setSafe(ordinal, binary.getBytesUnsafe());
       }
     }
 
@@ -149,9 +175,9 @@ public class VectorizedParquetValueReaders {
 
     IntegerReader(ColumnDescriptor desc,
         Types.NestedField icebergField,
-        RootAllocator rootAlloc) {
+        BufferAllocator rootAlloc, int recordsPerBatch) {
 
-      super(desc, icebergField, rootAlloc);
+      super(desc, icebergField, rootAlloc, recordsPerBatch);
     }
 
     @Override
@@ -171,9 +197,9 @@ public class VectorizedParquetValueReaders {
 
     LongReader(ColumnDescriptor desc,
         Types.NestedField icebergField,
-        RootAllocator rootAlloc) {
+        BufferAllocator rootAlloc, int recordsPerBatch) {
 
-      super(desc, icebergField, rootAlloc);
+      super(desc, icebergField, rootAlloc, recordsPerBatch);
     }
 
     protected void nextNullAt(int ordinal) {
@@ -192,8 +218,8 @@ public class VectorizedParquetValueReaders {
 
     TimestampMillisReader(ColumnDescriptor desc,
         Types.NestedField icebergField,
-        RootAllocator rootAlloc) {
-      super(desc, icebergField, rootAlloc);
+        BufferAllocator rootAlloc, int recordsPerBatch) {
+      super(desc, icebergField, rootAlloc, recordsPerBatch);
     }
 
     protected void nextValueAt(int ordinal) {
@@ -208,8 +234,8 @@ public class VectorizedParquetValueReaders {
 
     TimestampMicroReader(ColumnDescriptor desc,
         Types.NestedField icebergField,
-        RootAllocator rootAlloc) {
-      super(desc, icebergField, rootAlloc);
+        BufferAllocator rootAlloc, int recordsPerBatch) {
+      super(desc, icebergField, rootAlloc, recordsPerBatch);
     }
 
     protected void nextNullAt(int ordinal) {
@@ -228,8 +254,8 @@ public class VectorizedParquetValueReaders {
 
     BooleanReader(ColumnDescriptor desc,
         Types.NestedField icebergField,
-        RootAllocator rootAlloc) {
-      super(desc, icebergField, rootAlloc);
+        BufferAllocator rootAlloc, int recordsPerBatch) {
+      super(desc, icebergField, rootAlloc, recordsPerBatch);
     }
 
     protected void nextNullAt(int ordinal) {
@@ -249,8 +275,8 @@ public class VectorizedParquetValueReaders {
 
     FloatReader(ColumnDescriptor desc,
         Types.NestedField icebergField,
-        RootAllocator rootAlloc) {
-      super(desc, icebergField, rootAlloc);
+        BufferAllocator rootAlloc, int recordsPerBatch) {
+      super(desc, icebergField, rootAlloc, recordsPerBatch);
     }
 
     protected void nextNullAt(int ordinal) {
@@ -269,8 +295,8 @@ public class VectorizedParquetValueReaders {
 
     DoubleReader(ColumnDescriptor desc,
         Types.NestedField icebergField,
-        RootAllocator rootAlloc) {
-      super(desc, icebergField, rootAlloc);
+        BufferAllocator rootAlloc, int recordsPerBatch) {
+      super(desc, icebergField, rootAlloc, recordsPerBatch);
     }
 
     protected void nextNullAt(int ordinal) {
@@ -290,8 +316,8 @@ public class VectorizedParquetValueReaders {
 
     BinaryReader(ColumnDescriptor desc,
         Types.NestedField icebergField,
-        RootAllocator rootAlloc) {
-      super(desc, icebergField, rootAlloc);
+        BufferAllocator rootAlloc, int recordsPerBatch) {
+      super(desc, icebergField, rootAlloc, recordsPerBatch);
     }
 
     protected void nextNullAt(int ordinal) {
@@ -311,8 +337,8 @@ public class VectorizedParquetValueReaders {
 
     DateReader(ColumnDescriptor desc,
         Types.NestedField icebergField,
-        RootAllocator rootAlloc) {
-      super(desc, icebergField, rootAlloc);
+        BufferAllocator rootAlloc, int recordsPerBatch) {
+      super(desc, icebergField, rootAlloc, recordsPerBatch);
     }
 
     protected void nextNullAt(int ordinal) {
@@ -334,10 +360,10 @@ public class VectorizedParquetValueReaders {
 
     IntegerDecimalReader(ColumnDescriptor desc,
         Types.NestedField icebergField,
-        RootAllocator rootAlloc,
-        int precision, int scale) {
+        BufferAllocator rootAlloc,
+        int precision, int scale, int recordsPerBatch) {
 
-      super(desc, icebergField, rootAlloc);
+      super(desc, icebergField, rootAlloc, recordsPerBatch);
       this.precision = precision;
       this.scale = scale;
     }
@@ -362,10 +388,10 @@ public class VectorizedParquetValueReaders {
 
     LongDecimalReader(ColumnDescriptor desc,
         Types.NestedField icebergField,
-        RootAllocator rootAlloc,
-        int precision, int scale) {
+        BufferAllocator rootAlloc,
+        int precision, int scale, int recordsPerBatch) {
 
-      super(desc, icebergField, rootAlloc);
+      super(desc, icebergField, rootAlloc, recordsPerBatch);
       this.precision = precision;
       this.scale = scale;
     }
@@ -390,10 +416,10 @@ public class VectorizedParquetValueReaders {
 
     BinaryDecimalReader(ColumnDescriptor desc,
         Types.NestedField icebergField,
-        RootAllocator rootAlloc,
-        int precision, int scale) {
+        BufferAllocator rootAlloc,
+        int precision, int scale, int recordsPerBatch) {
 
-      super(desc, icebergField, rootAlloc);
+      super(desc, icebergField, rootAlloc, recordsPerBatch);
       this.precision = precision;
       this.scale = scale;
     }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
index f283cda..7a86903 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
@@ -25,7 +25,7 @@ import com.google.common.collect.Maps;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.arrow.ArrowSchemaUtil;
@@ -39,10 +39,15 @@ import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.execution.arrow.ArrowUtils;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class VectorizedSparkParquetReaders {
 
+  private static final Logger LOG = LoggerFactory.getLogger(VectorizedSparkParquetReaders.class);
+
   private VectorizedSparkParquetReaders() {
   }
 
@@ -52,9 +57,21 @@ public class VectorizedSparkParquetReaders {
       Schema expectedSchema,
       MessageType fileSchema) {
 
+    return buildReader(tableSchema, expectedSchema, fileSchema,
+        VectorizedParquetValueReaders.VectorReader.DEFAULT_NUM_ROWS_IN_BATCH);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<ColumnarBatch> buildReader(
+      Schema tableSchema,
+      Schema expectedSchema,
+      MessageType fileSchema,
+      Integer recordsPerBatch) {
+
+    LOG.info("=> [VectorizedSparkParquetReaders] recordsPerBatch = {}", recordsPerBatch);
     return (ParquetValueReader<ColumnarBatch>)
         TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-            new ReadBuilder(tableSchema, expectedSchema, fileSchema));
+            new ReadBuilder(tableSchema, expectedSchema, fileSchema, recordsPerBatch));
   }
 
   private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
@@ -62,14 +79,19 @@ public class VectorizedSparkParquetReaders {
     private final Schema projectedIcebergSchema;
     private final Schema tableIcebergSchema;
     private final org.apache.arrow.vector.types.pojo.Schema arrowSchema;
-    private final RootAllocator rootAllocator;
+    private final BufferAllocator rootAllocator;
+    private final int recordsPerBatch;
 
-    ReadBuilder(Schema tableSchema, Schema projectedIcebergSchema, MessageType parquetSchema) {
+    ReadBuilder(Schema tableSchema, Schema projectedIcebergSchema, MessageType parquetSchema, int recordsPerBatch) {
       this.parquetSchema = parquetSchema;
       this.tableIcebergSchema = tableSchema;
       this.projectedIcebergSchema = projectedIcebergSchema;
       this.arrowSchema = ArrowSchemaUtil.convert(projectedIcebergSchema);
-      this.rootAllocator = new RootAllocator(Long.MAX_VALUE);
+      this.recordsPerBatch = recordsPerBatch;
+      // this.rootAllocator = new RootAllocator(Long.MAX_VALUE);
+      this.rootAllocator = ArrowUtils.rootAllocator().newChildAllocator("VectorizedReadBuilder",
+          0, Long.MAX_VALUE);
+      LOG.info("=> [ReadBuilder] recordsPerBatch = {}", this.recordsPerBatch);
     }
 
     @Override
@@ -142,24 +164,27 @@ public class VectorizedSparkParquetReaders {
           case ENUM:
           case JSON:
           case UTF8:
-            return new VectorizedParquetValueReaders.StringReader(desc, icebergField, rootAllocator);
+            //return new VectorizedParquetValueReaders.BinaryReader(desc, icebergField, rootAllocator, recordsPerBatch);
+            return new VectorizedParquetValueReaders.StringReader(desc, icebergField, rootAllocator, recordsPerBatch);
           case INT_8:
           case INT_16:
           case INT_32:
-            return new VectorizedParquetValueReaders.IntegerReader(desc, icebergField, rootAllocator);
+            return new VectorizedParquetValueReaders.IntegerReader(desc, icebergField, rootAllocator, recordsPerBatch);
             // if (expected != null && expected.typeId() == Types.LongType.get().typeId()) {
             //   return new ParquetValueReaders.IntAsLongReader(desc);
             // } else {
             //   return new ParquetValueReaders.UnboxedReader(desc);
             // }
           case DATE:
-            return new VectorizedParquetValueReaders.DateReader(desc, icebergField, rootAllocator);
+            return new VectorizedParquetValueReaders.DateReader(desc, icebergField, rootAllocator, recordsPerBatch);
           case INT_64:
-            return new VectorizedParquetValueReaders.LongReader(desc, icebergField, rootAllocator);
+            return new VectorizedParquetValueReaders.LongReader(desc, icebergField, rootAllocator, recordsPerBatch);
           case TIMESTAMP_MICROS:
-            return new VectorizedParquetValueReaders.TimestampMicroReader(desc, icebergField, rootAllocator);
+            return new VectorizedParquetValueReaders.TimestampMicroReader(desc, icebergField,
+                rootAllocator, recordsPerBatch);
           case TIMESTAMP_MILLIS:
-            return new VectorizedParquetValueReaders.TimestampMillisReader(desc, icebergField, rootAllocator);
+            return new VectorizedParquetValueReaders.TimestampMillisReader(desc, icebergField,
+                rootAllocator, recordsPerBatch);
           case DECIMAL:
             DecimalMetadata decimal = primitive.getDecimalMetadata();
             switch (primitive.getPrimitiveTypeName()) {
@@ -167,21 +192,21 @@ public class VectorizedSparkParquetReaders {
               case FIXED_LEN_BYTE_ARRAY:
                 return new VectorizedParquetValueReaders.BinaryDecimalReader(desc, icebergField, rootAllocator,
                     decimal.getPrecision(),
-                    decimal.getScale());
+                    decimal.getScale(), recordsPerBatch);
               case INT64:
                 return new VectorizedParquetValueReaders.LongDecimalReader(desc, icebergField, rootAllocator,
                     decimal.getPrecision(),
-                    decimal.getScale());
+                    decimal.getScale(), recordsPerBatch);
               case INT32:
                 return new VectorizedParquetValueReaders.IntegerDecimalReader(desc, icebergField, rootAllocator,
                     decimal.getPrecision(),
-                    decimal.getScale());
+                    decimal.getScale(), recordsPerBatch);
               default:
                 throw new UnsupportedOperationException(
                     "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
             }
           case BSON:
-            return new VectorizedParquetValueReaders.BinaryReader(desc, icebergField, rootAllocator);
+            return new VectorizedParquetValueReaders.BinaryReader(desc, icebergField, rootAllocator, recordsPerBatch);
           default:
             throw new UnsupportedOperationException(
                 "Unsupported logical type: " + primitive.getOriginalType());
@@ -191,22 +216,22 @@ public class VectorizedSparkParquetReaders {
       switch (primitive.getPrimitiveTypeName()) {
         case FIXED_LEN_BYTE_ARRAY:
         case BINARY:
-          return new VectorizedParquetValueReaders.BinaryReader(desc, icebergField, rootAllocator);
+          return new VectorizedParquetValueReaders.BinaryReader(desc, icebergField, rootAllocator, recordsPerBatch);
         case INT32:
-          return new VectorizedParquetValueReaders.IntegerReader(desc, icebergField, rootAllocator);
+          return new VectorizedParquetValueReaders.IntegerReader(desc, icebergField, rootAllocator, recordsPerBatch);
         case FLOAT:
-          return new VectorizedParquetValueReaders.FloatReader(desc, icebergField, rootAllocator);
+          return new VectorizedParquetValueReaders.FloatReader(desc, icebergField, rootAllocator, recordsPerBatch);
           // if (expected != null && expected.typeId() == org.apache.iceberg.types.Type.TypeID.DOUBLE) {
           //   return new ParquetValueReaders.FloatAsDoubleReader(desc);
           // } else {
           //   return new ParquetValueReaders.UnboxedReader<>(desc);
           // }
         case BOOLEAN:
-          return new VectorizedParquetValueReaders.BooleanReader(desc, icebergField, rootAllocator);
+          return new VectorizedParquetValueReaders.BooleanReader(desc, icebergField, rootAllocator, recordsPerBatch);
         case INT64:
-          return new VectorizedParquetValueReaders.LongReader(desc, icebergField, rootAllocator);
+          return new VectorizedParquetValueReaders.LongReader(desc, icebergField, rootAllocator, recordsPerBatch);
         case DOUBLE:
-          return new VectorizedParquetValueReaders.DoubleReader(desc, icebergField, rootAllocator);
+          return new VectorizedParquetValueReaders.DoubleReader(desc, icebergField, rootAllocator, recordsPerBatch);
         default:
           throw new UnsupportedOperationException("Unsupported type: " + primitive);
       }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 8229b12..94b2079 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -75,7 +75,8 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
     validateWriteSchema(table.schema(), dsStruct);
     String appId = lazySparkSession().sparkContext().applicationId();
-    return Optional.of(new Writer(table, options, mode == SaveMode.Overwrite, appId));
+    String wapId = lazySparkSession().conf().get("spark.wap.id", null);
+    return Optional.of(new Writer(table, options, mode == SaveMode.Overwrite, appId, wapId));
   }
 
   @Override
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 75d72f1..40a5a6d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -59,6 +59,7 @@ import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.SparkAvroReader;
 import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.vector.VectorizedParquetValueReaders;
 import org.apache.iceberg.spark.data.vector.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
@@ -86,6 +87,8 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.unsafe.types.UTF8String;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
 
 class Reader implements DataSourceReader,
@@ -96,13 +99,14 @@ class Reader implements DataSourceReader,
 
   private static final Filter[] NO_FILTERS = new Filter[0];
 
+  private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
   private final Table table;
   private final Long snapshotId;
   private final Long asOfTimestamp;
   private final FileIO fileIo;
   private final EncryptionManager encryptionManager;
   private final boolean caseSensitive;
-  private int numRecordsPerBatch;
+  private final int numRecordsPerBatch;
   private StructType requestedSchema = null;
   private List<Expression> filterExpressions = null;
   private Filter[] pushedFilters = NO_FILTERS;
@@ -111,18 +115,21 @@ class Reader implements DataSourceReader,
   private Schema schema;
   private StructType type = null; // cached because Spark accesses it multiple times
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
-  private static final int DEFAULT_NUM_RECORDS_PER_BATCH = 1000;
 
   Reader(Table table, boolean caseSensitive, DataSourceOptions options) {
     this.table = table;
     this.snapshotId = options.get("snapshot-id").map(Long::parseLong).orElse(null);
     this.asOfTimestamp = options.get("as-of-timestamp").map(Long::parseLong).orElse(null);
     Optional<String> numRecordsPerBatchOpt = options.get("iceberg.read.numrecordsperbatch");
-    this.numRecordsPerBatch = DEFAULT_NUM_RECORDS_PER_BATCH;
     if (numRecordsPerBatchOpt.isPresent()) {
+
       this.numRecordsPerBatch = Integer.parseInt(numRecordsPerBatchOpt.get());
+
+    } else {
+
+      this.numRecordsPerBatch = VectorizedParquetValueReaders.VectorReader.DEFAULT_NUM_ROWS_IN_BATCH;
     }
-    // LOG.info("[IcebergSource] => Reading numRecordsPerBatch = "+numRecordsPerBatch);
+    LOG.info("=> Set Config numRecordsPerBatch = {}", numRecordsPerBatch);
 
     if (snapshotId != null && asOfTimestamp != null) {
       throw new IllegalArgumentException(
@@ -169,6 +176,7 @@ class Reader implements DataSourceReader,
           new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo, encryptionManager,
               caseSensitive, numRecordsPerBatch));
     }
+    LOG.info("=> Batching input partitions with {} tasks.", readTasks.size());
 
     return readTasks;
   }
@@ -305,6 +313,7 @@ class Reader implements DataSourceReader,
       this.encryptionManager = encryptionManager;
       this.caseSensitive = caseSensitive;
       this.numRecordsPerBatch = numRecordsPerBatch;
+      LOG.info("=> [ReadTask] numRecordsPerBatch = {}", numRecordsPerBatch);
     }
 
     @Override
@@ -340,7 +349,7 @@ class Reader implements DataSourceReader,
     private final FileIO fileIo;
     private final Map<String, InputFile> inputFiles;
     private final boolean caseSensitive;
-    private final int numRecordsPerBatch;
+    private final Integer numRecordsPerBatch;
 
     private Iterator<ColumnarBatch> currentIterator;
     private Closeable currentCloseable = null;
@@ -361,9 +370,10 @@ class Reader implements DataSourceReader,
       decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
       this.inputFiles = inputFileBuilder.build();
       // open last because the schemas and fileIo must be set
+      this.numRecordsPerBatch = numRecordsPerBatch;
       this.currentIterator = open(tasks.next());
       this.caseSensitive = caseSensitive;
-      this.numRecordsPerBatch = numRecordsPerBatch;
+      LOG.info("=> [TaskDataReader] numRecordsPerBatch = {}", numRecordsPerBatch);
     }
 
     @Override
@@ -521,7 +531,7 @@ class Reader implements DataSourceReader,
           .project(readSchema, SparkSchemaUtil.convert(readSchema))
           .split(task.start(), task.length())
           .createReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(tableSchema, readSchema,
-              fileSchema))
+              fileSchema, numRecordsPerBatch))
           .filter(task.residual())
           .caseSensitive(caseSensitive)
           .recordsPerBatch(numRecordsPerBatch)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index aa9becc..f520505 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionManager;
@@ -87,14 +88,20 @@ class Writer implements DataSourceWriter {
   private final EncryptionManager encryptionManager;
   private final boolean replacePartitions;
   private final String applicationId;
+  private final String wapId;
 
   Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId) {
+    this(table, options, replacePartitions, applicationId, null);
+  }
+
+  Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId, String wapId) {
     this.table = table;
     this.format = getFileFormat(table.properties(), options);
     this.fileIo = table.io();
     this.encryptionManager = table.encryption();
     this.replacePartitions = replacePartitions;
     this.applicationId = applicationId;
+    this.wapId = wapId;
   }
 
   private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
@@ -104,6 +111,11 @@ class Writer implements DataSourceWriter {
     return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
   }
 
+  private boolean isWapTable() {
+    return Boolean.parseBoolean(table.properties().getOrDefault(
+        TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
+  }
+
   @Override
   public DataWriterFactory<InternalRow> createWriterFactory() {
     return new WriterFactory(
@@ -124,6 +136,14 @@ class Writer implements DataSourceWriter {
     if (applicationId != null) {
       operation.set("spark.app.id", applicationId);
     }
+
+    if (isWapTable() && wapId != null) {
+      // write-audit-publish is enabled for this table and job
+      // stage the changes without changing the current snapshot
+      operation.set("wap.id", wapId);
+      operation.stageOnly();
+    }
+
     long start = System.currentTimeMillis();
     operation.commit(); // abort is automatically called if this fails
     long duration = System.currentTimeMillis() - start;
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
index da9a466..96933d4 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java
@@ -65,7 +65,7 @@ public class TestSparkParquetVectorizedReader extends AvroDataTest {
 
     try (CloseableIterable<ColumnarBatch> batchReader = Parquet.read(Files.localInput(testFile))
         .project(schema)
-        .createReaderFunc(type -> VectorizedSparkParquetReaders.buildReader(schema, schema, type))
+        .createReaderFunc(type -> VectorizedSparkParquetReaders.buildReader(schema, schema, type, 10000))
         .build()) {
 
       Iterator<ColumnarBatch> batches = batchReader.iterator();