You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/05/07 16:38:30 UTC
[incubator-iceberg] branch master updated: Add column reordering to
UpdateSchema (#995)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 50cc2e0 Add column reordering to UpdateSchema (#995)
50cc2e0 is described below
commit 50cc2e0cdb5278a8fc3343522e0b33eb158285b3
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu May 7 09:38:15 2020 -0700
Add column reordering to UpdateSchema (#995)
---
.../main/java/org/apache/iceberg/UpdateSchema.java | 36 ++
.../org/apache/iceberg/types/IndexParents.java | 82 ++++
.../java/org/apache/iceberg/types/TypeUtil.java | 5 +
.../main/java/org/apache/iceberg/SchemaUpdate.java | 217 ++++++++-
.../java/org/apache/iceberg/TestSchemaUpdate.java | 509 +++++++++++++++++++++
5 files changed, 835 insertions(+), 14 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/UpdateSchema.java b/api/src/main/java/org/apache/iceberg/UpdateSchema.java
index 503be92..38df9bd 100644
--- a/api/src/main/java/org/apache/iceberg/UpdateSchema.java
+++ b/api/src/main/java/org/apache/iceberg/UpdateSchema.java
@@ -325,4 +325,40 @@ public interface UpdateSchema extends PendingUpdate<Schema> {
*/
UpdateSchema deleteColumn(String name);
+ /**
+ * Move a column from its current position to the start of the schema or its parent struct.
+ * <p>
+ * The name is used to find the column to move using {@link Schema#findField(String)}. If the name identifies a nested
+ * column, it is moved to the start of the nested struct that contains it.
+ *
+ * @param name name of the column to move
+ * @return this for method chaining
+ * @throws IllegalArgumentException If name doesn't identify a column in the schema or if this
+ * change conflicts with other changes.
+ */
+ UpdateSchema moveFirst(String name);
+
+ /**
+ * Move a column from its current position to directly before a reference column.
+ * <p>
+ * The name is used to find the column to move using {@link Schema#findField(String)}. If the name identifies a nested
+ * column, it can only be moved within the nested struct that contains it.
+ * <p>
+ * The reference column must be a different column in the same struct as the column being moved.
+ *
+ * @param name name of the column to move
+ * @param beforeName name of the reference column; the moved column is placed directly before it
+ * @return this for method chaining
+ * @throws IllegalArgumentException If name or beforeName doesn't identify a column in the schema or if this
+ * change conflicts with other changes.
+ */
+ UpdateSchema moveBefore(String name, String beforeName);
+
+ /**
+ * Move a column from its current position to directly after a reference column.
+ * <p>
+ * The name is used to find the column to move using {@link Schema#findField(String)}. If the name identifies a nested
+ * column, it can only be moved within the nested struct that contains it.
+ * <p>
+ * The reference column must be a different column in the same struct as the column being moved.
+ *
+ * @param name name of the column to move
+ * @param afterName name of the reference column; the moved column is placed directly after it
+ * @return this for method chaining
+ * @throws IllegalArgumentException If name or afterName doesn't identify a column in the schema or if this
+ * change conflicts with other changes.
+ */
+ UpdateSchema moveAfter(String name, String afterName);
}
diff --git a/api/src/main/java/org/apache/iceberg/types/IndexParents.java b/api/src/main/java/org/apache/iceberg/types/IndexParents.java
new file mode 100644
index 0000000..eb4bf5c
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/types/IndexParents.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.types;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+
+/**
+ * Schema visitor that builds a map from a nested field's ID to the ID of the field that encloses it.
+ * <p>
+ * The ID of each field being visited is kept on a stack while its type is visited. Fields of the root
+ * struct are visited with an empty stack and are not added to the map.
+ */
+public class IndexParents extends TypeUtil.SchemaVisitor<Map<Integer, Integer>> {
+ // child field ID -> enclosing field ID; the same instance is returned by every visit method
+ private final Map<Integer, Integer> idToParent = Maps.newHashMap();
+ // IDs of the fields whose types are currently being visited, innermost on top
+ private final Deque<Integer> idStack = Lists.newLinkedList();
+
+ @Override
+ public void beforeField(Types.NestedField field) {
+ // remember this field as the enclosing field while its type is visited
+ idStack.push(field.fieldId());
+ }
+
+ @Override
+ public void afterField(Types.NestedField field) {
+ idStack.pop();
+ }
+
+ @Override
+ public Map<Integer, Integer> schema(Schema schema, Map<Integer, Integer> structResult) {
+ return idToParent;
+ }
+
+ @Override
+ public Map<Integer, Integer> struct(Types.StructType struct, List<Map<Integer, Integer>> fieldResults) {
+ for (Types.NestedField field : struct.fields()) {
+ // peek returns null when the stack is empty, i.e. no enclosing field has been pushed
+ Integer parentId = idStack.peek();
+ if (parentId != null) {
+ // fields in the root struct are not added
+ idToParent.put(field.fieldId(), parentId);
+ }
+ }
+ return idToParent;
+ }
+
+ @Override
+ public Map<Integer, Integer> field(Types.NestedField field, Map<Integer, Integer> fieldResult) {
+ return idToParent;
+ }
+
+ @Override
+ public Map<Integer, Integer> list(Types.ListType list, Map<Integer, Integer> element) {
+ // the element ID is recorded under the ID currently on top of the stack
+ idToParent.put(list.elementId(), idStack.peek());
+ return idToParent;
+ }
+
+ @Override
+ public Map<Integer, Integer> map(Types.MapType map, Map<Integer, Integer> key, Map<Integer, Integer> value) {
+ // key and value IDs are both recorded under the ID currently on top of the stack
+ idToParent.put(map.keyId(), idStack.peek());
+ idToParent.put(map.valueId(), idStack.peek());
+ return idToParent;
+ }
+
+ @Override
+ public Map<Integer, Integer> primitive(Type.PrimitiveType primitive) {
+ return idToParent;
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index 82c23f1..58db0aa 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.types;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -95,6 +96,10 @@ public class TypeUtil {
return visit(struct, new IndexById());
}
+ /**
+ * Indexes the enclosing field of each nested field in a struct.
+ *
+ * @param struct a struct type to visit with {@link IndexParents}
+ * @return an immutable copy of the map from field ID to enclosing field ID produced by the visitor
+ */
+ public static Map<Integer, Integer> indexParents(Types.StructType struct) {
+ return ImmutableMap.copyOf(visit(struct, new IndexParents()));
+ }
+
/**
* Assigns fresh ids from the {@link NextID nextId function} for all fields in a type.
*
diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
index a1ef08c..1700107 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
@@ -20,11 +20,13 @@
package org.apache.iceberg;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import java.util.Collection;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -47,18 +49,23 @@ class SchemaUpdate implements UpdateSchema {
private final TableOperations ops;
private final TableMetadata base;
private final Schema schema;
+ private final Map<Integer, Integer> idToParent;
private final List<Integer> deletes = Lists.newArrayList();
private final Map<Integer, Types.NestedField> updates = Maps.newHashMap();
private final Multimap<Integer, Types.NestedField> adds =
Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+ private final Map<String, Integer> addedNameToId = Maps.newHashMap();
+ private final Multimap<Integer, Move> moves = Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
private int lastColumnId;
private boolean allowIncompatibleChanges = false;
+
SchemaUpdate(TableOperations ops) {
this.ops = ops;
this.base = ops.current();
this.schema = base.schema();
this.lastColumnId = base.lastColumnId();
+ this.idToParent = Maps.newHashMap(TypeUtil.indexParents(schema.asStruct()));
}
/**
@@ -69,6 +76,7 @@ class SchemaUpdate implements UpdateSchema {
this.base = null;
this.schema = schema;
this.lastColumnId = lastColumnId;
+ this.idToParent = Maps.newHashMap(TypeUtil.indexParents(schema.asStruct()));
}
@Override
@@ -108,6 +116,7 @@ class SchemaUpdate implements UpdateSchema {
private void internalAddColumn(String parent, String name, boolean isOptional, Type type, String doc) {
int parentId = TABLE_ROOT_ID;
+ String fullName;
if (parent != null) {
Types.NestedField parentField = schema.findField(parent);
Preconditions.checkArgument(parentField != null, "Cannot find parent struct: %s", parent);
@@ -130,13 +139,22 @@ class SchemaUpdate implements UpdateSchema {
"Cannot add to a column that will be deleted: %s", parent);
Preconditions.checkArgument(schema.findField(parent + "." + name) == null,
"Cannot add column, name already exists: %s.%s", parent, name);
+ fullName = schema.findColumnName(parentId) + "." + name;
} else {
Preconditions.checkArgument(schema.findField(name) == null,
"Cannot add column, name already exists: %s", name);
+ fullName = name;
}
// assign new IDs in order
int newId = assignNewColumnId();
+
+ // update tracking for moves
+ addedNameToId.put(fullName, newId);
+ if (parentId != TABLE_ROOT_ID) {
+ idToParent.put(newId, parentId);
+ }
+
adds.put(parentId, Types.NestedField.of(newId, isOptional, name,
TypeUtil.assignFreshIds(type, this::assignNewColumnId), doc));
}
@@ -260,6 +278,69 @@ class SchemaUpdate implements UpdateSchema {
return this;
}
+ // Records a FIRST move for the named column; fails if the name resolves to neither an existing
+ // column nor a column added earlier in this update.
+ @Override
+ public UpdateSchema moveFirst(String name) {
+ Integer fieldId = findForMove(name);
+ Preconditions.checkArgument(fieldId != null, "Cannot move missing column: %s", name);
+ internalMove(name, Move.first(fieldId));
+ return this;
+ }
+
+ // Records a BEFORE move. Both names must resolve to column IDs and the two IDs must differ;
+ // internalMove additionally checks that both columns have the same parent.
+ @Override
+ public UpdateSchema moveBefore(String name, String beforeName) {
+ Integer fieldId = findForMove(name);
+ Preconditions.checkArgument(fieldId != null, "Cannot move missing column: %s", name);
+ Integer beforeId = findForMove(beforeName);
+ Preconditions.checkArgument(beforeId != null, "Cannot move %s before missing column: %s", name, beforeName);
+ Preconditions.checkArgument(!fieldId.equals(beforeId), "Cannot move %s before itself", name);
+ internalMove(name, Move.before(fieldId, beforeId));
+ return this;
+ }
+
+ // Records an AFTER move. Both names must resolve to column IDs and the two IDs must differ;
+ // internalMove additionally checks that both columns have the same parent.
+ @Override
+ public UpdateSchema moveAfter(String name, String afterName) {
+ Integer fieldId = findForMove(name);
+ Preconditions.checkArgument(fieldId != null, "Cannot move missing column: %s", name);
+ Integer afterId = findForMove(afterName);
+ Preconditions.checkArgument(afterId != null, "Cannot move %s after missing column: %s", name, afterName);
+ Preconditions.checkArgument(!fieldId.equals(afterId), "Cannot move %s after itself", name);
+ internalMove(name, Move.after(fieldId, afterId));
+ return this;
+ }
+
+ /**
+ * Resolves a column name to a field ID for a move.
+ * <p>
+ * The current schema is checked first; if the name is not found there, the full names of columns
+ * added by this update are checked.
+ *
+ * @param name full name of the column
+ * @return the field ID, or null if the name matches neither an existing nor an added column
+ */
+ private Integer findForMove(String name) {
+ Types.NestedField field = schema.findField(name);
+ if (field != null) {
+ return field.fieldId();
+ }
+ return addedNameToId.get(name);
+ }
+
+ /**
+ * Validates a move and records it under the ID of the struct that contains the moved column.
+ * <p>
+ * A column with an entry in idToParent is nested: its parent must be a struct type and, for
+ * BEFORE/AFTER moves, the reference column must have the same parent. A column without an entry is
+ * treated as top-level: for BEFORE/AFTER moves the reference column must also have no parent, and
+ * the move is recorded under TABLE_ROOT_ID.
+ * <p>
+ * NOTE(review): this method does not consult the pending deletes list; confirm how a move of a
+ * column that is also deleted in the same update is expected to behave.
+ *
+ * @param name name of the column being moved, used only in error messages
+ * @param move the requested move
+ * @throws IllegalArgumentException if the parent is not a struct or the reference column is in a different struct
+ */
+ private void internalMove(String name, Move move) {
+ Integer parentId = idToParent.get(move.fieldId());
+ if (parentId != null) {
+ Types.NestedField parent = schema.findField(parentId);
+ Preconditions.checkArgument(parent.type().isStructType(),
+ "Cannot move fields in non-struct type: %s", parent.type());
+
+ if (move.type() == Move.MoveType.AFTER || move.type() == Move.MoveType.BEFORE) {
+ Preconditions.checkArgument(
+ parentId.equals(idToParent.get(move.referenceFieldId())),
+ "Cannot move field %s to a different struct", name);
+ }
+
+ moves.put(parentId, move);
+ } else {
+ if (move.type() == Move.MoveType.AFTER || move.type() == Move.MoveType.BEFORE) {
+ // the moved column has no parent, so the reference column must not have one either
+ Preconditions.checkArgument(
+ idToParent.get(move.referenceFieldId()) == null,
+ "Cannot move field %s to a different struct", name);
+ }
+
+ moves.put(TABLE_ROOT_ID, move);
+ }
+ }
+
/**
* Apply the pending changes to the original schema and returns the result.
* <p>
@@ -269,7 +350,7 @@ class SchemaUpdate implements UpdateSchema {
*/
@Override
public Schema apply() {
- return applyChanges(schema, deletes, updates, adds);
+ return applyChanges(schema, deletes, updates, adds, moves);
}
@Override
@@ -310,9 +391,10 @@ class SchemaUpdate implements UpdateSchema {
private static Schema applyChanges(Schema schema, List<Integer> deletes,
Map<Integer, Types.NestedField> updates,
- Multimap<Integer, Types.NestedField> adds) {
+ Multimap<Integer, Types.NestedField> adds,
+ Multimap<Integer, Move> moves) {
Types.StructType struct = TypeUtil
- .visit(schema, new ApplyChanges(deletes, updates, adds))
+ .visit(schema, new ApplyChanges(deletes, updates, adds, moves))
.asNestedType().asStructType();
return new Schema(struct.fields());
}
@@ -321,20 +403,25 @@ class SchemaUpdate implements UpdateSchema {
private final List<Integer> deletes;
private final Map<Integer, Types.NestedField> updates;
private final Multimap<Integer, Types.NestedField> adds;
+ private final Multimap<Integer, Move> moves;
private ApplyChanges(List<Integer> deletes,
- Map<Integer, Types.NestedField> updates,
- Multimap<Integer, Types.NestedField> adds) {
+ Map<Integer, Types.NestedField> updates,
+ Multimap<Integer, Types.NestedField> adds,
+ Multimap<Integer, Move> moves) {
this.deletes = deletes;
this.updates = updates;
this.adds = adds;
+ this.moves = moves;
}
@Override
public Type schema(Schema schema, Type structResult) {
- Collection<Types.NestedField> newColumns = adds.get(TABLE_ROOT_ID);
- if (newColumns != null) {
- return addFields(structResult.asNestedType().asStructType(), newColumns);
+ List<Types.NestedField> fields = addAndMoveFields(structResult.asStructType().fields(),
+ adds.get(TABLE_ROOT_ID), moves.get(TABLE_ROOT_ID));
+
+ if (fields != null) {
+ return Types.StructType.of(fields);
}
return structResult;
@@ -399,8 +486,14 @@ class SchemaUpdate implements UpdateSchema {
// handle adds
Collection<Types.NestedField> newFields = adds.get(fieldId);
- if (newFields != null && !newFields.isEmpty()) {
- return addFields(fieldResult.asNestedType().asStructType(), newFields);
+ Collection<Move> columnsToMove = moves.get(fieldId);
+ if (!newFields.isEmpty() || !columnsToMove.isEmpty()) {
+ // if either collection is non-empty, then this must be a struct type. try to apply the changes
+ List<Types.NestedField> fields = addAndMoveFields(
+ fieldResult.asStructType().fields(), newFields, columnsToMove);
+ if (fields != null) {
+ return Types.StructType.of(fields);
+ }
}
return fieldResult;
@@ -470,10 +563,106 @@ class SchemaUpdate implements UpdateSchema {
}
}
- private static Types.StructType addFields(Types.StructType struct,
- Collection<Types.NestedField> adds) {
- List<Types.NestedField> newFields = Lists.newArrayList(struct.fields());
+ /**
+ * Applies pending adds and moves to a struct's field list.
+ *
+ * @param fields the current fields of the struct
+ * @param adds fields to append; may be null or empty
+ * @param moves moves to apply after any adds; may be null or empty
+ * @return the updated field list, or null when there is nothing to add or move
+ */
+ private static List<Types.NestedField> addAndMoveFields(List<Types.NestedField> fields,
+ Collection<Types.NestedField> adds,
+ Collection<Move> moves) {
+ if (adds != null && !adds.isEmpty()) {
+ if (moves != null && !moves.isEmpty()) {
+ // always apply adds first so that added fields can be moved
+ return moveFields(addFields(fields, adds), moves);
+ } else {
+ return addFields(fields, adds);
+ }
+ } else if (moves != null && !moves.isEmpty()) {
+ return moveFields(fields, moves);
+ }
+ // null signals "no change" to the caller, which then keeps the original type
+ return null;
+ }
+
+ private static List<Types.NestedField> addFields(List<Types.NestedField> fields,
+ Collection<Types.NestedField> adds) {
+ List<Types.NestedField> newFields = Lists.newArrayList(fields);
newFields.addAll(adds);
- return Types.StructType.of(newFields);
+ return newFields;
+ }
+
+ /**
+ * Applies moves to a copy of a field list, one at a time in the collection's iteration order.
+ * <p>
+ * Each moved field is removed from the list before its new position is computed, so the index of
+ * the reference field is taken from the list without the moved field.
+ *
+ * @param fields the fields to reorder; this list is copied, not modified
+ * @param moves the moves to apply
+ * @return a new list with the fields reordered
+ */
+ @SuppressWarnings("checkstyle:IllegalType")
+ private static List<Types.NestedField> moveFields(List<Types.NestedField> fields,
+ Collection<Move> moves) {
+ LinkedList<Types.NestedField> reordered = Lists.newLinkedList(fields);
+
+ for (Move move : moves) {
+ // NOTE(review): Iterables.find without a default presumably throws if no field matches -- confirm
+ Types.NestedField toMove = Iterables.find(reordered, field -> field.fieldId() == move.fieldId());
+ reordered.remove(toMove);
+
+ switch (move.type()) {
+ case FIRST:
+ reordered.addFirst(toMove);
+ break;
+
+ case BEFORE:
+ Types.NestedField before = Iterables.find(reordered, field -> field.fieldId() == move.referenceFieldId());
+ int beforeIndex = reordered.indexOf(before);
+ // insert the new node at the index of the existing node
+ reordered.add(beforeIndex, toMove);
+ break;
+
+ case AFTER:
+ Types.NestedField after = Iterables.find(reordered, field -> field.fieldId() == move.referenceFieldId());
+ int afterIndex = reordered.indexOf(after);
+ // insert the node directly after the reference node
+ reordered.add(afterIndex + 1, toMove);
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Unknown move type: " + move.type());
+ }
+ }
+
+ return reordered;
+ }
+
+ /**
+ * Represents a requested column move in a struct.
+ * <p>
+ * A move identifies the field to move by ID, the kind of move, and for BEFORE and AFTER moves the
+ * ID of the reference field. Instances are created with the static factory methods.
+ */
+ private static class Move {
+ private enum MoveType {
+ FIRST,
+ BEFORE,
+ AFTER
+ }
+
+ // a FIRST move has no reference field; -1 is stored as a placeholder
+ static Move first(int fieldId) {
+ return new Move(fieldId, -1, MoveType.FIRST);
+ }
+
+ static Move before(int fieldId, int referenceFieldId) {
+ return new Move(fieldId, referenceFieldId, MoveType.BEFORE);
+ }
+
+ static Move after(int fieldId, int referenceFieldId) {
+ return new Move(fieldId, referenceFieldId, MoveType.AFTER);
+ }
+
+ // ID of the field to move
+ private final int fieldId;
+ // ID of the field to move before or after; -1 for FIRST moves
+ private final int referenceFieldId;
+ private final MoveType type;
+
+ private Move(int fieldId, int referenceFieldId, MoveType type) {
+ this.fieldId = fieldId;
+ this.referenceFieldId = referenceFieldId;
+ this.type = type;
+ }
+
+ public int fieldId() {
+ return fieldId;
+ }
+
+ public int referenceFieldId() {
+ return referenceFieldId;
+ }
+
+ public MoveType type() {
+ return type;
+ }
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
index 0875ab6..ad3afdf 100644
--- a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
+++ b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
@@ -659,4 +659,513 @@ public class TestSchemaUpdate {
}
);
}
+
+ // Chains four moves on a four-column schema and checks the final order (c, b, d, a).
+ @Test
+ public void testMultipleMoves() {
+ Schema schema = new Schema(
+ required(1, "a", Types.IntegerType.get()),
+ required(2, "b", Types.IntegerType.get()),
+ required(3, "c", Types.IntegerType.get()),
+ required(4, "d", Types.IntegerType.get()));
+
+ Schema expected = new Schema(
+ required(3, "c", Types.IntegerType.get()),
+ required(2, "b", Types.IntegerType.get()),
+ required(4, "d", Types.IntegerType.get()),
+ required(1, "a", Types.IntegerType.get()));
+
+ // moves are applied in order
+ Schema actual = new SchemaUpdate(schema, 4)
+ .moveFirst("d")
+ .moveFirst("c")
+ .moveAfter("b", "d")
+ .moveBefore("d", "a")
+ .apply();
+
+ Assert.assertEquals("Schema should match", expected.asStruct(), actual.asStruct());
+ }
+
+ // moveFirst on the last top-level column puts it ahead of the other column.
+ @Test
+ public void testMoveTopLevelColumnFirst() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "data", Types.StringType.get()));
+ Schema expected = new Schema(
+ required(2, "data", Types.StringType.get()),
+ required(1, "id", Types.LongType.get()));
+
+ Schema actual = new SchemaUpdate(schema, 2)
+ .moveFirst("data")
+ .apply();
+
+ Assert.assertEquals("Should move data first", expected.asStruct(), actual.asStruct());
+ }
+
+ // moveBefore with the first column as the reference puts the moved column first.
+ @Test
+ public void testMoveTopLevelColumnBeforeFirst() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "data", Types.StringType.get()));
+ Schema expected = new Schema(
+ required(2, "data", Types.StringType.get()),
+ required(1, "id", Types.LongType.get()));
+
+ Schema actual = new SchemaUpdate(schema, 2)
+ .moveBefore("data", "id")
+ .apply();
+
+ Assert.assertEquals("Should move data first", expected.asStruct(), actual.asStruct());
+ }
+
+ // moveAfter with the last column as the reference puts the moved column last, leaving data first.
+ @Test
+ public void testMoveTopLevelColumnAfterLast() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "data", Types.StringType.get()));
+ Schema expected = new Schema(
+ required(2, "data", Types.StringType.get()),
+ required(1, "id", Types.LongType.get()));
+
+ Schema actual = new SchemaUpdate(schema, 2)
+ .moveAfter("id", "data")
+ .apply();
+
+ Assert.assertEquals("Should move data first", expected.asStruct(), actual.asStruct());
+ }
+
+ // moveAfter places a top-level column directly after a reference column in the middle of the schema.
+ @Test
+ public void testMoveTopLevelColumnAfter() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "data", Types.StringType.get()),
+ optional(3, "ts", Types.TimestampType.withZone()));
+ Schema expected = new Schema(
+ required(1, "id", Types.LongType.get()),
+ optional(3, "ts", Types.TimestampType.withZone()),
+ required(2, "data", Types.StringType.get()));
+
+ Schema actual = new SchemaUpdate(schema, 3)
+ .moveAfter("ts", "id")
+ .apply();
+
+ Assert.assertEquals("Should move ts after id", expected.asStruct(), actual.asStruct());
+ }
+
+ // moveBefore places a top-level column directly before a reference column in the middle of the schema.
+ @Test
+ public void testMoveTopLevelColumnBefore() {
+ Schema schema = new Schema(
+ optional(3, "ts", Types.TimestampType.withZone()),
+ required(1, "id", Types.LongType.get()),
+ required(2, "data", Types.StringType.get()));
+ Schema expected = new Schema(
+ required(1, "id", Types.LongType.get()),
+ optional(3, "ts", Types.TimestampType.withZone()),
+ required(2, "data", Types.StringType.get()));
+
+ Schema actual = new SchemaUpdate(schema, 3)
+ .moveBefore("ts", "data")
+ .apply();
+
+ Assert.assertEquals("Should move ts before data", expected.asStruct(), actual.asStruct());
+ }
+
+ // moveFirst on a nested field moves it to the start of its own struct, not of the schema.
+ @Test
+ public void testMoveNestedFieldFirst() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ required(3, "count", Types.LongType.get()),
+ required(4, "data", Types.StringType.get()))));
+ Schema expected = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ required(4, "data", Types.StringType.get()),
+ required(3, "count", Types.LongType.get()))));
+
+ Schema actual = new SchemaUpdate(schema, 4)
+ .moveFirst("struct.data")
+ .apply();
+
+ Assert.assertEquals("Should move data first", expected.asStruct(), actual.asStruct());
+ }
+
+ // moveBefore with the struct's first field as the reference puts the moved nested field first.
+ @Test
+ public void testMoveNestedFieldBeforeFirst() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ required(3, "count", Types.LongType.get()),
+ required(4, "data", Types.StringType.get()))));
+ Schema expected = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ required(4, "data", Types.StringType.get()),
+ required(3, "count", Types.LongType.get()))));
+
+ Schema actual = new SchemaUpdate(schema, 4)
+ .moveBefore("struct.data", "struct.count")
+ .apply();
+
+ Assert.assertEquals("Should move data first", expected.asStruct(), actual.asStruct());
+ }
+
+ // moveAfter with the struct's last field as the reference puts the moved nested field last, leaving data first.
+ @Test
+ public void testMoveNestedFieldAfterLast() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ required(3, "count", Types.LongType.get()),
+ required(4, "data", Types.StringType.get()))));
+ Schema expected = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ required(4, "data", Types.StringType.get()),
+ required(3, "count", Types.LongType.get()))));
+
+ Schema actual = new SchemaUpdate(schema, 4)
+ .moveAfter("struct.count", "struct.data")
+ .apply();
+
+ Assert.assertEquals("Should move data first", expected.asStruct(), actual.asStruct());
+ }
+
+ // moveAfter places a nested field directly after a reference field in the same struct.
+ @Test
+ public void testMoveNestedFieldAfter() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ required(3, "count", Types.LongType.get()),
+ required(4, "data", Types.StringType.get()),
+ optional(5, "ts", Types.TimestampType.withZone()))));
+ Schema expected = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ required(3, "count", Types.LongType.get()),
+ optional(5, "ts", Types.TimestampType.withZone()),
+ required(4, "data", Types.StringType.get()))));
+
+ Schema actual = new SchemaUpdate(schema, 5)
+ .moveAfter("struct.ts", "struct.count")
+ .apply();
+
+ Assert.assertEquals("Should move ts after count", expected.asStruct(), actual.asStruct());
+ }
+
+ // moveBefore places a nested field directly before a reference field in the same struct.
+ @Test
+ public void testMoveNestedFieldBefore() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ optional(5, "ts", Types.TimestampType.withZone()),
+ required(3, "count", Types.LongType.get()),
+ required(4, "data", Types.StringType.get()))));
+ Schema expected = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ required(3, "count", Types.LongType.get()),
+ optional(5, "ts", Types.TimestampType.withZone()),
+ required(4, "data", Types.StringType.get()))));
+
+ Schema actual = new SchemaUpdate(schema, 5)
+ .moveBefore("struct.ts", "struct.data")
+ .apply();
+
+ Assert.assertEquals("Should move ts before data", expected.asStruct(), actual.asStruct());
+ }
+
+ // Fields of a struct that is a list element can be reordered within that struct.
+ @Test
+ public void testMoveListElementField() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "list", Types.ListType.ofOptional(6, Types.StructType.of(
+ optional(5, "ts", Types.TimestampType.withZone()),
+ required(3, "count", Types.LongType.get()),
+ required(4, "data", Types.StringType.get())))));
+ Schema expected = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "list", Types.ListType.ofOptional(6, Types.StructType.of(
+ required(3, "count", Types.LongType.get()),
+ optional(5, "ts", Types.TimestampType.withZone()),
+ required(4, "data", Types.StringType.get())))));
+
+ Schema actual = new SchemaUpdate(schema, 6)
+ .moveBefore("list.ts", "list.data")
+ .apply();
+
+ Assert.assertEquals("Should move ts before data in the list element struct",
+ expected.asStruct(), actual.asStruct());
+ }
+
+ // Fields of a struct that is a map value can be reordered within that struct.
+ @Test
+ public void testMoveMapValueStructField() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "map", Types.MapType.ofOptional(6, 7, Types.StringType.get(), Types.StructType.of(
+ optional(5, "ts", Types.TimestampType.withZone()),
+ required(3, "count", Types.LongType.get()),
+ required(4, "data", Types.StringType.get())))));
+ Schema expected = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "map", Types.MapType.ofOptional(6, 7, Types.StringType.get(), Types.StructType.of(
+ required(3, "count", Types.LongType.get()),
+ optional(5, "ts", Types.TimestampType.withZone()),
+ required(4, "data", Types.StringType.get())))));
+
+ Schema actual = new SchemaUpdate(schema, 7)
+ .moveBefore("map.ts", "map.data")
+ .apply();
+
+ Assert.assertEquals("Should move ts before data in the map value struct",
+ expected.asStruct(), actual.asStruct());
+ }
+
+ // A column added in the same update can be moved relative to an existing column.
+ @Test
+ public void testMoveAddedTopLevelColumn() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "data", Types.StringType.get()));
+ Schema expected = new Schema(
+ required(1, "id", Types.LongType.get()),
+ optional(3, "ts", Types.TimestampType.withZone()),
+ required(2, "data", Types.StringType.get()));
+
+ Schema actual = new SchemaUpdate(schema, 2)
+ .addColumn("ts", Types.TimestampType.withZone())
+ .moveAfter("ts", "id")
+ .apply();
+
+ Assert.assertEquals("Should move added ts after id", expected.asStruct(), actual.asStruct());
+ }
+
+ // A column added in the same update can be used as the reference for moving another added column.
+ @Test
+ public void testMoveAddedTopLevelColumnAfterAddedColumn() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "data", Types.StringType.get()));
+ Schema expected = new Schema(
+ required(1, "id", Types.LongType.get()),
+ optional(3, "ts", Types.TimestampType.withZone()),
+ optional(4, "count", Types.LongType.get()),
+ required(2, "data", Types.StringType.get()));
+
+ Schema actual = new SchemaUpdate(schema, 2)
+ .addColumn("ts", Types.TimestampType.withZone())
+ .addColumn("count", Types.LongType.get())
+ .moveAfter("ts", "id")
+ .moveAfter("count", "ts")
+ .apply();
+
+ Assert.assertEquals("Should move added ts after id and added count after ts",
+ expected.asStruct(), actual.asStruct());
+ }
+
+ // A field added to a nested struct in the same update can be moved within that struct.
+ @Test
+ public void testMoveAddedNestedStructField() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ required(3, "count", Types.LongType.get()),
+ required(4, "data", Types.StringType.get()))));
+ Schema expected = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ optional(5, "ts", Types.TimestampType.withZone()),
+ required(3, "count", Types.LongType.get()),
+ required(4, "data", Types.StringType.get()))));
+
+ Schema actual = new SchemaUpdate(schema, 4)
+ .addColumn("struct", "ts", Types.TimestampType.withZone())
+ .moveBefore("struct.ts", "struct.count")
+ .apply();
+
+ Assert.assertEquals("Should move added ts before count", expected.asStruct(), actual.asStruct());
+ }
+
+ // A nested field added in the same update can be used as the reference for moving another added nested field.
+ @Test
+ public void testMoveAddedNestedStructFieldBeforeAddedColumn() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ required(3, "count", Types.LongType.get()),
+ required(4, "data", Types.StringType.get()))));
+ Schema expected = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "struct", Types.StructType.of(
+ optional(6, "size", Types.LongType.get()),
+ optional(5, "ts", Types.TimestampType.withZone()),
+ required(3, "count", Types.LongType.get()),
+ required(4, "data", Types.StringType.get()))));
+
+ Schema actual = new SchemaUpdate(schema, 4)
+ .addColumn("struct", "ts", Types.TimestampType.withZone())
+ .addColumn("struct", "size", Types.LongType.get())
+ .moveBefore("struct.ts", "struct.count")
+ .moveBefore("struct.size", "struct.ts")
+ .apply();
+
+ Assert.assertEquals("Should move added ts before count and added size before ts",
+ expected.asStruct(), actual.asStruct());
+ }
+
+ // Moving a column before or after itself is rejected with an IllegalArgumentException.
+ @Test
+ public void testMoveSelfReferenceFails() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "data", Types.StringType.get()));
+
+ AssertHelpers.assertThrows("Should fail move of a field before itself",
+ IllegalArgumentException.class, "Cannot move id before itself", () ->
+ new SchemaUpdate(schema, 2)
+ .moveBefore("id", "id")
+ .apply());
+
+ AssertHelpers.assertThrows("Should fail move of a field after itself",
+ IllegalArgumentException.class, "Cannot move id after itself", () ->
+ new SchemaUpdate(schema, 2)
+ .moveAfter("id", "id")
+ .apply());
+ }
+
+ // All three move methods reject a column name that is not in the schema.
+ @Test
+ public void testMoveMissingColumnFails() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "data", Types.StringType.get()));
+
+ AssertHelpers.assertThrows("Should fail move for a field that is not in the schema",
+ IllegalArgumentException.class, "Cannot move missing column", () ->
+ new SchemaUpdate(schema, 2)
+ .moveFirst("items")
+ .apply());
+
+ AssertHelpers.assertThrows("Should fail move for a field that is not in the schema",
+ IllegalArgumentException.class, "Cannot move missing column", () ->
+ new SchemaUpdate(schema, 2)
+ .moveBefore("items", "id")
+ .apply());
+
+ AssertHelpers.assertThrows("Should fail move for a field that is not in the schema",
+ IllegalArgumentException.class, "Cannot move missing column", () ->
+ new SchemaUpdate(schema, 2)
+ .moveAfter("items", "data")
+ .apply());
+ }
+
+ // A move that names a column before the addColumn call that creates it is rejected as missing.
+ @Test
+ public void testMoveBeforeAddFails() {
+ Schema schema = new Schema(
+ required(1, "id", Types.LongType.get()),
+ required(2, "data", Types.StringType.get()));
+
+ AssertHelpers.assertThrows("Should fail move for a field that has not been added yet",
+ IllegalArgumentException.class, "Cannot move missing column", () ->
+ new SchemaUpdate(schema, 2)
+ .moveFirst("ts")
+ .addColumn("ts", Types.TimestampType.withZone())
+ .apply());
+
+ AssertHelpers.assertThrows("Should fail move for a field that has not been added yet",
+ IllegalArgumentException.class, "Cannot move missing column", () ->
+ new SchemaUpdate(schema, 2)
+ .moveBefore("ts", "id")
+ .addColumn("ts", Types.TimestampType.withZone())
+ .apply());
+
+ AssertHelpers.assertThrows("Should fail move for a field that has not been added yet",
+ IllegalArgumentException.class, "Cannot move missing column", () ->
+ new SchemaUpdate(schema, 2)
+ .moveAfter("ts", "data")
+ .addColumn("ts", Types.TimestampType.withZone())
+ .apply());
+ }
+
+ /**
+  * Checks that moveBefore and moveAfter are expected to raise an IllegalArgumentException when
+  * the reference column ("items") is not part of the schema, even though the moved column is.
+  */
+ @Test
+ public void testMoveMissingReferenceColumnFails() {
+   Types.NestedField idColumn = required(1, "id", Types.LongType.get());
+   Types.NestedField dataColumn = required(2, "data", Types.StringType.get());
+   Schema idAndData = new Schema(idColumn, dataColumn);
+
+   AssertHelpers.assertThrows(
+       "Should fail move before a field that is not in the schema",
+       IllegalArgumentException.class,
+       "Cannot move id before missing column",
+       () -> new SchemaUpdate(idAndData, 2).moveBefore("id", "items").apply());
+
+   AssertHelpers.assertThrows(
+       "Should fail move after for a field that is not in the schema",
+       IllegalArgumentException.class,
+       "Cannot move data after missing column",
+       () -> new SchemaUpdate(idAndData, 2).moveAfter("data", "items").apply());
+ }
+
+ /**
+  * Checks that moving "map.key" before "map.value" is expected to raise an
+  * IllegalArgumentException reporting that fields cannot be moved in a non-struct type.
+  */
+ @Test
+ public void testMovePrimitiveMapKeyFails() {
+   Types.MapType stringToString =
+       Types.MapType.ofRequired(4, 5, Types.StringType.get(), Types.StringType.get());
+   Schema withMap = new Schema(
+       required(1, "id", Types.LongType.get()),
+       required(2, "data", Types.StringType.get()),
+       optional(3, "map", stringToString));
+
+   AssertHelpers.assertThrows(
+       "Should fail move for map key",
+       IllegalArgumentException.class,
+       "Cannot move fields in non-struct type",
+       () -> new SchemaUpdate(withMap, 5).moveBefore("map.key", "map.value").apply());
+ }
+
+ /**
+  * Checks that moving "map.value" before "map.key" is expected to raise an
+  * IllegalArgumentException reporting that fields cannot be moved in a non-struct type.
+  */
+ @Test
+ public void testMovePrimitiveMapValueFails() {
+   // NOTE(review): the value type is an empty struct (Types.StructType.of()) although the test
+   // name says "primitive" -- confirm whether a primitive value type was intended here
+   Types.MapType stringToStruct =
+       Types.MapType.ofRequired(4, 5, Types.StringType.get(), Types.StructType.of());
+   Schema withMap = new Schema(
+       required(1, "id", Types.LongType.get()),
+       required(2, "data", Types.StringType.get()),
+       optional(3, "map", stringToStruct));
+
+   AssertHelpers.assertThrows(
+       "Should fail move for map value",
+       IllegalArgumentException.class,
+       "Cannot move fields in non-struct type",
+       () -> new SchemaUpdate(withMap, 5).moveBefore("map.value", "map.key").apply());
+ }
+
+ /**
+  * Checks that moving "list.element" before "list" is expected to raise an
+  * IllegalArgumentException reporting that fields cannot be moved in a non-struct type.
+  */
+ @Test
+ public void testMovePrimitiveListElementFails() {
+   Types.ListType strings = Types.ListType.ofRequired(4, Types.StringType.get());
+   Schema withList = new Schema(
+       required(1, "id", Types.LongType.get()),
+       required(2, "data", Types.StringType.get()),
+       optional(3, "list", strings));
+
+   AssertHelpers.assertThrows(
+       "Should fail move for list element",
+       IllegalArgumentException.class,
+       "Cannot move fields in non-struct type",
+       () -> new SchemaUpdate(withList, 4).moveBefore("list.element", "list").apply());
+ }
+
+ /**
+  * Checks that moving the top-level column "a" before the nested column "struct.x" is expected
+  * to raise an IllegalArgumentException, because the reference lives in a different struct.
+  */
+ @Test
+ public void testMoveTopLevelBetweenStructsFails() {
+   Types.StructType nested = Types.StructType.of(
+       required(4, "x", Types.IntegerType.get()),
+       required(5, "y", Types.IntegerType.get()));
+   Schema topLevelAndStruct = new Schema(
+       required(1, "a", Types.IntegerType.get()),
+       required(2, "b", Types.IntegerType.get()),
+       required(3, "struct", nested));
+
+   AssertHelpers.assertThrows(
+       "Should fail move between separate structs",
+       IllegalArgumentException.class,
+       "Cannot move field a to a different struct",
+       () -> new SchemaUpdate(topLevelAndStruct, 5).moveBefore("a", "struct.x").apply());
+ }
+
+ /**
+  * Checks that moving "s2.x" before "s1.a" is expected to raise an IllegalArgumentException,
+  * because the two columns belong to different structs.
+  */
+ @Test
+ public void testMoveBetweenStructsFails() {
+   Types.StructType first = Types.StructType.of(
+       required(3, "a", Types.IntegerType.get()),
+       required(4, "b", Types.IntegerType.get()));
+   Types.StructType second = Types.StructType.of(
+       required(5, "x", Types.IntegerType.get()),
+       required(6, "y", Types.IntegerType.get()));
+   Schema twoStructs = new Schema(
+       required(1, "s1", first),
+       required(2, "s2", second));
+
+   AssertHelpers.assertThrows(
+       "Should fail move between separate structs",
+       IllegalArgumentException.class,
+       "Cannot move field s2.x to a different struct",
+       () -> new SchemaUpdate(twoStructs, 6).moveBefore("s2.x", "s1.a").apply());
+ }
}