You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/08 05:50:32 UTC

[GitHub] [iceberg] rdsr commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

rdsr commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r451298044



##########
File path: core/src/test/java/org/apache/iceberg/TestSchemaUpdateSync.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.BinaryType;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.FixedType;
+import org.apache.iceberg.types.Types.FloatType;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.TimeType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.types.Types.UUIDType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestSchemaUpdateSync {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private static List<? extends PrimitiveType> primitiveTypes() {
+    return Lists.newArrayList(StringType.get(),
+        TimeType.get(),
+        TimestampType.withoutZone(),
+        TimestampType.withZone(),
+        UUIDType.get(),
+        DateType.get(),
+        BooleanType.get(),
+        BinaryType.get(),
+        DoubleType.get(),
+        IntegerType.get(),
+        FixedType.ofLength(10),
+        DecimalType.of(10, 2),
+        LongType.get(),
+        FloatType.get()
+    );
+  }
+
+  private static NestedField[] primitiveFields(Integer initialValue, List<? extends PrimitiveType> primitiveTypes) {
+    AtomicInteger i = new AtomicInteger(initialValue);
+    return primitiveTypes.stream()
+        .map(type -> optional(i.incrementAndGet(), type.toString(),
+            Types.fromPrimitiveString(type.toString()))).toArray(NestedField[]::new);
+  }
+
+  @Test
+  public void testAddTopLevelPrimitives() {
+    Schema newSchema = new Schema(primitiveFields(0, primitiveTypes()));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddTopLevelListOfPrimitives() {
+    for(PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aList", Types.ListType.ofOptional(2, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelMapOfPrimitives() {
+    for(PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aMap", Types.MapType.ofOptional(2, 3, primitiveType, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelStructOfPrimitives() {
+    for(PrimitiveType primitiveType : primitiveTypes()) {
+      Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(currentSchema).apply();
+      Assert.assertEquals(currentSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitive() {
+    for(PrimitiveType primitiveType : primitiveTypes()) {
+      Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitives() {
+    Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+    Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+        primitiveFields(1, primitiveTypes()))));
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedLists() {
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2,
+            Types.ListType.ofOptional(3,
+                Types.ListType.ofOptional(4,
+                    Types.ListType.ofOptional(5,
+                        Types.ListType.ofOptional(6,
+                            Types.ListType.ofOptional(7,
+                                Types.ListType.ofOptional(8,
+                                    Types.ListType.ofOptional(9,
+                                        Types.ListType.ofOptional(10,
+                                            DecimalType.of(11, 20))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedStruct() {
+    Schema newSchema = new Schema(optional(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "struct3", Types.StructType.of(
+                    optional(4, "struct4", Types.StructType.of(
+                        optional(5, "struct5", Types.StructType.of(
+                            optional(6, "struct6", Types.StructType.of(
+                                optional(7, "aString", StringType.get()))))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedMaps() {
+    Schema newSchema = new Schema(optional(1, "struct", Types.MapType.ofOptional(
+        2, 3, StringType.get(), Types.MapType.ofOptional(
+            4, 5, StringType.get(), Types.MapType.ofOptional(
+                6, 7, StringType.get(), Types.MapType.ofOptional(
+                    8 ,9, StringType.get(), Types.MapType.ofOptional(
+                        10 ,11, StringType.get(), Types.MapType.ofOptional(
+                            12, 13, StringType.get(), StringType.get()))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelList() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aList.element: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, LongType.get())));
+    new SchemaUpdate(currentSchema, 2).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapValue() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.value: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2,3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2,3, StringType.get(), LongType.get())));
+    Schema apply = new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+    System.out.println(apply.toString());
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapKey() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.key: string -> uuid");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2,3 , StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2,3 , UUIDType.get(), StringType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // int	32-bit signed integers -> Can promote to long
+  public void testTypePromoteIntegerToLong() {
+    Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", LongType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(LongType.get(), applied.asStruct().fields().get(0).type());
+  }
+
+  @Test
+  // float	32-bit IEEE 754 floating point -> Can promote to double
+  public void testTypePromoteFloatToDouble() {
+    Schema currentSchema = new Schema(required(1, "aCol", FloatType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", DoubleType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(DoubleType.get(), applied.asStruct().fields().get(0).type());
+    // Doesn't work Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    // java.lang.AssertionError:
+    // Expected :struct<1: aCol: required double>
+    // Actual   :struct<1: aCol: required double ()>
+  }
+
+  @Test
+  public void testInvalidTypePromoteDoubleToFloat() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aCol: double -> float");
+
+    Schema currentSchema = new Schema(required(1, "aCol", DoubleType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", FloatType.get()));
+
+    new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // decimal(P,S)	Fixed-point decimal; precision P, scale S	-> Scale is fixed [1], precision must be 38 or less
+  public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() {
+    Schema currentSchema = new Schema(required(1, "aCol", DecimalType.of(20, 1)));
+    Schema newSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1)));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddPrimitiveToNestedStruct() {
+    Schema schema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+          optional(2, "struct2", Types.StructType.of(
+                  optional(3, "list", Types.ListType.ofOptional(
+                      4, Types.StructType.of(
+                          optional(5, "value", StringType.get())))))))));
+
+    Schema newSchema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "time", TimeType.get())))))))));

Review comment:
       It seems that "value" is missing from newSchema. I think that is because deletes are not handled yet. We should consider supporting deletes and renames too.

##########
File path: core/src/main/java/org/apache/iceberg/SchemaUpdate.java
##########
@@ -308,6 +314,159 @@ public UpdateSchema moveAfter(String name, String afterName) {
     return this;
   }
 
+  @Override
+  public UpdateSchema upsertSchema(Schema newSchema) {
+    TypeUtil.visit(newSchema, new ApplyUpdates(this, schema));
+    return this;
+  }
+
+  private static class ApplyUpdates extends TypeUtil.SchemaVisitor<Void> {
+    private static final Joiner DOT = Joiner.on(".");
+    private final Deque<String> fieldNames = Lists.newLinkedList();
+    private NestedField currentField = null;
+
+    private final Schema baseSchema;
+    private final UpdateSchema api;
+    private final Map<String, Integer> indexByName;
+
+    private ApplyUpdates(UpdateSchema api, Schema baseSchema) {
+      this.api = api;
+      this.baseSchema = baseSchema;
+      this.indexByName = TypeUtil.indexByName(baseSchema.asStruct());
+    }
+
+    @Override
+    public void beforeListElement(NestedField elementField) {
+      beforeField(elementField);
+    }
+
+    @Override
+    public void afterListElement(NestedField elementField) {
+      afterField(elementField);
+    }
+
+    @Override
+    public void beforeMapKey(Types.NestedField keyField) {
+      beforeField(keyField);
+    }
+
+    @Override
+    public void afterMapKey(Types.NestedField keyField) {
+      afterField(keyField);
+    }
+
+    @Override
+    public void beforeMapValue(Types.NestedField valueField) {
+      beforeField(valueField);
+    }
+
+    @Override
+    public void afterMapValue(Types.NestedField valueField) {
+      afterField(valueField);
+    }
+
+    @Override
+    public void beforeField(NestedField field) {
+      fieldNames.push(field.name()); // we don't expect `element` to show up - it breaks
+      currentField = field;
+    }
+
+    @Override
+    public void afterField(NestedField field) {
+      fieldNames.pop();
+    }
+
+    @Override
+    public Void field(NestedField field, Void fieldResult) {
+      return super.field(field, fieldResult);
+    }
+
+    @Override
+    public Void list(ListType list, Void elementResult) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.ListType.ofOptional(0,   list.elementType()), ancestors());
+      } else if (!field.type().isListType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field
+                .type() + " to type list");
+      }
+      return null;
+    }
+
+    @Override
+    public Void map(MapType map, Void keyResult, Void valueResult) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.MapType.ofOptional(0, 1,   map.keyType(), map.valueType()), ancestors());
+      } else if (!field.type().isMapType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field
+                .type() + " to type map");
+      }
+      return null;
+    }
+
+    @Override
+    public Void struct(Types.StructType struct, List<Void> fieldResults) {
+      if(fieldNames.isEmpty()) return null; // this is the root struct
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.StructType.of(struct.fields()), ancestors());
+      } else if (!field.type().isStructType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field.type()
+                + " to type struct");
+      }
+      return null;
+    }
+
+    @Override
+    public Void primitive(PrimitiveType primitive) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      PrimitiveType newFieldType = Types.fromPrimitiveString(primitive.toString());
+      if (field == null) {
+        addColumn(currentField.name(), Types.fromPrimitiveString(primitive.toString()), ancestors());
+      } else if (!field.type().isPrimitiveType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + field.name() + " of type: "
+                + field.type() + " to primitive type: " + primitive.typeId().name());
+      } else if (!newFieldType.equals(field.type())) {

Review comment:
       We may also need to handle renames and deletes. These can be detected if we apply the updates (additions, type widening, renames, and deletes) by field ID instead of by name. This was one of @rdblue's main comments on #759.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org