You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/05/07 16:38:30 UTC

[incubator-iceberg] branch master updated: Add column reordering to UpdateSchema (#995)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 50cc2e0  Add column reordering to UpdateSchema (#995)
50cc2e0 is described below

commit 50cc2e0cdb5278a8fc3343522e0b33eb158285b3
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu May 7 09:38:15 2020 -0700

    Add column reordering to UpdateSchema (#995)
---
 .../main/java/org/apache/iceberg/UpdateSchema.java |  36 ++
 .../org/apache/iceberg/types/IndexParents.java     |  82 ++++
 .../java/org/apache/iceberg/types/TypeUtil.java    |   5 +
 .../main/java/org/apache/iceberg/SchemaUpdate.java | 217 ++++++++-
 .../java/org/apache/iceberg/TestSchemaUpdate.java  | 509 +++++++++++++++++++++
 5 files changed, 835 insertions(+), 14 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/UpdateSchema.java b/api/src/main/java/org/apache/iceberg/UpdateSchema.java
index 503be92..38df9bd 100644
--- a/api/src/main/java/org/apache/iceberg/UpdateSchema.java
+++ b/api/src/main/java/org/apache/iceberg/UpdateSchema.java
@@ -325,4 +325,40 @@ public interface UpdateSchema extends PendingUpdate<Schema> {
    */
   UpdateSchema deleteColumn(String name);
 
+  /**
+   * Move a column from its current position to the start of the schema or its parent struct.
+   * @param name name of the column to move
+   * @return this for method chaining
+   * @throws IllegalArgumentException If name doesn't identify a column in the schema or if this
+   *                                  change conflicts with other changes.
+   */
+  UpdateSchema moveFirst(String name);
+
+  /**
+   * Move a column from its current position to directly before a reference column.
+   * <p>
+   * The name is used to find the column to move using {@link Schema#findField(String)}. If the name identifies a nested
+   * column, it can only be moved within the nested struct that contains it.
+   *
+   * @param name name of the column to move
+   * @param beforeName name of the reference column
+   * @return this for method chaining
+   * @throws IllegalArgumentException If name or beforeName doesn't identify a column in the schema or
+   *                                  if this change conflicts with other changes.
+   */
+  UpdateSchema moveBefore(String name, String beforeName);
+
+  /**
+   * Move a column from its current position to directly after a reference column.
+   * <p>
+   * The name is used to find the column to move using {@link Schema#findField(String)}. If the name identifies a nested
+   * column, it can only be moved within the nested struct that contains it.
+   *
+   * @param name name of the column to move
+   * @param afterName name of the reference column
+   * @return this for method chaining
+   * @throws IllegalArgumentException If name or afterName doesn't identify a column in the schema or
+   *                                  if this change conflicts with other changes.
+   */
+  UpdateSchema moveAfter(String name, String afterName);
 }
diff --git a/api/src/main/java/org/apache/iceberg/types/IndexParents.java b/api/src/main/java/org/apache/iceberg/types/IndexParents.java
new file mode 100644
index 0000000..eb4bf5c
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/types/IndexParents.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.types;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+
+public class IndexParents extends TypeUtil.SchemaVisitor<Map<Integer, Integer>> {
+  private final Map<Integer, Integer> idToParent = Maps.newHashMap();
+  private final Deque<Integer> idStack = Lists.newLinkedList();
+
+  @Override
+  public void beforeField(Types.NestedField field) {
+    idStack.push(field.fieldId());
+  }
+
+  @Override
+  public void afterField(Types.NestedField field) {
+    idStack.pop();
+  }
+
+  @Override
+  public Map<Integer, Integer> schema(Schema schema, Map<Integer, Integer> structResult) {
+    return idToParent;
+  }
+
+  @Override
+  public Map<Integer, Integer> struct(Types.StructType struct, List<Map<Integer, Integer>> fieldResults) {
+    for (Types.NestedField field : struct.fields()) {
+      Integer parentId = idStack.peek();
+      if (parentId != null) {
+        // fields in the root struct are not added
+        idToParent.put(field.fieldId(), parentId);
+      }
+    }
+    return idToParent;
+  }
+
+  @Override
+  public Map<Integer, Integer> field(Types.NestedField field, Map<Integer, Integer> fieldResult) {
+    return idToParent;
+  }
+
+  @Override
+  public Map<Integer, Integer> list(Types.ListType list, Map<Integer, Integer> element) {
+    idToParent.put(list.elementId(), idStack.peek());
+    return idToParent;
+  }
+
+  @Override
+  public Map<Integer, Integer> map(Types.MapType map, Map<Integer, Integer> key, Map<Integer, Integer> value) {
+    idToParent.put(map.keyId(), idStack.peek());
+    idToParent.put(map.valueId(), idStack.peek());
+    return idToParent;
+  }
+
+  @Override
+  public Map<Integer, Integer> primitive(Type.PrimitiveType primitive) {
+    return idToParent;
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index 82c23f1..58db0aa 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.types;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -95,6 +96,10 @@ public class TypeUtil {
     return visit(struct, new IndexById());
   }
 
+  public static Map<Integer, Integer> indexParents(Types.StructType struct) {
+    return ImmutableMap.copyOf(visit(struct, new IndexParents()));
+  }
+
   /**
    * Assigns fresh ids from the {@link NextID nextId function} for all fields in a type.
    *
diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
index a1ef08c..1700107 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
@@ -20,11 +20,13 @@
 package org.apache.iceberg;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -47,18 +49,23 @@ class SchemaUpdate implements UpdateSchema {
   private final TableOperations ops;
   private final TableMetadata base;
   private final Schema schema;
+  private final Map<Integer, Integer> idToParent;
   private final List<Integer> deletes = Lists.newArrayList();
   private final Map<Integer, Types.NestedField> updates = Maps.newHashMap();
   private final Multimap<Integer, Types.NestedField> adds =
       Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+  private final Map<String, Integer> addedNameToId = Maps.newHashMap();
+  private final Multimap<Integer, Move> moves = Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
   private int lastColumnId;
   private boolean allowIncompatibleChanges = false;
 
+
   SchemaUpdate(TableOperations ops) {
     this.ops = ops;
     this.base = ops.current();
     this.schema = base.schema();
     this.lastColumnId = base.lastColumnId();
+    this.idToParent = Maps.newHashMap(TypeUtil.indexParents(schema.asStruct()));
   }
 
   /**
@@ -69,6 +76,7 @@ class SchemaUpdate implements UpdateSchema {
     this.base = null;
     this.schema = schema;
     this.lastColumnId = lastColumnId;
+    this.idToParent = Maps.newHashMap(TypeUtil.indexParents(schema.asStruct()));
   }
 
   @Override
@@ -108,6 +116,7 @@ class SchemaUpdate implements UpdateSchema {
 
   private void internalAddColumn(String parent, String name, boolean isOptional, Type type, String doc) {
     int parentId = TABLE_ROOT_ID;
+    String fullName;
     if (parent != null) {
       Types.NestedField parentField = schema.findField(parent);
       Preconditions.checkArgument(parentField != null, "Cannot find parent struct: %s", parent);
@@ -130,13 +139,22 @@ class SchemaUpdate implements UpdateSchema {
           "Cannot add to a column that will be deleted: %s", parent);
       Preconditions.checkArgument(schema.findField(parent + "." + name) == null,
           "Cannot add column, name already exists: %s.%s", parent, name);
+      fullName = schema.findColumnName(parentId) + "." + name;
     } else {
       Preconditions.checkArgument(schema.findField(name) == null,
           "Cannot add column, name already exists: %s", name);
+      fullName = name;
     }
 
     // assign new IDs in order
     int newId = assignNewColumnId();
+
+    // update tracking for moves
+    addedNameToId.put(fullName, newId);
+    if (parentId != TABLE_ROOT_ID) {
+      idToParent.put(newId, parentId);
+    }
+
     adds.put(parentId, Types.NestedField.of(newId, isOptional, name,
         TypeUtil.assignFreshIds(type, this::assignNewColumnId), doc));
   }
@@ -260,6 +278,69 @@ class SchemaUpdate implements UpdateSchema {
     return this;
   }
 
+  @Override
+  public UpdateSchema moveFirst(String name) {
+    Integer fieldId = findForMove(name);
+    Preconditions.checkArgument(fieldId != null, "Cannot move missing column: %s", name);
+    internalMove(name, Move.first(fieldId));
+    return this;
+  }
+
+  @Override
+  public UpdateSchema moveBefore(String name, String beforeName) {
+    Integer fieldId = findForMove(name);
+    Preconditions.checkArgument(fieldId != null, "Cannot move missing column: %s", name);
+    Integer beforeId = findForMove(beforeName);
+    Preconditions.checkArgument(beforeId != null, "Cannot move %s before missing column: %s", name, beforeName);
+    Preconditions.checkArgument(!fieldId.equals(beforeId), "Cannot move %s before itself", name);
+    internalMove(name, Move.before(fieldId, beforeId));
+    return this;
+  }
+
+  @Override
+  public UpdateSchema moveAfter(String name, String afterName) {
+    Integer fieldId = findForMove(name);
+    Preconditions.checkArgument(fieldId != null, "Cannot move missing column: %s", name);
+    Integer afterId = findForMove(afterName);
+    Preconditions.checkArgument(afterId != null, "Cannot move %s after missing column: %s", name, afterName);
+    Preconditions.checkArgument(!fieldId.equals(afterId), "Cannot move %s after itself", name);
+    internalMove(name, Move.after(fieldId, afterId));
+    return this;
+  }
+
+  private Integer findForMove(String name) {
+    Types.NestedField field = schema.findField(name);
+    if (field != null) {
+      return field.fieldId();
+    }
+    return addedNameToId.get(name);
+  }
+
+  private void internalMove(String name, Move move) {
+    Integer parentId = idToParent.get(move.fieldId());
+    if (parentId != null) {
+      Types.NestedField parent = schema.findField(parentId);
+      Preconditions.checkArgument(parent.type().isStructType(),
+          "Cannot move fields in non-struct type: %s", parent.type());
+
+      if (move.type() == Move.MoveType.AFTER || move.type() == Move.MoveType.BEFORE) {
+        Preconditions.checkArgument(
+            parentId.equals(idToParent.get(move.referenceFieldId())),
+            "Cannot move field %s to a different struct", name);
+      }
+
+      moves.put(parentId, move);
+    } else {
+      if (move.type() == Move.MoveType.AFTER || move.type() == Move.MoveType.BEFORE) {
+        Preconditions.checkArgument(
+            idToParent.get(move.referenceFieldId()) == null,
+            "Cannot move field %s to a different struct", name);
+      }
+
+      moves.put(TABLE_ROOT_ID, move);
+    }
+  }
+
   /**
    * Apply the pending changes to the original schema and returns the result.
    * <p>
@@ -269,7 +350,7 @@ class SchemaUpdate implements UpdateSchema {
    */
   @Override
   public Schema apply() {
-    return applyChanges(schema, deletes, updates, adds);
+    return applyChanges(schema, deletes, updates, adds, moves);
   }
 
   @Override
@@ -310,9 +391,10 @@ class SchemaUpdate implements UpdateSchema {
 
   private static Schema applyChanges(Schema schema, List<Integer> deletes,
                                      Map<Integer, Types.NestedField> updates,
-                                     Multimap<Integer, Types.NestedField> adds) {
+                                     Multimap<Integer, Types.NestedField> adds,
+                                     Multimap<Integer, Move> moves) {
     Types.StructType struct = TypeUtil
-        .visit(schema, new ApplyChanges(deletes, updates, adds))
+        .visit(schema, new ApplyChanges(deletes, updates, adds, moves))
         .asNestedType().asStructType();
     return new Schema(struct.fields());
   }
@@ -321,20 +403,25 @@ class SchemaUpdate implements UpdateSchema {
     private final List<Integer> deletes;
     private final Map<Integer, Types.NestedField> updates;
     private final Multimap<Integer, Types.NestedField> adds;
+    private final Multimap<Integer, Move> moves;
 
     private ApplyChanges(List<Integer> deletes,
-                        Map<Integer, Types.NestedField> updates,
-                        Multimap<Integer, Types.NestedField> adds) {
+                         Map<Integer, Types.NestedField> updates,
+                         Multimap<Integer, Types.NestedField> adds,
+                         Multimap<Integer, Move> moves) {
       this.deletes = deletes;
       this.updates = updates;
       this.adds = adds;
+      this.moves = moves;
     }
 
     @Override
     public Type schema(Schema schema, Type structResult) {
-      Collection<Types.NestedField> newColumns = adds.get(TABLE_ROOT_ID);
-      if (newColumns != null) {
-        return addFields(structResult.asNestedType().asStructType(), newColumns);
+      List<Types.NestedField> fields = addAndMoveFields(structResult.asStructType().fields(),
+          adds.get(TABLE_ROOT_ID), moves.get(TABLE_ROOT_ID));
+
+      if (fields != null) {
+        return Types.StructType.of(fields);
       }
 
       return structResult;
@@ -399,8 +486,14 @@ class SchemaUpdate implements UpdateSchema {
 
       // handle adds
       Collection<Types.NestedField> newFields = adds.get(fieldId);
-      if (newFields != null && !newFields.isEmpty()) {
-        return addFields(fieldResult.asNestedType().asStructType(), newFields);
+      Collection<Move> columnsToMove = moves.get(fieldId);
+      if (!newFields.isEmpty() || !columnsToMove.isEmpty()) {
+        // if either collection is non-empty, then this must be a struct type. try to apply the changes
+        List<Types.NestedField> fields = addAndMoveFields(
+            fieldResult.asStructType().fields(), newFields, columnsToMove);
+        if (fields != null) {
+          return Types.StructType.of(fields);
+        }
       }
 
       return fieldResult;
@@ -470,10 +563,106 @@ class SchemaUpdate implements UpdateSchema {
     }
   }
 
-  private static Types.StructType addFields(Types.StructType struct,
-                                            Collection<Types.NestedField> adds) {
-    List<Types.NestedField> newFields = Lists.newArrayList(struct.fields());
+  private static List<Types.NestedField> addAndMoveFields(List<Types.NestedField> fields,
+                                                          Collection<Types.NestedField> adds,
+                                                          Collection<Move> moves) {
+    if (adds != null && !adds.isEmpty()) {
+      if (moves != null && !moves.isEmpty()) {
+        // always apply adds first so that added fields can be moved
+        return moveFields(addFields(fields, adds), moves);
+      } else {
+        return addFields(fields, adds);
+      }
+    } else if (moves != null && !moves.isEmpty()) {
+      return moveFields(fields, moves);
+    }
+    return null;
+  }
+
+  private static List<Types.NestedField> addFields(List<Types.NestedField> fields,
+                                                   Collection<Types.NestedField> adds) {
+    List<Types.NestedField> newFields = Lists.newArrayList(fields);
     newFields.addAll(adds);
-    return Types.StructType.of(newFields);
+    return newFields;
+  }
+
+  @SuppressWarnings("checkstyle:IllegalType")
+  private static List<Types.NestedField> moveFields(List<Types.NestedField> fields,
+                                                    Collection<Move> moves) {
+    LinkedList<Types.NestedField> reordered = Lists.newLinkedList(fields);
+
+    for (Move move : moves) {
+      Types.NestedField toMove = Iterables.find(reordered, field -> field.fieldId() == move.fieldId());
+      reordered.remove(toMove);
+
+      switch (move.type()) {
+        case FIRST:
+          reordered.addFirst(toMove);
+          break;
+
+        case BEFORE:
+          Types.NestedField before = Iterables.find(reordered, field -> field.fieldId() == move.referenceFieldId());
+          int beforeIndex = reordered.indexOf(before);
+          // insert the new node at the index of the existing node
+          reordered.add(beforeIndex, toMove);
+          break;
+
+        case AFTER:
+          Types.NestedField after = Iterables.find(reordered, field -> field.fieldId() == move.referenceFieldId());
+          int afterIndex = reordered.indexOf(after);
+          reordered.add(afterIndex + 1, toMove);
+          break;
+
+        default:
+          throw new UnsupportedOperationException("Unknown move type: " + move.type());
+      }
+    }
+
+    return reordered;
+  }
+
+  /**
+   * Represents a requested column move in a struct.
+   */
+  private static class Move {
+    private enum MoveType {
+      FIRST,
+      BEFORE,
+      AFTER
+    }
+
+    static Move first(int fieldId) {
+      return new Move(fieldId, -1, MoveType.FIRST);
+    }
+
+    static Move before(int fieldId, int referenceFieldId) {
+      return new Move(fieldId, referenceFieldId, MoveType.BEFORE);
+    }
+
+    static Move after(int fieldId, int referenceFieldId) {
+      return new Move(fieldId, referenceFieldId, MoveType.AFTER);
+    }
+
+    private final int fieldId;
+    private final int referenceFieldId;
+    private final MoveType type;
+
+    private Move(int fieldId, int referenceFieldId, MoveType type) {
+      this.fieldId = fieldId;
+      this.referenceFieldId = referenceFieldId;
+      this.type = type;
+    }
+
+    public int fieldId() {
+      return fieldId;
+    }
+
+    public int referenceFieldId() {
+      return referenceFieldId;
+    }
+
+    public MoveType type() {
+      return type;
+    }
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
index 0875ab6..ad3afdf 100644
--- a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
+++ b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
@@ -659,4 +659,513 @@ public class TestSchemaUpdate {
         }
     );
   }
+
+  @Test
+  public void testMultipleMoves() {
+    Schema schema = new Schema(
+        required(1, "a", Types.IntegerType.get()),
+        required(2, "b", Types.IntegerType.get()),
+        required(3, "c", Types.IntegerType.get()),
+        required(4, "d", Types.IntegerType.get()));
+
+    Schema expected = new Schema(
+        required(3, "c", Types.IntegerType.get()),
+        required(2, "b", Types.IntegerType.get()),
+        required(4, "d", Types.IntegerType.get()),
+        required(1, "a", Types.IntegerType.get()));
+
+    // moves are applied in order
+    Schema actual = new SchemaUpdate(schema, 4)
+        .moveFirst("d")
+        .moveFirst("c")
+        .moveAfter("b", "d")
+        .moveBefore("d", "a")
+        .apply();
+
+    Assert.assertEquals("Schema should match", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveTopLevelColumnFirst() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()));
+    Schema expected = new Schema(
+        required(2, "data", Types.StringType.get()),
+        required(1, "id", Types.LongType.get()));
+
+    Schema actual = new SchemaUpdate(schema, 2)
+        .moveFirst("data")
+        .apply();
+
+    Assert.assertEquals("Should move data first", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveTopLevelColumnBeforeFirst() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()));
+    Schema expected = new Schema(
+        required(2, "data", Types.StringType.get()),
+        required(1, "id", Types.LongType.get()));
+
+    Schema actual = new SchemaUpdate(schema, 2)
+        .moveBefore("data", "id")
+        .apply();
+
+    Assert.assertEquals("Should move data first", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveTopLevelColumnAfterLast() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()));
+    Schema expected = new Schema(
+        required(2, "data", Types.StringType.get()),
+        required(1, "id", Types.LongType.get()));
+
+    Schema actual = new SchemaUpdate(schema, 2)
+        .moveAfter("id", "data")
+        .apply();
+
+    Assert.assertEquals("Should move data first", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveTopLevelColumnAfter() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()),
+        optional(3, "ts", Types.TimestampType.withZone()));
+    Schema expected = new Schema(
+        required(1, "id", Types.LongType.get()),
+        optional(3, "ts", Types.TimestampType.withZone()),
+        required(2, "data", Types.StringType.get()));
+
+    Schema actual = new SchemaUpdate(schema, 3)
+        .moveAfter("ts", "id")
+        .apply();
+
+    Assert.assertEquals("Should move ts after id", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveTopLevelColumnBefore() {
+    Schema schema = new Schema(
+        optional(3, "ts", Types.TimestampType.withZone()),
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()));
+    Schema expected = new Schema(
+        required(1, "id", Types.LongType.get()),
+        optional(3, "ts", Types.TimestampType.withZone()),
+        required(2, "data", Types.StringType.get()));
+
+    Schema actual = new SchemaUpdate(schema, 3)
+        .moveBefore("ts", "data")
+        .apply();
+
+    Assert.assertEquals("Should move ts before data", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveNestedFieldFirst() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            required(3, "count", Types.LongType.get()),
+            required(4, "data", Types.StringType.get()))));
+    Schema expected = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            required(4, "data", Types.StringType.get()),
+            required(3, "count", Types.LongType.get()))));
+
+    Schema actual = new SchemaUpdate(schema, 4)
+        .moveFirst("struct.data")
+        .apply();
+
+    Assert.assertEquals("Should move data first", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveNestedFieldBeforeFirst() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            required(3, "count", Types.LongType.get()),
+            required(4, "data", Types.StringType.get()))));
+    Schema expected = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            required(4, "data", Types.StringType.get()),
+            required(3, "count", Types.LongType.get()))));
+
+    Schema actual = new SchemaUpdate(schema, 4)
+        .moveBefore("struct.data", "struct.count")
+        .apply();
+
+    Assert.assertEquals("Should move data first", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveNestedFieldAfterLast() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            required(3, "count", Types.LongType.get()),
+            required(4, "data", Types.StringType.get()))));
+    Schema expected = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            required(4, "data", Types.StringType.get()),
+            required(3, "count", Types.LongType.get()))));
+
+    Schema actual = new SchemaUpdate(schema, 4)
+        .moveAfter("struct.count", "struct.data")
+        .apply();
+
+    Assert.assertEquals("Should move data first", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveNestedFieldAfter() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            required(3, "count", Types.LongType.get()),
+            required(4, "data", Types.StringType.get()),
+            optional(5, "ts", Types.TimestampType.withZone()))));
+    Schema expected = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            required(3, "count", Types.LongType.get()),
+            optional(5, "ts", Types.TimestampType.withZone()),
+            required(4, "data", Types.StringType.get()))));
+
+    Schema actual = new SchemaUpdate(schema, 5)
+        .moveAfter("struct.ts", "struct.count")
+        .apply();
+
+    Assert.assertEquals("Should move ts after count", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveNestedFieldBefore() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            optional(5, "ts", Types.TimestampType.withZone()),
+            required(3, "count", Types.LongType.get()),
+            required(4, "data", Types.StringType.get()))));
+    Schema expected = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            required(3, "count", Types.LongType.get()),
+            optional(5, "ts", Types.TimestampType.withZone()),
+            required(4, "data", Types.StringType.get()))));
+
+    Schema actual = new SchemaUpdate(schema, 5)
+        .moveBefore("struct.ts", "struct.data")
+        .apply();
+
+    Assert.assertEquals("Should move ts before data", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveListElementField() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "list", Types.ListType.ofOptional(6, Types.StructType.of(
+            optional(5, "ts", Types.TimestampType.withZone()),
+            required(3, "count", Types.LongType.get()),
+            required(4, "data", Types.StringType.get())))));
+    Schema expected = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "list", Types.ListType.ofOptional(6, Types.StructType.of(
+            required(3, "count", Types.LongType.get()),
+            optional(5, "ts", Types.TimestampType.withZone()),
+            required(4, "data", Types.StringType.get())))));
+
+    Schema actual = new SchemaUpdate(schema, 6)
+        .moveBefore("list.ts", "list.data")
+        .apply();
+
+    Assert.assertEquals("Should move ts before data", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveMapValueStructField() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "map", Types.MapType.ofOptional(6, 7, Types.StringType.get(), Types.StructType.of(
+            optional(5, "ts", Types.TimestampType.withZone()),
+            required(3, "count", Types.LongType.get()),
+            required(4, "data", Types.StringType.get())))));
+    Schema expected = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "map", Types.MapType.ofOptional(6, 7, Types.StringType.get(), Types.StructType.of(
+            required(3, "count", Types.LongType.get()),
+            optional(5, "ts", Types.TimestampType.withZone()),
+            required(4, "data", Types.StringType.get())))));
+
+    Schema actual = new SchemaUpdate(schema, 7)
+        .moveBefore("map.ts", "map.data")
+        .apply();
+
+    Assert.assertEquals("Should move ts before data", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveAddedTopLevelColumn() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()));
+    Schema expected = new Schema(
+        required(1, "id", Types.LongType.get()),
+        optional(3, "ts", Types.TimestampType.withZone()),
+        required(2, "data", Types.StringType.get()));
+
+    Schema actual = new SchemaUpdate(schema, 2)
+        .addColumn("ts", Types.TimestampType.withZone())
+        .moveAfter("ts", "id")
+        .apply();
+
+    Assert.assertEquals("Should move ts after id", expected.asStruct(), actual.asStruct());
+  }
+
+  @Test
+  public void testMoveAddedTopLevelColumnAfterAddedColumn() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()));
+    Schema expected = new Schema(
+        required(1, "id", Types.LongType.get()),
+        optional(3, "ts", Types.TimestampType.withZone()),
+        optional(4, "count", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()));
+
+    Schema actual = new SchemaUpdate(schema, 2)
+        .addColumn("ts", Types.TimestampType.withZone())
+        .addColumn("count", Types.LongType.get())
+        .moveAfter("ts", "id")
+        .moveAfter("count", "ts")
+        .apply();
+
+    Assert.assertEquals("Should move ts after id and count after ts", expected.asStruct(), actual.asStruct());
+  }
+
+  /**
+   * Adds a field to a nested struct and, in the same update, moves it before an existing field of
+   * that struct.
+   */
+  @Test
+  public void testMoveAddedNestedStructField() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            required(3, "count", Types.LongType.get()),
+            required(4, "data", Types.StringType.get()))));
+    // the added field is expected to carry ID 5 and to become the first field of the struct
+    Schema expected = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            optional(5, "ts", Types.TimestampType.withZone()),
+            required(3, "count", Types.LongType.get()),
+            required(4, "data", Types.StringType.get()))));
+
+    Schema actual = new SchemaUpdate(schema, 4)
+        .addColumn("struct", "ts", Types.TimestampType.withZone())
+        .moveBefore("struct.ts", "struct.count")
+        .apply();
+
+    // message describes the move under test so that a failure report is not misleading
+    Assert.assertEquals("Should move ts before count", expected.asStruct(), actual.asStruct());
+  }
+
+  /**
+   * Adds two fields to a nested struct and positions the second one before the first, which was
+   * itself added and moved in the same update.
+   */
+  @Test
+  public void testMoveAddedNestedStructFieldBeforeAddedColumn() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            required(3, "count", Types.LongType.get()),
+            required(4, "data", Types.StringType.get()))));
+    // size (ID 6) is expected ahead of ts (ID 5), and both ahead of the original fields
+    Schema expected = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "struct", Types.StructType.of(
+            optional(6, "size", Types.LongType.get()),
+            optional(5, "ts", Types.TimestampType.withZone()),
+            required(3, "count", Types.LongType.get()),
+            required(4, "data", Types.StringType.get()))));
+
+    Schema actual = new SchemaUpdate(schema, 4)
+        .addColumn("struct", "ts", Types.TimestampType.withZone())
+        .addColumn("struct", "size", Types.LongType.get())
+        .moveBefore("struct.ts", "struct.count")
+        .moveBefore("struct.size", "struct.ts")
+        .apply();
+
+    // message describes the moves under test so that a failure report is not misleading
+    Assert.assertEquals("Should move ts before count and size before ts",
+        expected.asStruct(), actual.asStruct());
+  }
+
+  /**
+   * Moving a column before or after itself is expected to be rejected with an
+   * IllegalArgumentException.
+   */
+  @Test
+  public void testMoveSelfReferenceFails() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()));
+
+    // descriptions name the self-reference case; the column itself does exist in the schema
+    AssertHelpers.assertThrows("Should fail move before itself",
+        IllegalArgumentException.class, "Cannot move id before itself", () ->
+            new SchemaUpdate(schema, 2)
+                .moveBefore("id", "id")
+                .apply());
+
+    AssertHelpers.assertThrows("Should fail move after itself",
+        IllegalArgumentException.class, "Cannot move id after itself", () ->
+            new SchemaUpdate(schema, 2)
+                .moveAfter("id", "id")
+                .apply());
+  }
+
+  /**
+   * Every move variant (first, before, after) is expected to reject a column name that does not
+   * exist in the schema.
+   */
+  @Test
+  public void testMoveMissingColumnFails() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()));
+
+    // moveFirst on an unknown column
+    AssertHelpers.assertThrows(
+        "Should fail move for a field that is not in the schema",
+        IllegalArgumentException.class,
+        "Cannot move missing column",
+        () -> new SchemaUpdate(schema, 2).moveFirst("items").apply());
+
+    // moveBefore on an unknown column, relative to an existing one
+    AssertHelpers.assertThrows(
+        "Should fail move for a field that is not in the schema",
+        IllegalArgumentException.class,
+        "Cannot move missing column",
+        () -> new SchemaUpdate(schema, 2).moveBefore("items", "id").apply());
+
+    // moveAfter on an unknown column, relative to an existing one
+    AssertHelpers.assertThrows(
+        "Should fail move for a field that is not in the schema",
+        IllegalArgumentException.class,
+        "Cannot move missing column",
+        () -> new SchemaUpdate(schema, 2).moveAfter("items", "data").apply());
+  }
+
+  /**
+   * A move that names a column ahead of the addColumn call that creates it is expected to fail,
+   * for each of the three move variants.
+   */
+  @Test
+  public void testMoveBeforeAddFails() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()));
+
+    // moveFirst is chained ahead of the add
+    AssertHelpers.assertThrows(
+        "Should fail move for a field that has not been added yet",
+        IllegalArgumentException.class,
+        "Cannot move missing column",
+        () -> new SchemaUpdate(schema, 2)
+            .moveFirst("ts")
+            .addColumn("ts", Types.TimestampType.withZone())
+            .apply());
+
+    // moveBefore is chained ahead of the add
+    AssertHelpers.assertThrows(
+        "Should fail move for a field that has not been added yet",
+        IllegalArgumentException.class,
+        "Cannot move missing column",
+        () -> new SchemaUpdate(schema, 2)
+            .moveBefore("ts", "id")
+            .addColumn("ts", Types.TimestampType.withZone())
+            .apply());
+
+    // moveAfter is chained ahead of the add
+    AssertHelpers.assertThrows(
+        "Should fail move for a field that has not been added yet",
+        IllegalArgumentException.class,
+        "Cannot move missing column",
+        () -> new SchemaUpdate(schema, 2)
+            .moveAfter("ts", "data")
+            .addColumn("ts", Types.TimestampType.withZone())
+            .apply());
+  }
+
+  /**
+   * Moving an existing column relative to a reference column that does not exist is expected to
+   * fail for both moveBefore and moveAfter.
+   */
+  @Test
+  public void testMoveMissingReferenceColumnFails() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()));
+
+    // the moved column exists, the reference column does not
+    AssertHelpers.assertThrows(
+        "Should fail move before a field that is not in the schema",
+        IllegalArgumentException.class,
+        "Cannot move id before missing column",
+        () -> new SchemaUpdate(schema, 2).moveBefore("id", "items").apply());
+
+    AssertHelpers.assertThrows(
+        "Should fail move after for a field that is not in the schema",
+        IllegalArgumentException.class,
+        "Cannot move data after missing column",
+        () -> new SchemaUpdate(schema, 2).moveAfter("data", "items").apply());
+  }
+
+  /**
+   * Moving the key of a string-to-string map is expected to be rejected, because the key's parent
+   * is a map rather than a struct.
+   */
+  @Test
+  public void testMovePrimitiveMapKeyFails() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()),
+        optional(3, "map", Types.MapType.ofRequired(4, 5, Types.StringType.get(), Types.StringType.get())));
+
+    AssertHelpers.assertThrows(
+        "Should fail move for map key",
+        IllegalArgumentException.class,
+        "Cannot move fields in non-struct type",
+        () -> new SchemaUpdate(schema, 5).moveBefore("map.key", "map.value").apply());
+  }
+
+  /**
+   * Moving the value of a map relative to its key is expected to be rejected with the
+   * non-struct-type error.
+   */
+  @Test
+  public void testMovePrimitiveMapValueFails() {
+    // NOTE(review): despite the test name, the map value type here is an empty struct, not a
+    // primitive -- confirm this is intended
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()),
+        optional(3, "map", Types.MapType.ofRequired(4, 5, Types.StringType.get(), Types.StructType.of())));
+
+    AssertHelpers.assertThrows(
+        "Should fail move for map value",
+        IllegalArgumentException.class,
+        "Cannot move fields in non-struct type",
+        () -> new SchemaUpdate(schema, 5).moveBefore("map.value", "map.key").apply());
+  }
+
+  /**
+   * Moving the element of a list of strings is expected to be rejected with the non-struct-type
+   * error.
+   */
+  @Test
+  public void testMovePrimitiveListElementFails() {
+    Schema schema = new Schema(
+        required(1, "id", Types.LongType.get()),
+        required(2, "data", Types.StringType.get()),
+        optional(3, "list", Types.ListType.ofRequired(4, Types.StringType.get())));
+
+    // the reference column is the list itself
+    AssertHelpers.assertThrows(
+        "Should fail move for list element",
+        IllegalArgumentException.class,
+        "Cannot move fields in non-struct type",
+        () -> new SchemaUpdate(schema, 4).moveBefore("list.element", "list").apply());
+  }
+
+  /**
+   * A top-level column is expected not to be movable to a position inside a nested struct.
+   */
+  @Test
+  public void testMoveTopLevelBetweenStructsFails() {
+    Schema schema = new Schema(
+        required(1, "a", Types.IntegerType.get()),
+        required(2, "b", Types.IntegerType.get()),
+        required(3, "struct", Types.StructType.of(
+            required(4, "x", Types.IntegerType.get()),
+            required(5, "y", Types.IntegerType.get()))));
+
+    // "a" is top-level while the reference column "struct.x" is nested
+    AssertHelpers.assertThrows(
+        "Should fail move between separate structs",
+        IllegalArgumentException.class,
+        "Cannot move field a to a different struct",
+        () -> new SchemaUpdate(schema, 5).moveBefore("a", "struct.x").apply());
+  }
+
+  /**
+   * A field of one nested struct is expected not to be movable relative to a field of a different
+   * nested struct.
+   */
+  @Test
+  public void testMoveBetweenStructsFails() {
+    Schema schema = new Schema(
+        required(1, "s1", Types.StructType.of(
+            required(3, "a", Types.IntegerType.get()),
+            required(4, "b", Types.IntegerType.get()))),
+        required(2, "s2", Types.StructType.of(
+            required(5, "x", Types.IntegerType.get()),
+            required(6, "y", Types.IntegerType.get()))));
+
+    // the moved field lives in s2, the reference field in s1
+    AssertHelpers.assertThrows(
+        "Should fail move between separate structs",
+        IllegalArgumentException.class,
+        "Cannot move field s2.x to a different struct",
+        () -> new SchemaUpdate(schema, 6).moveBefore("s2.x", "s1.a").apply());
+  }
 }