Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/07 19:51:00 UTC

[GitHub] [iceberg] fbocse opened a new pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

fbocse opened a new pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177


   Continues (most of) the work done by @rdsr in https://github.com/apache/iceberg/pull/759


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#issuecomment-689023022


   Merged! Thanks for all your work, @fbocse and @rdsr!




[GitHub] [iceberg] rdblue commented on pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#issuecomment-667414780


   @fbocse, the case I was thinking about was whether we want to handle promotion from required to optional if the observed schema doesn't contain a field, or if its field is optional.
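A minimal sketch of one possible policy for this case. It assumes a hypothetical flat model, a map from column name to a "required" flag, rather than Iceberg's `Schema` API. Under this policy, an existing required column becomes optional when the observed schema either lacks it or declares it optional. Columns new to the table are added as optional, because earlier rows have no value for them.

The `UnionByNameVisitor` quoted later in this thread covers only the second case (`field.isOptional() && existingField.isRequired()`). It visits the new schema's fields, so columns that are missing from the new schema are left unchanged.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical flat model for illustration only: column name -> true if the column is required.
final class NullabilityUnion {
  private NullabilityUnion() {}

  /**
   * Decides whether an existing column should become optional.
   * observedRequired is null when the observed schema does not contain the column.
   */
  static boolean shouldMakeOptional(boolean existingRequired, Boolean observedRequired) {
    if (!existingRequired) {
      return false; // already optional, nothing to change
    }
    // Missing from the observed data, or present but optional: some rows may lack a value.
    return observedRequired == null || !observedRequired;
  }

  /** Returns the nullability of every column in the union of the two schemas. */
  static Map<String, Boolean> union(Map<String, Boolean> existing, Map<String, Boolean> observed) {
    Map<String, Boolean> result = new LinkedHashMap<>();
    for (Map.Entry<String, Boolean> column : existing.entrySet()) {
      boolean required = column.getValue();
      boolean makeOptional = shouldMakeOptional(required, observed.get(column.getKey()));
      result.put(column.getKey(), required && !makeOptional);
    }
    for (String name : observed.keySet()) {
      // Assumption: columns new to the table are added as optional.
      result.putIfAbsent(name, false);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Boolean> existing = new LinkedHashMap<>();
    existing.put("id", true);
    existing.put("name", true);
    Map<String, Boolean> observed = new LinkedHashMap<>();
    observed.put("id", true);
    observed.put("ts", true);
    System.out.println(union(existing, observed)); // {id=true, name=false, ts=false}
  }
}
```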




[GitHub] [iceberg] fbocse commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r451755790



##########
File path: core/src/test/java/org/apache/iceberg/TestSchemaUpdateSync.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.BinaryType;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.FixedType;
+import org.apache.iceberg.types.Types.FloatType;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.TimeType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.types.Types.UUIDType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestSchemaUpdateSync {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private static List<? extends PrimitiveType> primitiveTypes() {
+    return Lists.newArrayList(StringType.get(),
+        TimeType.get(),
+        TimestampType.withoutZone(),
+        TimestampType.withZone(),
+        UUIDType.get(),
+        DateType.get(),
+        BooleanType.get(),
+        BinaryType.get(),
+        DoubleType.get(),
+        IntegerType.get(),
+        FixedType.ofLength(10),
+        DecimalType.of(10, 2),
+        LongType.get(),
+        FloatType.get()
+    );
+  }
+
+  private static NestedField[] primitiveFields(Integer initialValue, List<? extends PrimitiveType> primitiveTypes) {
+    AtomicInteger i = new AtomicInteger(initialValue);
+    return primitiveTypes.stream()
+        .map(type -> optional(i.incrementAndGet(), type.toString(),
+            Types.fromPrimitiveString(type.toString()))).toArray(NestedField[]::new);
+  }
+
+  @Test
+  public void testAddTopLevelPrimitives() {
+    Schema newSchema = new Schema(primitiveFields(0, primitiveTypes()));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddTopLevelListOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aList", Types.ListType.ofOptional(2, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelMapOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aMap", Types.MapType.ofOptional(2, 3, primitiveType, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelStructOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitive() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitives() {
+    Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+    Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+        primitiveFields(1, primitiveTypes()))));
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedLists() {
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2,
+            Types.ListType.ofOptional(3,
+                Types.ListType.ofOptional(4,
+                    Types.ListType.ofOptional(5,
+                        Types.ListType.ofOptional(6,
+                            Types.ListType.ofOptional(7,
+                                Types.ListType.ofOptional(8,
+                                    Types.ListType.ofOptional(9,
+                                        Types.ListType.ofOptional(10,
+                                            DecimalType.of(11, 20))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedStruct() {
+    Schema newSchema = new Schema(optional(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "struct3", Types.StructType.of(
+                    optional(4, "struct4", Types.StructType.of(
+                        optional(5, "struct5", Types.StructType.of(
+                            optional(6, "struct6", Types.StructType.of(
+                                optional(7, "aString", StringType.get()))))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedMaps() {
+    Schema newSchema = new Schema(optional(1, "struct", Types.MapType.ofOptional(
+        2, 3, StringType.get(), Types.MapType.ofOptional(
+            4, 5, StringType.get(), Types.MapType.ofOptional(
+                6, 7, StringType.get(), Types.MapType.ofOptional(
+                    8, 9, StringType.get(), Types.MapType.ofOptional(
+                        10, 11, StringType.get(), Types.MapType.ofOptional(
+                            12, 13, StringType.get(), StringType.get()))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelList() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aList.element: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, LongType.get())));
+    new SchemaUpdate(currentSchema, 2).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapValue() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.value: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), LongType.get())));
+    Schema apply = new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+    System.out.println(apply.toString());
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapKey() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.key: string -> uuid");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, UUIDType.get(), StringType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // int: 32-bit signed integer; can be promoted to long
+  public void testTypePromoteIntegerToLong() {
+    Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", LongType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(LongType.get(), applied.asStruct().fields().get(0).type());
+  }
+
+  @Test
+  // float: 32-bit IEEE 754 floating point; can be promoted to double
+  public void testTypePromoteFloatToDouble() {
+    Schema currentSchema = new Schema(required(1, "aCol", FloatType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", DoubleType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(DoubleType.get(), applied.asStruct().fields().get(0).type());
+    // Doesn't work Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    // java.lang.AssertionError:
+    // Expected :struct<1: aCol: required double>
+    // Actual   :struct<1: aCol: required double ()>
+  }
+
+  @Test
+  public void testInvalidTypePromoteDoubleToFloat() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aCol: double -> float");
+
+    Schema currentSchema = new Schema(required(1, "aCol", DoubleType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", FloatType.get()));
+
+    new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // decimal(P, S): fixed-point decimal with precision P and scale S; scale is fixed [1], precision must be 38 or less
+  public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() {
+    Schema currentSchema = new Schema(required(1, "aCol", DecimalType.of(20, 1)));
+    Schema newSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1)));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddPrimitiveToNestedStruct() {
+    Schema schema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "value", StringType.get())))))))));
+
+    Schema newSchema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "time", TimeType.get())))))))));

Review comment:
       Right, nor should it. The main use case for this API is to accommodate `schema unions`, as @rdblue summed it up during our chat in the Iceberg sync-up today.
   The use case is as follows:
   
   1. Consider a dataset with a large schema; call it the `effective schema`.
   2. We receive data that contains only some of the columns. Those columns make up a subset of the large schema; call it the `observed schema`.
   3. As these chunks of data, each with its own observed schema, land in the Iceberg table, we want to keep the table schema in step with the union of the observed schemas.
   
   So in this particular test, `expected` is the union of `schema` and `newSchema`.
   Does this make sense?
   Does this resemble your use case as well?
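The three steps above amount to folding each observed schema into the table schema. Here is a minimal sketch of that fold. It assumes a hypothetical flat model in which a schema is an ordered map from column name to primitive type name; this is not Iceberg's `Schema` or `UpdateSchema` API. The sketch follows these rules:

* Missing columns are appended.
* Identical types are kept.
* Only the widenings exercised by the tests above (int to long, float to double) are accepted.
* Any other type change fails with a message shaped like the ones those tests expect.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical flat model: column name -> primitive type name, in column order.
final class ObservedSchemaUnion {
  private ObservedSchemaUnion() {}

  // Assumed promotion rules: identity, int -> long, float -> double.
  static boolean canPromote(String from, String to) {
    return from.equals(to)
        || (from.equals("int") && to.equals("long"))
        || (from.equals("float") && to.equals("double"));
  }

  /** Merges one observed schema into the effective (table) schema without mutating either input. */
  static Map<String, String> union(Map<String, String> effective, Map<String, String> observed) {
    Map<String, String> result = new LinkedHashMap<>(effective);
    for (Map.Entry<String, String> column : observed.entrySet()) {
      String name = column.getKey();
      String observedType = column.getValue();
      String existingType = result.get(name);
      if (existingType == null) {
        result.put(name, observedType); // new column: append it
      } else if (canPromote(existingType, observedType)) {
        result.put(name, observedType); // same type or an allowed widening
      } else {
        throw new IllegalArgumentException(String.format(
            "Cannot change column type: %s: %s -> %s", name, existingType, observedType));
      }
    }
    return result;
  }

  /** Folds observed schemas into the effective schema in arrival order. */
  static Map<String, String> unionAll(Map<String, String> effective, List<Map<String, String>> observed) {
    Map<String, String> result = effective;
    for (Map<String, String> schema : observed) {
      result = union(result, schema);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> table = new LinkedHashMap<>();
    table.put("id", "int");
    Map<String, String> batch1 = new LinkedHashMap<>();
    batch1.put("id", "long");
    batch1.put("name", "string");
    Map<String, String> batch2 = new LinkedHashMap<>();
    batch2.put("score", "float");
    System.out.println(unionAll(table, Arrays.asList(batch1, batch2)));
    // {id=long, name=string, score=float}
  }
}
```

A narrower observed type, such as `int` arriving for an existing `long` column, is rejected here. This matches how `testInvalidTypePromoteDoubleToFloat` rejects double to float. A more lenient union could keep the wider existing type instead.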






[GitHub] [iceberg] fbocse commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r482068443



##########
File path: core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.schema;
+
+import java.util.List;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Visitor class that accumulates the set of changes needed to evolve an existing schema into the union of the
+ * existing and a new schema. Changes are added to an {@link UpdateSchema} operation.
+ */
+public class UnionByNameVisitor extends SchemaWithPartnerVisitor<Integer, Boolean> {
+
+  private final UpdateSchema api;
+  private final Schema partnerSchema;
+
+  private UnionByNameVisitor(UpdateSchema api, Schema partnerSchema) {
+    this.api = api;
+    this.partnerSchema = partnerSchema;
+  }
+
+  /**
+   * Adds changes needed to produce a union of two schemas to an {@link UpdateSchema} operation.
+   * <p>
+   * Changes are accumulated to evolve the existingSchema into a union with newSchema.
+   *
+   * @param api an UpdateSchema for adding changes
+   * @param existingSchema an existing schema
+   * @param newSchema a new schema to compare with the existing
+   */
+  public static void visit(UpdateSchema api, Schema existingSchema, Schema newSchema) {
+    visit(newSchema, -1, new UnionByNameVisitor(api, existingSchema), new PartnerIdByNameAccessors(existingSchema));
+  }
+
+  @Override
+  public Boolean struct(Types.StructType struct, Integer partnerId, List<Boolean> missingPositions) {
+    if (partnerId == null) {
+      return true;
+    }
+
+    List<Types.NestedField> fields = struct.fields();
+    Types.StructType partnerStruct = findFieldType(partnerId).asStructType();
+    IntStream.range(0, missingPositions.size())
+        .forEach(pos -> {
+          Boolean isMissing = missingPositions.get(pos);
+          Types.NestedField field = fields.get(pos);
+          if (isMissing) {
+            addColumn(partnerId, field);
+          } else {
+            updateColumn(field, partnerStruct.field(field.name()));
+          }
+        });
+
+    return false;
+  }
+
+  @Override
+  public Boolean field(Types.NestedField field, Integer partnerId, Boolean isFieldMissing) {
+    return partnerId == null;
+  }
+
+  @Override
+  public Boolean list(Types.ListType list, Integer partnerId, Boolean isElementMissing) {
+    if (partnerId == null) {
+      return true;
+    }
+
+    Preconditions.checkState(!isElementMissing, "Error traversing schemas: element is missing, but list is present");
+
+    Types.ListType partnerList = findFieldType(partnerId).asListType();
+    updateColumn(list.fields().get(0), partnerList.fields().get(0));
+
+    return false;
+  }
+
+  @Override
+  public Boolean map(Types.MapType map, Integer partnerId, Boolean isKeyMissing, Boolean isValueMissing) {
+    if (partnerId == null) {
+      return true;
+    }
+
+    Preconditions.checkState(!isKeyMissing, "Error traversing schemas: key is missing, but map is present");
+    Preconditions.checkState(!isValueMissing, "Error traversing schemas: value is missing, but map is present");
+
+    Types.MapType partnerMap = findFieldType(partnerId).asMapType();
+    updateColumn(map.fields().get(0), partnerMap.fields().get(0));
+    updateColumn(map.fields().get(1), partnerMap.fields().get(1));
+
+    return false;
+  }
+
+  @Override
+  public Boolean primitive(Type.PrimitiveType primitive, Integer partnerId) {
+    return partnerId == null;
+  }
+
+  private Type findFieldType(int fieldId) {
+    if (fieldId == -1) {
+      return partnerSchema.asStruct();
+    } else {
+      return partnerSchema.findField(fieldId).type();
+    }
+  }
+
+  private void addColumn(int parentId, Types.NestedField field) {
+    String parentName = partnerSchema.findColumnName(parentId);
+    api.addColumn(parentName, field.name(), field.type(), field.doc());
+  }
+
+  private void updateColumn(Types.NestedField field, Types.NestedField existingField) {
+    String fullName = partnerSchema.findColumnName(existingField.fieldId());
+
+    boolean needsOptionalUpdate = field.isOptional() && existingField.isRequired();
+    boolean needsTypeUpdate = field.type().isPrimitiveType() && !field.type().equals(existingField.type());
+    boolean needsDocUpdate = field.doc() != null && !field.doc().equals(existingField.doc());
+
+    if (needsOptionalUpdate) {
+      api.makeColumnOptional(fullName);
+    }
+
+    if (needsTypeUpdate) {
+      api.updateColumn(fullName, field.type().asPrimitiveType());
+    }
+
+    if (needsDocUpdate) {
+      api.updateColumnDoc(fullName, field.doc());
+    }
+  }
+
+  private static class PartnerIdByNameAccessors implements PartnerAccessors<Integer> {
+    private final Schema partnerSchema;
+
+    private PartnerIdByNameAccessors(Schema partnerSchema) {
+      this.partnerSchema = partnerSchema;
+    }
+
+    @Override
+    public Integer fieldPartner(Integer partnerFieldId, int fieldId, String name) {
+      Types.StructType struct;
+      if (partnerFieldId == -1) {

Review comment:
       @rdblue I think this is pretty much the only change I've made to your PoC code, and the tests pass. I will add more tests based on past use cases. I realise that an exhaustive test suite is practically impossible for this feature, but I'll try to cover both basic and complex use cases.
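The three checks in `updateColumn` above are independent: nullability, primitive type, and doc are each compared and updated separately. Below is a minimal sketch of that decision logic. It makes these assumptions:

* Types are represented as string type names.
* `null` stands for a nested type, mirroring the `isPrimitiveType()` guard.
* The `UpdateSchema` calls are returned as strings rather than being made.

The promotion rules are also an assumption, modelled on the tests earlier in this thread: int to long, float to double, and decimal precision widening at a fixed scale. The visitor itself only checks that the types differ. It appears to rely on the schema update to reject invalid changes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the change detection in updateColumn; type names are plain strings.
final class ColumnUpdatePlan {
  private ColumnUpdatePlan() {}

  // Assumed promotion rules: int -> long, float -> double,
  // and decimal(P, S) -> decimal(P', S) with P <= P' <= 38.
  static boolean isPromotionAllowed(String from, String to) {
    if (from.equals(to)) {
      return true;
    }
    if (from.startsWith("decimal(") && to.startsWith("decimal(")) {
      int[] a = parseDecimal(from);
      int[] b = parseDecimal(to);
      return a[1] == b[1] && a[0] <= b[0] && b[0] <= 38;
    }
    return (from.equals("int") && to.equals("long")) || (from.equals("float") && to.equals("double"));
  }

  // Parses "decimal(P, S)" into {P, S}.
  private static int[] parseDecimal(String type) {
    String[] parts = type.substring("decimal(".length(), type.length() - 1).split(",");
    return new int[] {Integer.parseInt(parts[0].trim()), Integer.parseInt(parts[1].trim())};
  }

  /**
   * Returns the update calls needed for one column. newType is null for nested types;
   * existingType is assumed non-null whenever newType is non-null.
   */
  static List<String> plan(String fullName, boolean newOptional, boolean existingRequired,
                           String newType, String existingType, String newDoc, String existingDoc) {
    List<String> calls = new ArrayList<>();
    if (newOptional && existingRequired) {
      calls.add("makeColumnOptional(" + fullName + ")");
    }
    if (newType != null && !newType.equals(existingType)) {
      if (!isPromotionAllowed(existingType, newType)) {
        throw new IllegalArgumentException(String.format(
            "Cannot change column type: %s: %s -> %s", fullName, existingType, newType));
      }
      calls.add("updateColumn(" + fullName + ", " + newType + ")");
    }
    if (newDoc != null && !newDoc.equals(existingDoc)) {
      calls.add("updateColumnDoc(" + fullName + ", " + newDoc + ")");
    }
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(plan("aCol", true, true, "decimal(22, 1)", "decimal(20, 1)", "a doc", null));
    // [makeColumnOptional(aCol), updateColumn(aCol, decimal(22, 1)), updateColumnDoc(aCol, a doc)]
  }
}
```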






[GitHub] [iceberg] rdblue commented on pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#issuecomment-668126179


   @fbocse, @rdsr, while I was looking at this, I prototyped a way to avoid a couple of the issues that I pointed out with this. I've posted that [union prototype](https://gist.github.com/rdblue/1d39763be41b59b8637d50162f7fe2ea) as a gist.
   
   The gist includes a `SchemaWithPartnerVisitor` pattern that is like the `SchemaWithTypeVisitor`. It handles the logic of traversing two schemas at the same time and passing "partner" objects along with the schema that is being traversed. The partner objects are accessed using a small set of methods, like `fieldPartner` to access a field's partner from a struct's partner.
   
   The gist also includes an implementation, `UnionByNameVisitor`. That visitor extends `SchemaWithPartnerVisitor`, where the partner is the equivalent field ID in the "existing" schema. By passing the existing schema's IDs, we can avoid keeping track of the ancestor names in the visitor. Instead, the prototype looks up the name in the existing schema from the ID when it needs to call `addColumn` with a parent name. That avoids all of the string logic and the need for `beforeField` callbacks. It also implements my suggestion to return a boolean to indicate whether a type is missing.
   
   See what you think about that prototype. I think it's good to have better separation between the traversal logic and the union logic, it is nice to not need to maintain the ancestor list, and it is a bit cleaner when detecting what changes need to be applied.
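Here is a much-reduced sketch of the partner pattern described above. It assumes a hypothetical `SketchField` tree in place of Iceberg's types, covering structs and primitives only; lists, maps, type updates, and docs are omitted.

The sketch works as follows:

* Each field of the new schema is paired, through a `fieldPartner`-style accessor, with the ID of the same-named field in the existing schema. The root struct uses `-1`.
* Each struct reports whether it is missing.
* The parent name of an added column is looked up from the existing schema by ID, instead of being tracked as an ancestor list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical tree model (not Iceberg's types): a primitive has a type name, a struct has type == null.
final class SketchField {
  final int id;
  final String name;
  final String type;
  final List<SketchField> children;

  SketchField(int id, String name, String type, SketchField... children) {
    this.id = id;
    this.name = name;
    this.type = type;
    this.children = Arrays.asList(children);
  }
}

final class PartnerUnionSketch {
  private final SketchField existingRoot;
  private final List<String> addedColumns = new ArrayList<>();

  private PartnerUnionSketch(SketchField existingRoot) {
    this.existingRoot = existingRoot;
  }

  /** Returns the full names of columns to add; the existing root's partner ID is -1. */
  static List<String> union(SketchField existingRoot, SketchField newRoot) {
    PartnerUnionSketch visitor = new PartnerUnionSketch(existingRoot);
    visitor.struct(newRoot, -1);
    return visitor.addedColumns;
  }

  // Partner accessor: ID of the existing child named `name` under struct `partnerId`, or null.
  private Integer fieldPartner(Integer partnerId, String name) {
    if (partnerId == null) {
      return null; // a missing parent has no partners for its children
    }
    for (SketchField child : find(existingRoot, partnerId).children) {
      if (child.name.equals(name)) {
        return child.id;
      }
    }
    return null;
  }

  // Visits children first, then records additions; returns true if this struct is missing.
  private boolean struct(SketchField node, Integer partnerId) {
    List<Boolean> missing = new ArrayList<>();
    for (SketchField child : node.children) {
      Integer childPartner = fieldPartner(partnerId, child.name);
      missing.add(child.type == null ? struct(child, childPartner) : childPartner == null);
    }
    if (partnerId == null) {
      return true; // the parent adds this whole struct as one column
    }
    String parentName = nameOf(existingRoot, partnerId, null); // null for the root
    for (int pos = 0; pos < missing.size(); pos++) {
      if (missing.get(pos)) {
        String name = node.children.get(pos).name;
        addedColumns.add(parentName == null ? name : parentName + "." + name);
      }
    }
    return false;
  }

  // Depth-first lookup of an existing field by ID.
  private static SketchField find(SketchField node, int id) {
    if (node.id == id) {
      return node;
    }
    for (SketchField child : node.children) {
      SketchField found = find(child, id);
      if (found != null) {
        return found;
      }
    }
    return null;
  }

  // Dotted name of an existing field found by ID; the root itself has no name.
  private static String nameOf(SketchField node, int id, String prefix) {
    for (SketchField child : node.children) {
      String name = prefix == null ? child.name : prefix + "." + child.name;
      if (child.id == id) {
        return name;
      }
      String nested = nameOf(child, id, name);
      if (nested != null) {
        return nested;
      }
    }
    return null;
  }
}
```

For example, take an existing schema `id, info.name` and an observed schema `id, info.name, info.age, tags.tag`. The sketch returns `[info.age, tags]`: the missing `tags` struct is reported once by its parent rather than field by field.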


+  }
+
+  @Test
+  public void testAddNestedMaps() {
+    Schema newSchema = new Schema(optional(1, "aMap", Types.MapType.ofOptional(
+        2, 3, StringType.get(), Types.MapType.ofOptional(
+            4, 5, StringType.get(), Types.MapType.ofOptional(
+                6, 7, StringType.get(), Types.MapType.ofOptional(
+                    8, 9, StringType.get(), Types.MapType.ofOptional(
+                        10, 11, StringType.get(), Types.MapType.ofOptional(
+                            12, 13, StringType.get(), StringType.get()))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelList() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aList.element: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, LongType.get())));
+    new SchemaUpdate(currentSchema, 2).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapValue() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.value: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), LongType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapKey() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.key: string -> uuid");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, UUIDType.get(), StringType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // int (32-bit signed integer) can be promoted to long
+  public void testTypePromoteIntegerToLong() {
+    Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", LongType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(LongType.get(), applied.asStruct().fields().get(0).type());
+  }
+
+  @Test
+  // float (32-bit IEEE 754 floating point) can be promoted to double
+  public void testTypePromoteFloatToDouble() {
+    Schema currentSchema = new Schema(required(1, "aCol", FloatType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", DoubleType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(DoubleType.get(), applied.asStruct().fields().get(0).type());
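+    // Comparing the full structs currently fails; the applied field appears to carry
+    // an empty doc string, which is printed as "()":
+    // Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    // java.lang.AssertionError:
+    // Expected :struct<1: aCol: required double>
+    // Actual   :struct<1: aCol: required double ()>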
+  }
+
+  @Test
+  public void testInvalidTypePromoteDoubleToFloat() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aCol: double -> float");
+
+    Schema currentSchema = new Schema(required(1, "aCol", DoubleType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", FloatType.get()));
+
+    new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // decimal(P, S) (fixed-point; precision P, scale S): the scale is fixed, and the precision can only be widened, up to 38
+  public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() {
+    Schema currentSchema = new Schema(required(1, "aCol", DecimalType.of(20, 1)));
+    Schema newSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1)));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddPrimitiveToNestedStruct() {
+    Schema schema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "value", StringType.get())))))))));
+
+    Schema newSchema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "time", TimeType.get())))))))));

Review comment:
       Right, "value" is missing; however, the expectation isn't to delete it. The main use case for this API is to accommodate `schema unions`, as @rdblue summed it up during our chat in the Iceberg sync-up today.
   The use case is as follows:
   
   1. Think of a dataset with a large schema; call it the `effective schema`.
   2. We get data that contains only particular columns; those columns make up only a subset of the large schema. Call it the `observed schema`.
   3. As these chunks of data and their respective observed schemas land in the Iceberg table, we want to track the union of all observed schemas as the Iceberg schema.
   
   So in this particular test, `expected` is the union of `schema` and `newSchema`.
   Does this make sense?
   Does this resemble your use case as well?
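   A minimal sketch of the union-by-name semantics described above, assuming a toy schema model (a nested `Map<String, Object>` from field name to either a primitive type name or a nested struct) rather than Iceberg's `Schema`/`Types` classes. `SchemaUnionSketch`, `struct`, and `unionByName` are hypothetical names. Columns missing from the observed schema are kept, new columns are added, nested structs are merged recursively, and any other type mismatch is rejected; lists, maps, field IDs, and type widening are deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical toy model, NOT Iceberg's API: a struct is an ordered map from field
// name to either a primitive type name (String) or a nested struct (Map).
public class SchemaUnionSketch {

  // Builds a struct from alternating name/type arguments, preserving field order.
  static Map<String, Object> struct(Object... nameTypePairs) {
    Map<String, Object> fields = new LinkedHashMap<>();
    for (int i = 0; i < nameTypePairs.length; i += 2) {
      fields.put((String) nameTypePairs[i], nameTypePairs[i + 1]);
    }
    return fields;
  }

  // Returns the union of `current` and `observed`, matching fields by name.
  @SuppressWarnings("unchecked")
  static Map<String, Object> unionByName(Map<String, Object> current, Map<String, Object> observed) {
    // Start from the current struct so columns absent from `observed` are kept.
    Map<String, Object> result = new LinkedHashMap<>(current);
    for (Map.Entry<String, Object> field : observed.entrySet()) {
      String name = field.getKey();
      Object observedType = field.getValue();
      Object currentType = result.get(name);
      if (currentType == null) {
        // New column: append it.
        result.put(name, observedType);
      } else if (currentType instanceof Map && observedType instanceof Map) {
        // Both sides are structs: merge them recursively.
        result.put(name, unionByName(
            (Map<String, Object>) currentType, (Map<String, Object>) observedType));
      } else if (!currentType.equals(observedType)) {
        // Any other type change is rejected (widening is not modeled here).
        throw new IllegalArgumentException(
            "Cannot change column type: " + name + ": " + currentType + " -> " + observedType);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Object> effective = struct("struct1", struct("value", "string"));
    Map<String, Object> observed = struct("struct1", struct("time", "time"));
    System.out.println(unionByName(effective, observed)); // prints {struct1={value=string, time=time}}
  }
}
```

   Running `main` reproduces a simplified version (without the list level) of the `testAddPrimitiveToNestedStruct` scenario: `value` survives and `time` is added.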




[GitHub] [iceberg] rdblue commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r460620209



##########
File path: core/src/test/java/org/apache/iceberg/TestSchemaUpdateSync.java
##########
@@ -0,0 +1,300 @@

Review comment:
       I'll find time to look at this tomorrow, hopefully. Sorry for the delay.




[GitHub] [iceberg] fbocse removed a comment on pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse removed a comment on pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#issuecomment-685740015


   @rdblue @rdsd the PR check in Travis CI seems stuck. I tried to reschedule it by closing and reopening the PR, but that didn't work. Any suggestions on how to get the hook working and the build validated?


[GitHub] [iceberg] rdblue commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r482610125



##########
File path: api/src/main/java/org/apache/iceberg/UpdateSchema.java
##########
@@ -361,4 +361,14 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin
    *                                  change conflicts with other changes.
    */
   UpdateSchema moveAfter(String name, String afterName);
+
+
+  /**
+   * Applies all the additions and updates [type widening, field documentation]
+   * from the provided new schema
+   *
+   * @param newSchema - Input schema from which updates are applied
+   * @return this for method chaining
+   */
+  UpdateSchema unionWithByFieldName(Schema newSchema);

Review comment:
       How about `unionByName` or `unionByNameWith`?
   
   We'll also need to fill in the docs placeholder: `[type widening, field documentation]`.
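   To make the `[type widening, ...]` part of that placeholder more concrete, here is a hedged sketch of the widening checks the tests in this PR exercise: int to long, float to double, and decimal precision widening with a fixed scale (precision at most 38). Everything else, such as double to float, is rejected with the message the tests expect. Types are plain strings in Iceberg's printed form (e.g. `"decimal(20, 1)"`), and `TypeWideningSketch`, `isAllowedChange`, and `checkChange` are hypothetical names, not Iceberg's API. Field documentation updates are not covered.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, NOT Iceberg's API: the type changes allowed by the tests in
// this PR, with types represented as strings such as "int" or "decimal(20, 1)".
public class TypeWideningSketch {
  private static final Pattern DECIMAL = Pattern.compile("decimal\\((\\d+),\\s*(\\d+)\\)");
  private static final int MAX_DECIMAL_PRECISION = 38;

  static boolean isAllowedChange(String from, String to) {
    if (from.equals(to)) {
      return true; // no change at all
    }
    if (from.equals("int") && to.equals("long")) {
      return true;
    }
    if (from.equals("float") && to.equals("double")) {
      return true;
    }
    Matcher fromDecimal = DECIMAL.matcher(from);
    Matcher toDecimal = DECIMAL.matcher(to);
    if (fromDecimal.matches() && toDecimal.matches()) {
      int fromPrecision = Integer.parseInt(fromDecimal.group(1));
      int fromScale = Integer.parseInt(fromDecimal.group(2));
      int toPrecision = Integer.parseInt(toDecimal.group(1));
      int toScale = Integer.parseInt(toDecimal.group(2));
      // The scale must stay the same; only the precision may grow, up to 38.
      return fromScale == toScale && toPrecision > fromPrecision
          && toPrecision <= MAX_DECIMAL_PRECISION;
    }
    return false; // e.g. double -> float, string -> long, string -> uuid
  }

  // Throws with the same message format the tests in this PR expect.
  static void checkChange(String column, String from, String to) {
    if (!isAllowedChange(from, to)) {
      throw new IllegalArgumentException(
          "Cannot change column type: " + column + ": " + from + " -> " + to);
    }
  }

  public static void main(String[] args) {
    System.out.println(isAllowedChange("decimal(20, 1)", "decimal(22, 1)")); // prints true
    try {
      checkChange("aCol", "double", "float");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // prints Cannot change column type: aCol: double -> float
    }
  }
}
```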




[GitHub] [iceberg] rdsr commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r453991666



##########
File path: core/src/test/java/org/apache/iceberg/TestSchemaUpdateSync.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.BinaryType;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.FixedType;
+import org.apache.iceberg.types.Types.FloatType;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.TimeType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.types.Types.UUIDType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestSchemaUpdateSync {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private static List<? extends PrimitiveType> primitiveTypes() {
+    return Lists.newArrayList(StringType.get(),
+        TimeType.get(),
+        TimestampType.withoutZone(),
+        TimestampType.withZone(),
+        UUIDType.get(),
+        DateType.get(),
+        BooleanType.get(),
+        BinaryType.get(),
+        DoubleType.get(),
+        IntegerType.get(),
+        FixedType.ofLength(10),
+        DecimalType.of(10, 2),
+        LongType.get(),
+        FloatType.get()
+    );
+  }
+
+  private static NestedField[] primitiveFields(Integer initialValue, List<? extends PrimitiveType> primitiveTypes) {
+    AtomicInteger i = new AtomicInteger(initialValue);
+    return primitiveTypes.stream()
+        .map(type -> optional(i.incrementAndGet(), type.toString(),
+            Types.fromPrimitiveString(type.toString()))).toArray(NestedField[]::new);
+  }
+
+  @Test
+  public void testAddTopLevelPrimitives() {
+    Schema newSchema = new Schema(primitiveFields(0, primitiveTypes()));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddTopLevelListOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aList", Types.ListType.ofOptional(2, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelMapOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aMap", Types.MapType.ofOptional(2, 3, primitiveType, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelStructOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitive() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitives() {
+    Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+    Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+        primitiveFields(1, primitiveTypes()))));
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedLists() {
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2,
+            Types.ListType.ofOptional(3,
+                Types.ListType.ofOptional(4,
+                    Types.ListType.ofOptional(5,
+                        Types.ListType.ofOptional(6,
+                            Types.ListType.ofOptional(7,
+                                Types.ListType.ofOptional(8,
+                                    Types.ListType.ofOptional(9,
+                                        Types.ListType.ofOptional(10,
+                                            DecimalType.of(11, 20))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedStruct() {
+    Schema newSchema = new Schema(optional(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "struct3", Types.StructType.of(
+                    optional(4, "struct4", Types.StructType.of(
+                        optional(5, "struct5", Types.StructType.of(
+                            optional(6, "struct6", Types.StructType.of(
+                                optional(7, "aString", StringType.get()))))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedMaps() {
+    Schema newSchema = new Schema(optional(1, "struct", Types.MapType.ofOptional(
+        2, 3, StringType.get(), Types.MapType.ofOptional(
+            4, 5, StringType.get(), Types.MapType.ofOptional(
+                6, 7, StringType.get(), Types.MapType.ofOptional(
+                    8, 9, StringType.get(), Types.MapType.ofOptional(
+                        10, 11, StringType.get(), Types.MapType.ofOptional(
+                            12, 13, StringType.get(), StringType.get()))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelList() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aList.element: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, LongType.get())));
+    new SchemaUpdate(currentSchema, 2).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapValue() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.value: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), LongType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapKey() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.key: string -> uuid");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, UUIDType.get(), StringType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // int (32-bit signed integer) can be promoted to long
+  public void testTypePromoteIntegerToLong() {
+    Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", LongType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(LongType.get(), applied.asStruct().fields().get(0).type());
+  }
+
+  @Test
+  // float (32-bit IEEE 754 floating point) can be promoted to double
+  public void testTypePromoteFloatToDouble() {
+    Schema currentSchema = new Schema(required(1, "aCol", FloatType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", DoubleType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(DoubleType.get(), applied.asStruct().fields().get(0).type());
+    // Comparing the whole structs does not work here:
+    //   Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    //   java.lang.AssertionError:
+    //   Expected :struct<1: aCol: required double>
+    //   Actual   :struct<1: aCol: required double ()>
+  }
+
+  @Test
+  public void testInvalidTypePromoteDoubleToFloat() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aCol: double -> float");
+
+    Schema currentSchema = new Schema(required(1, "aCol", DoubleType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", FloatType.get()));
+
+    new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // decimal(P,S) (fixed-point; precision P, scale S): scale is fixed, precision may widen up to 38
+  public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() {
+    Schema currentSchema = new Schema(required(1, "aCol", DecimalType.of(20, 1)));
+    Schema newSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1)));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddPrimitiveToNestedStruct() {
+    Schema schema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "value", StringType.get())))))))));
+
+    Schema newSchema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "time", TimeType.get())))))))));

Review comment:
       What about renames?
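The type-promotion tests quoted above (int to long, float to double, decimal precision widening at a fixed scale, and rejecting changes such as string to long or double to float) follow Iceberg's allowed promotions. Below is a minimal, self-contained sketch of that rule. The `Prim` class and `isPromotionAllowed` helper are hypothetical stand-ins, not Iceberg's `Type` API.

```java
public class PromotionRules {

  /** Hypothetical stand-in for a primitive type: a name plus decimal precision/scale. */
  static final class Prim {
    final String name;
    final int precision;
    final int scale;

    private Prim(String name, int precision, int scale) {
      this.name = name;
      this.precision = precision;
      this.scale = scale;
    }

    static Prim of(String name) {
      return new Prim(name, 0, 0);
    }

    static Prim decimal(int precision, int scale) {
      return new Prim("decimal", precision, scale);
    }

    boolean sameAs(Prim other) {
      return name.equals(other.name) && precision == other.precision && scale == other.scale;
    }
  }

  /** Returns true if a column of type {@code from} may be changed to type {@code to}. */
  static boolean isPromotionAllowed(Prim from, Prim to) {
    if (from.sameAs(to)) {
      return true; // unchanged type
    }
    switch (from.name) {
      case "int":
        return to.name.equals("long");
      case "float":
        return to.name.equals("double");
      case "decimal":
        // Scale is fixed; precision may only widen, and never beyond 38.
        return to.name.equals("decimal")
            && to.scale == from.scale
            && to.precision > from.precision
            && to.precision <= 38;
      default:
        return false; // every other change (e.g. string -> long) is rejected
    }
  }

  public static void main(String[] args) {
    System.out.println(isPromotionAllowed(Prim.of("int"), Prim.of("long")));          // true
    System.out.println(isPromotionAllowed(Prim.of("double"), Prim.of("float")));      // false
    System.out.println(isPromotionAllowed(Prim.decimal(20, 1), Prim.decimal(22, 1))); // true
    System.out.println(isPromotionAllowed(Prim.of("string"), Prim.of("long")));       // false
  }
}
```

Renames are out of scope for this check: it only compares the types of two fields already known to be the same column.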






[GitHub] [iceberg] fbocse commented on pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#issuecomment-658197287


   @rdblue, during the sync-up where we discussed the use case for this API, you brought up some ideas we'd need to account for in this implementation. I remember it was related to upserting fields with respect to whether they are required or optional. Do you happen to recall this?
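One plausible rule for the required/optional question, matching the `needsOptionalUpdate` check in the `UnionByNameVisitor` quoted later in this thread, is to only ever relax a column from required to optional and never tighten it. The sketch below models just that decision; `NullabilityMerge` and its `Action` enum are hypothetical names, not part of Iceberg.

```java
public class NullabilityMerge {

  /** What the union should do with an existing column's nullability (hypothetical model). */
  enum Action { KEEP, MAKE_OPTIONAL }

  /**
   * Required -> optional is a safe relaxation: every existing non-null value stays valid.
   * Optional -> required is never applied, since existing data may already contain nulls.
   */
  static Action reconcile(boolean existingIsOptional, boolean incomingIsOptional) {
    return (incomingIsOptional && !existingIsOptional) ? Action.MAKE_OPTIONAL : Action.KEEP;
  }

  public static void main(String[] args) {
    System.out.println(reconcile(false, true));  // MAKE_OPTIONAL
    System.out.println(reconcile(true, false));  // KEEP
    System.out.println(reconcile(false, false)); // KEEP
  }
}
```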


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r451298044



##########
File path: core/src/test/java/org/apache/iceberg/TestSchemaUpdateSync.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.BinaryType;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.FixedType;
+import org.apache.iceberg.types.Types.FloatType;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.TimeType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.types.Types.UUIDType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestSchemaUpdateSync {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private static List<? extends PrimitiveType> primitiveTypes() {
+    return Lists.newArrayList(StringType.get(),
+        TimeType.get(),
+        TimestampType.withoutZone(),
+        TimestampType.withZone(),
+        UUIDType.get(),
+        DateType.get(),
+        BooleanType.get(),
+        BinaryType.get(),
+        DoubleType.get(),
+        IntegerType.get(),
+        FixedType.ofLength(10),
+        DecimalType.of(10, 2),
+        LongType.get(),
+        FloatType.get()
+    );
+  }
+
+  private static NestedField[] primitiveFields(Integer initialValue, List<? extends PrimitiveType> primitiveTypes) {
+    AtomicInteger i = new AtomicInteger(initialValue);
+    return primitiveTypes.stream()
+        .map(type -> optional(i.incrementAndGet(), type.toString(),
+            Types.fromPrimitiveString(type.toString()))).toArray(NestedField[]::new);
+  }
+
+  @Test
+  public void testAddTopLevelPrimitives() {
+    Schema newSchema = new Schema(primitiveFields(0, primitiveTypes()));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddTopLevelListOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aList", Types.ListType.ofOptional(2, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelMapOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aMap", Types.MapType.ofOptional(2, 3, primitiveType, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelStructOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitive() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitives() {
+    Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+    Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+        primitiveFields(1, primitiveTypes()))));
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedLists() {
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2,
+            Types.ListType.ofOptional(3,
+                Types.ListType.ofOptional(4,
+                    Types.ListType.ofOptional(5,
+                        Types.ListType.ofOptional(6,
+                            Types.ListType.ofOptional(7,
+                                Types.ListType.ofOptional(8,
+                                    Types.ListType.ofOptional(9,
+                                        Types.ListType.ofOptional(10,
+                                            DecimalType.of(11, 20))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedStruct() {
+    Schema newSchema = new Schema(optional(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "struct3", Types.StructType.of(
+                    optional(4, "struct4", Types.StructType.of(
+                        optional(5, "struct5", Types.StructType.of(
+                            optional(6, "struct6", Types.StructType.of(
+                                optional(7, "aString", StringType.get()))))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedMaps() {
+    Schema newSchema = new Schema(optional(1, "struct", Types.MapType.ofOptional(
+        2, 3, StringType.get(), Types.MapType.ofOptional(
+            4, 5, StringType.get(), Types.MapType.ofOptional(
+                6, 7, StringType.get(), Types.MapType.ofOptional(
+                    8, 9, StringType.get(), Types.MapType.ofOptional(
+                        10, 11, StringType.get(), Types.MapType.ofOptional(
+                            12, 13, StringType.get(), StringType.get()))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelList() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aList.element: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, LongType.get())));
+    new SchemaUpdate(currentSchema, 2).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapValue() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.value: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), LongType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapKey() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.key: string -> uuid");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, UUIDType.get(), StringType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // int (32-bit signed integer) can be promoted to long
+  public void testTypePromoteIntegerToLong() {
+    Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", LongType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(LongType.get(), applied.asStruct().fields().get(0).type());
+  }
+
+  @Test
+  // float (32-bit IEEE 754 floating point) can be promoted to double
+  public void testTypePromoteFloatToDouble() {
+    Schema currentSchema = new Schema(required(1, "aCol", FloatType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", DoubleType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(DoubleType.get(), applied.asStruct().fields().get(0).type());
+    // Comparing the whole structs does not work here:
+    //   Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    //   java.lang.AssertionError:
+    //   Expected :struct<1: aCol: required double>
+    //   Actual   :struct<1: aCol: required double ()>
+  }
+
+  @Test
+  public void testInvalidTypePromoteDoubleToFloat() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aCol: double -> float");
+
+    Schema currentSchema = new Schema(required(1, "aCol", DoubleType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", FloatType.get()));
+
+    new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // decimal(P,S) (fixed-point; precision P, scale S): scale is fixed, precision may widen up to 38
+  public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() {
+    Schema currentSchema = new Schema(required(1, "aCol", DecimalType.of(20, 1)));
+    Schema newSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1)));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddPrimitiveToNestedStruct() {
+    Schema schema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "value", StringType.get())))))))));
+
+    Schema newSchema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "time", TimeType.get())))))))));

Review comment:
       It seems like "value" is missing from newSchema. I think that's because deletes are not handled. We should consider supporting deletes and renames too.
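A name-based comparison can flag that "value" is absent from newSchema, but on its own it cannot tell whether "value" was renamed to "time" or dropped while a new column was added. The self-contained sketch below illustrates this on the shape of `testAddPrimitiveToNestedStruct`; the `Node` tree is a hypothetical stand-in for Iceberg's types, and list elements are named `element` by assumption.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class NameDiff {

  /** Hypothetical minimal schema node: a field name and its nested children. */
  static final class Node {
    final String name;
    final List<Node> children;

    Node(String name, Node... children) {
      this.name = name;
      this.children = Arrays.asList(children);
    }
  }

  /** Collects full dotted names (e.g. "struct1.struct2.list.element.value") depth-first. */
  static void collect(Node node, String prefix, Set<String> out) {
    String fullName = prefix.isEmpty() ? node.name : prefix + "." + node.name;
    out.add(fullName);
    for (Node child : node.children) {
      collect(child, fullName, out);
    }
  }

  static Set<String> names(Node root) {
    Set<String> out = new LinkedHashSet<>();
    collect(root, "", out);
    return out;
  }

  /** Names present in {@code from} but absent from {@code to}. */
  static Set<String> minus(Set<String> from, Set<String> to) {
    Set<String> result = new LinkedHashSet<>(from);
    result.removeAll(to);
    return result;
  }

  public static void main(String[] args) {
    Node current = new Node("struct1", new Node("struct2",
        new Node("list", new Node("element", new Node("value")))));
    Node incoming = new Node("struct1", new Node("struct2",
        new Node("list", new Node("element", new Node("time")))));

    Set<String> currentNames = names(current);
    Set<String> incomingNames = names(incoming);
    // prints [struct1.struct2.list.element.value]
    System.out.println("missing from new schema: " + minus(currentNames, incomingNames));
    // prints [struct1.struct2.list.element.time]
    System.out.println("added by new schema: " + minus(incomingNames, currentNames));
  }
}
```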

##########
File path: core/src/main/java/org/apache/iceberg/SchemaUpdate.java
##########
@@ -308,6 +314,159 @@ public UpdateSchema moveAfter(String name, String afterName) {
     return this;
   }
 
+  @Override
+  public UpdateSchema upsertSchema(Schema newSchema) {
+    TypeUtil.visit(newSchema, new ApplyUpdates(this, schema));
+    return this;
+  }
+
+  private static class ApplyUpdates extends TypeUtil.SchemaVisitor<Void> {
+    private static final Joiner DOT = Joiner.on(".");
+    private final Deque<String> fieldNames = Lists.newLinkedList();
+    private NestedField currentField = null;
+
+    private final Schema baseSchema;
+    private final UpdateSchema api;
+    private final Map<String, Integer> indexByName;
+
+    private ApplyUpdates(UpdateSchema api, Schema baseSchema) {
+      this.api = api;
+      this.baseSchema = baseSchema;
+      this.indexByName = TypeUtil.indexByName(baseSchema.asStruct());
+    }
+
+    @Override
+    public void beforeListElement(NestedField elementField) {
+      beforeField(elementField);
+    }
+
+    @Override
+    public void afterListElement(NestedField elementField) {
+      afterField(elementField);
+    }
+
+    @Override
+    public void beforeMapKey(Types.NestedField keyField) {
+      beforeField(keyField);
+    }
+
+    @Override
+    public void afterMapKey(Types.NestedField keyField) {
+      afterField(keyField);
+    }
+
+    @Override
+    public void beforeMapValue(Types.NestedField valueField) {
+      beforeField(valueField);
+    }
+
+    @Override
+    public void afterMapValue(Types.NestedField valueField) {
+      afterField(valueField);
+    }
+
+    @Override
+    public void beforeField(NestedField field) {
+      fieldNames.push(field.name()); // we don't expect `element` to show up - it breaks
+      currentField = field;
+    }
+
+    @Override
+    public void afterField(NestedField field) {
+      fieldNames.pop();
+    }
+
+    @Override
+    public Void field(NestedField field, Void fieldResult) {
+      return super.field(field, fieldResult);
+    }
+
+    @Override
+    public Void list(ListType list, Void elementResult) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.ListType.ofOptional(0, list.elementType()), ancestors());
+      } else if (!field.type().isListType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field.type() + " to type list");
+      }
+      return null;
+    }
+
+    @Override
+    public Void map(MapType map, Void keyResult, Void valueResult) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.MapType.ofOptional(0, 1, map.keyType(), map.valueType()), ancestors());
+      } else if (!field.type().isMapType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field.type() + " to type map");
+      }
+      return null;
+    }
+
+    @Override
+    public Void struct(Types.StructType struct, List<Void> fieldResults) {
+      if (fieldNames.isEmpty()) {
+        return null; // this is the root struct
+      }
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.StructType.of(struct.fields()), ancestors());
+      } else if (!field.type().isStructType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field.type()
+                + " to type struct");
+      }
+      return null;
+    }
+
+    @Override
+    public Void primitive(PrimitiveType primitive) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      PrimitiveType newFieldType = Types.fromPrimitiveString(primitive.toString());
+      if (field == null) {
+        addColumn(currentField.name(), newFieldType, ancestors());
+      } else if (!field.type().isPrimitiveType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + field.name() + " of type: "
+                + field.type() + " to primitive type: " + primitive.typeId().name());
+      } else if (!newFieldType.equals(field.type())) {

Review comment:
       We may also need to handle renames and deletes. These can be detected if we apply the updates (additions, type widening, renames, and deletes) using field IDs instead of names. This was one of @rdblue's main comments on #759.
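Matching fields by ID rather than by name is what makes renames and deletes distinguishable. The sketch below is a minimal illustration under stated assumptions: each schema is reduced to a hypothetical map from field ID to the field's own (short) name, and fields are assumed not to move between parents. It is not Iceberg's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class IdDiff {

  /** Lists the changes that turn {@code current} into {@code incoming}, matching fields by ID. */
  static List<String> diff(Map<Integer, String> current, Map<Integer, String> incoming) {
    List<String> changes = new ArrayList<>();
    Set<Integer> ids = new TreeSet<>(current.keySet()); // sorted for deterministic output
    ids.addAll(incoming.keySet());
    for (int id : ids) {
      String before = current.get(id);
      String after = incoming.get(id);
      if (before == null) {
        changes.add("add " + id + ": " + after);
      } else if (after == null) {
        changes.add("delete " + id + ": " + before);
      } else if (!before.equals(after)) {
        changes.add("rename " + id + ": " + before + " -> " + after);
      }
    }
    return changes;
  }

  public static void main(String[] args) {
    // Current schema from testAddPrimitiveToNestedStruct (ID 4, the list element, is omitted).
    Map<Integer, String> current = Map.of(1, "struct1", 2, "struct2", 3, "list", 5, "value");
    // Same ID 5 with a new name: prints [rename 5: value -> time]
    System.out.println(diff(current, Map.of(1, "struct1", 2, "struct2", 3, "list", 5, "time")));
    // New ID 6 in place of ID 5: prints [delete 5: value, add 6: time]
    System.out.println(diff(current, Map.of(1, "struct1", 2, "struct2", 3, "list", 6, "time")));
  }
}
```

Type widening fits the same loop: once two fields share an ID, their types can be compared with a promotion check.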






[GitHub] [iceberg] fbocse commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r482068443



##########
File path: core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.schema;
+
+import java.util.List;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Visitor class that accumulates the set of changes needed to evolve an existing schema into the union of the
+ * existing and a new schema. Changes are added to an {@link UpdateSchema} operation.
+ */
+public class UnionByNameVisitor extends SchemaWithPartnerVisitor<Integer, Boolean> {
+
+  private final UpdateSchema api;
+  private final Schema partnerSchema;
+
+  private UnionByNameVisitor(UpdateSchema api, Schema partnerSchema) {
+    this.api = api;
+    this.partnerSchema = partnerSchema;
+  }
+
+  /**
+   * Adds changes needed to produce a union of two schemas to an {@link UpdateSchema} operation.
+   * <p>
+   * Changes are accumulated to evolve the existingSchema into a union with newSchema.
+   *
+   * @param api an UpdateSchema for adding changes
+   * @param existingSchema an existing schema
+   * @param newSchema a new schema to compare with the existing
+   */
+  public static void visit(UpdateSchema api, Schema existingSchema, Schema newSchema) {
+    visit(newSchema, -1, new UnionByNameVisitor(api, existingSchema), new PartnerIdByNameAccessors(existingSchema));
+  }
+
+  @Override
+  public Boolean struct(Types.StructType struct, Integer partnerId, List<Boolean> missingPositions) {
+    if (partnerId == null) {
+      return true;
+    }
+
+    List<Types.NestedField> fields = struct.fields();
+    Types.StructType partnerStruct = findFieldType(partnerId).asStructType();
+    IntStream.range(0, missingPositions.size())
+        .forEach(pos -> {
+          Boolean isMissing = missingPositions.get(pos);
+          Types.NestedField field = fields.get(pos);
+          if (isMissing) {
+            addColumn(partnerId, field);
+          } else {
+            updateColumn(field, partnerStruct.field(field.name()));
+          }
+        });
+
+    return false;
+  }
+
+  @Override
+  public Boolean field(Types.NestedField field, Integer partnerId, Boolean isFieldMissing) {
+    return partnerId == null;
+  }
+
+  @Override
+  public Boolean list(Types.ListType list, Integer partnerId, Boolean isElementMissing) {
+    if (partnerId == null) {
+      return true;
+    }
+
+    Preconditions.checkState(!isElementMissing, "Error traversing schemas: element is missing, but list is present");
+
+    Types.ListType partnerList = findFieldType(partnerId).asListType();
+    updateColumn(list.fields().get(0), partnerList.fields().get(0));
+
+    return false;
+  }
+
+  @Override
+  public Boolean map(Types.MapType map, Integer partnerId, Boolean isKeyMissing, Boolean isValueMissing) {
+    if (partnerId == null) {
+      return true;
+    }
+
+    Preconditions.checkState(!isKeyMissing, "Error traversing schemas: key is missing, but map is present");
+    Preconditions.checkState(!isValueMissing, "Error traversing schemas: value is missing, but map is present");
+
+    Types.MapType partnerMap = findFieldType(partnerId).asMapType();
+    updateColumn(map.fields().get(0), partnerMap.fields().get(0));
+    updateColumn(map.fields().get(1), partnerMap.fields().get(1));
+
+    return false;
+  }
+
+  @Override
+  public Boolean primitive(Type.PrimitiveType primitive, Integer partnerId) {
+    return partnerId == null;
+  }
+
+  private Type findFieldType(int fieldId) {
+    if (fieldId == -1) {
+      return partnerSchema.asStruct();
+    } else {
+      return partnerSchema.findField(fieldId).type();
+    }
+  }
+
+  private void addColumn(int parentId, Types.NestedField field) {
+    String parentName = partnerSchema.findColumnName(parentId);
+    api.addColumn(parentName, field.name(), field.type(), field.doc());
+  }
+
+  private void updateColumn(Types.NestedField field, Types.NestedField existingField) {
+    String fullName = partnerSchema.findColumnName(existingField.fieldId());
+
+    boolean needsOptionalUpdate = field.isOptional() && existingField.isRequired();
+    boolean needsTypeUpdate = field.type().isPrimitiveType() && !field.type().equals(existingField.type());
+    boolean needsDocUpdate = field.doc() != null && !field.doc().equals(existingField.doc());
+
+    if (needsOptionalUpdate) {
+      api.makeColumnOptional(fullName);
+    }
+
+    if (needsTypeUpdate) {
+      api.updateColumn(fullName, field.type().asPrimitiveType());
+    }
+
+    if (needsDocUpdate) {
+      api.updateColumnDoc(fullName, field.doc());
+    }
+  }
+
+  private static class PartnerIdByNameAccessors implements PartnerAccessors<Integer> {
+    private final Schema partnerSchema;
+
+    private PartnerIdByNameAccessors(Schema partnerSchema) {
+      this.partnerSchema = partnerSchema;
+    }
+
+    @Override
+    public Integer fieldPartner(Integer partnerFieldId, int fieldId, String name) {
+      Types.StructType struct;
+      if (partnerFieldId == -1) {

Review comment:
       @rdblue, I think this conditional is pretty much the only change I've made to your PoC code, and the tests are passing. I'll add some more tests from past use cases. I realise an exhaustive test suite is pretty much impossible for this feature, but I'll try to cover both basic and complex use cases.
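As context for the `updateColumn` helper quoted above: it only relaxes a column (required to optional), changes a primitive type, or sets a non-null doc; it never tightens nullability. Judging by the `Cannot change column type` messages that the tests later in this thread expect, disallowed type changes are rejected by the schema update itself, and the Iceberg spec only permits int to long, float to double, and widening a decimal's precision while keeping its scale. The following is a minimal Java sketch of those rules, not the PR's code: types are modeled as plain strings such as `decimal(20,1)`, and `FieldSketch`, `UpsertRules`, and `plannedChanges` are hypothetical names. Unlike the real helper, the sketch throws directly instead of deferring validation to the schema update.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for a field: a primitive type name, nullability, and an optional doc. */
final class FieldSketch {
  final String type;
  final boolean optional;
  final String doc;

  FieldSketch(String type, boolean optional, String doc) {
    this.type = type;
    this.optional = optional;
    this.doc = doc;
  }
}

final class UpsertRules {
  private static final Pattern DECIMAL = Pattern.compile("decimal\\((\\d+),\\s*(\\d+)\\)");

  private UpsertRules() {
  }

  /** Spec promotions: int -> long, float -> double, decimal(P,S) -> decimal(P',S) with P' > P. */
  static boolean isPromotionAllowed(String from, String to) {
    if (from.equals(to)) {
      return true;
    }
    if (from.equals("int") && to.equals("long")) {
      return true;
    }
    if (from.equals("float") && to.equals("double")) {
      return true;
    }
    Matcher f = DECIMAL.matcher(from);
    Matcher t = DECIMAL.matcher(to);
    if (f.matches() && t.matches()) {
      int fromPrecision = Integer.parseInt(f.group(1));
      int toPrecision = Integer.parseInt(t.group(1));
      boolean sameScale = Integer.parseInt(f.group(2)) == Integer.parseInt(t.group(2));
      // Scale is fixed; precision may only grow, up to 38.
      return sameScale && toPrecision > fromPrecision && toPrecision <= 38;
    }
    return false;
  }

  /** Mirrors the three checks in updateColumn: relax nullability, change type, update doc. */
  static List<String> plannedChanges(String name, FieldSketch incoming, FieldSketch existing) {
    List<String> changes = new ArrayList<>();
    if (incoming.optional && !existing.optional) {
      changes.add("makeColumnOptional(" + name + ")");
    }
    if (!incoming.type.equals(existing.type)) {
      if (!isPromotionAllowed(existing.type, incoming.type)) {
        throw new IllegalArgumentException(String.format(
            "Cannot change column type: %s: %s -> %s", name, existing.type, incoming.type));
      }
      changes.add("updateColumn(" + name + ", " + incoming.type + ")");
    }
    if (incoming.doc != null && !incoming.doc.equals(existing.doc)) {
      changes.add("updateColumnDoc(" + name + ")");
    }
    return changes;
  }
}
```

Passing an incoming `double` for an existing `float` yields an `updateColumn` change, while the reverse throws with the same message shape as `testInvalidTypePromoteDoubleToFloat`.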






[GitHub] [iceberg] rdblue commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r482498138



##########
File path: core/src/main/java/org/apache/iceberg/schema/SchemaWithPartnerVisitor.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.schema;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+public abstract class SchemaWithPartnerVisitor<P, R> implements TypeVisitor<P, R> {

Review comment:
       Why extract `TypeVisitor<P, R>` from this? It isn't obvious to me how that's useful.
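For readers new to the pattern: judging by the `PartnerIdByNameAccessors` snippet earlier in the thread, the visitor walks the incoming schema while carrying a "partner", the matching field ID in the table schema, resolved by name under the parent's ID. A partner of `-1` stands for the root struct and `null` means no match, which is how `primitive` detects a missing column. The sketch below imitates that traversal with plain Java classes; `SketchNode`, `PartnerIndex`, and `PartnerWalk` are hypothetical and do not reproduce Iceberg's `TypeVisitor` or `PartnerAccessors` interfaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical schema node: either a struct with children or a primitive leaf. */
final class SketchNode {
  final String name;
  final List<SketchNode> children;

  private SketchNode(String name, List<SketchNode> children) {
    this.name = name;
    this.children = children;
  }

  static SketchNode struct(String name, SketchNode... children) {
    return new SketchNode(name, Arrays.asList(children));
  }

  static SketchNode leaf(String name) {
    return new SketchNode(name, new ArrayList<SketchNode>());
  }
}

/** Hypothetical partner schema: resolves a field ID from its parent's ID and its name. */
final class PartnerIndex {
  static final int ROOT = -1; // sentinel for the root struct, like the -1 check in the snippet

  private final Map<String, Integer> ids = new HashMap<>();

  PartnerIndex add(int parentId, String name, int id) {
    ids.put(parentId + "/" + name, id);
    return this;
  }

  /** Returns the partner's field ID, or null when the parent has no field with that name. */
  Integer fieldPartner(int parentId, String name) {
    return ids.get(parentId + "/" + name);
  }
}

final class PartnerWalk {
  private PartnerWalk() {
  }

  /** Walks the incoming schema alongside the partner and returns qualified names with no partner. */
  static List<String> missing(SketchNode root, PartnerIndex partner) {
    List<String> out = new ArrayList<>();
    for (SketchNode child : root.children) {
      visit(child, "", PartnerIndex.ROOT, partner, out);
    }
    return out;
  }

  private static void visit(SketchNode node, String prefix, int parentId, PartnerIndex partner, List<String> out) {
    String qualified = prefix.isEmpty() ? node.name : prefix + "." + node.name;
    Integer partnerId = partner.fieldPartner(parentId, node.name);
    if (partnerId == null) {
      // No partner: the whole subtree is new, so report it once instead of recursing.
      out.add(qualified);
      return;
    }
    for (SketchNode child : node.children) {
      visit(child, qualified, partnerId, partner, out);
    }
  }
}
```

Reporting a partnerless struct once, rather than each of its leaves, roughly matches adding a whole new nested column in a single `addColumn` call.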






[GitHub] [iceberg] fbocse commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r451755790



##########
File path: core/src/test/java/org/apache/iceberg/TestSchemaUpdateSync.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.BinaryType;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.FixedType;
+import org.apache.iceberg.types.Types.FloatType;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.TimeType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.types.Types.UUIDType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestSchemaUpdateSync {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private static List<? extends PrimitiveType> primitiveTypes() {
+    return Lists.newArrayList(StringType.get(),
+        TimeType.get(),
+        TimestampType.withoutZone(),
+        TimestampType.withZone(),
+        UUIDType.get(),
+        DateType.get(),
+        BooleanType.get(),
+        BinaryType.get(),
+        DoubleType.get(),
+        IntegerType.get(),
+        FixedType.ofLength(10),
+        DecimalType.of(10, 2),
+        LongType.get(),
+        FloatType.get()
+    );
+  }
+
+  private static NestedField[] primitiveFields(Integer initialValue, List<? extends PrimitiveType> primitiveTypes) {
+    AtomicInteger i = new AtomicInteger(initialValue);
+    return primitiveTypes.stream()
+        .map(type -> optional(i.incrementAndGet(), type.toString(),
+            Types.fromPrimitiveString(type.toString()))).toArray(NestedField[]::new);
+  }
+
+  @Test
+  public void testAddTopLevelPrimitives() {
+    Schema newSchema = new Schema(primitiveFields(0, primitiveTypes()));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddTopLevelListOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aList", Types.ListType.ofOptional(2, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelMapOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aMap", Types.MapType.ofOptional(2, 3, primitiveType, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelStructOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(currentSchema).apply();
+      Assert.assertEquals(currentSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitive() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitives() {
+    Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+    Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+        primitiveFields(1, primitiveTypes()))));
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedLists() {
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2,
+            Types.ListType.ofOptional(3,
+                Types.ListType.ofOptional(4,
+                    Types.ListType.ofOptional(5,
+                        Types.ListType.ofOptional(6,
+                            Types.ListType.ofOptional(7,
+                                Types.ListType.ofOptional(8,
+                                    Types.ListType.ofOptional(9,
+                                        Types.ListType.ofOptional(10,
+                                            DecimalType.of(11, 20))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedStruct() {
+    Schema newSchema = new Schema(optional(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "struct3", Types.StructType.of(
+                    optional(4, "struct4", Types.StructType.of(
+                        optional(5, "struct5", Types.StructType.of(
+                            optional(6, "struct6", Types.StructType.of(
+                                optional(7, "aString", StringType.get()))))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedMaps() {
+    Schema newSchema = new Schema(optional(1, "struct", Types.MapType.ofOptional(
+        2, 3, StringType.get(), Types.MapType.ofOptional(
+            4, 5, StringType.get(), Types.MapType.ofOptional(
+                6, 7, StringType.get(), Types.MapType.ofOptional(
+                    8, 9, StringType.get(), Types.MapType.ofOptional(
+                        10, 11, StringType.get(), Types.MapType.ofOptional(
+                            12, 13, StringType.get(), StringType.get()))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelList() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aList.element: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, LongType.get())));
+    new SchemaUpdate(currentSchema, 2).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapValue() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.value: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), LongType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapKey() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.key: string -> uuid");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, UUIDType.get(), StringType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // int (32-bit signed integer) -> can be promoted to long
+  public void testTypePromoteIntegerToLong() {
+    Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", LongType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(LongType.get(), applied.asStruct().fields().get(0).type());
+  }
+
+  @Test
+  // float (32-bit IEEE 754 floating point) -> can be promoted to double
+  public void testTypePromoteFloatToDouble() {
+    Schema currentSchema = new Schema(required(1, "aCol", FloatType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", DoubleType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(DoubleType.get(), applied.asStruct().fields().get(0).type());
+    // Doesn't work Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    // java.lang.AssertionError:
+    // Expected :struct<1: aCol: required double>
+    // Actual   :struct<1: aCol: required double ()>
+  }
+
+  @Test
+  public void testInvalidTypePromoteDoubleToFloat() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aCol: double -> float");
+
+    Schema currentSchema = new Schema(required(1, "aCol", DoubleType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", FloatType.get()));
+
+    new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // decimal(P,S) (fixed-point decimal; precision P, scale S) -> scale is fixed, precision can widen up to 38
+  public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() {
+    Schema currentSchema = new Schema(required(1, "aCol", DecimalType.of(20, 1)));
+    Schema newSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1)));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddPrimitiveToNestedStruct() {
+    Schema schema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "value", StringType.get())))))))));
+
+    Schema newSchema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "time", TimeType.get())))))))));

Review comment:
       Right, nor should it. The main use case for this API is to accommodate `schema unions`, as @rdblue summed it up during our chat in the Iceberg sync-up today.
   The use case is as follows:
   
   1. Think of a dataset with a large schema; call it the `effective schema`.
   2. We get data that contains only particular columns, and those columns make up just a subset of the large schema; call it the `observed schema`.
   3. As these chunks of data and their respective observed schemas land in the Iceberg table, we want to track the union of all observed schemas as the Iceberg schema.
   
   So in this particular test, `expected` is the union of `schema` and `newSchema`.
   Does this make sense?
   Does this resemble your use case as well?
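The union described above can be sketched as a fold: every observed schema may add columns that the running union lacks, but a column that an observed schema omits is never removed. This minimal sketch is a simplification under stated assumptions, not the PR's implementation: schemas are flattened to ordered maps from column path to type string, field IDs are ignored, and any type difference is rejected instead of being checked for promotion. `SchemaUnion` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SchemaUnion {
  private SchemaUnion() {
  }

  /**
   * Folds observed schemas (column path -> type) into the effective schema.
   * Columns are only ever added: a column missing from an observed schema is kept.
   */
  static Map<String, String> union(Map<String, String> effective, List<Map<String, String>> observed) {
    Map<String, String> result = new LinkedHashMap<>(effective); // don't mutate the caller's map
    for (Map<String, String> schema : observed) {
      for (Map.Entry<String, String> column : schema.entrySet()) {
        String existing = result.putIfAbsent(column.getKey(), column.getValue());
        // Type promotion is out of scope for this sketch, so any type conflict is rejected.
        if (existing != null && !existing.equals(column.getValue())) {
          throw new IllegalArgumentException(String.format(
              "Conflicting types for %s: %s vs %s", column.getKey(), existing, column.getValue()));
        }
      }
    }
    return result;
  }
}
```

Flattening `testAddPrimitiveToNestedStruct` to paths, the table holds `struct1.struct2.list.element.value` and the observed schema holds `struct1.struct2.list.element.time`; the union keeps both, in that order.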






[GitHub] [iceberg] rdblue commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r463874818



##########
File path: core/src/test/java/org/apache/iceberg/TestSchemaUpdateSync.java
##########
@@ -0,0 +1,300 @@

Review comment:
       I just went back to the sync notes to refresh my memory and found that I hadn't posted them to the doc! I updated it. Sorry about that.
   
   Just catching up on this now. I think the main use case that we're trying to solve here is the union case, where you just need to produce a schema containing the union of the fields from all of the observed schemas. Focusing on that use case makes the choice of whether to support deletes clear: just because a column is missing doesn't mean it shouldn't be in the union. Renames are similar: if the field exists, do nothing, and if it doesn't exist, add it.
   
   The only case where we could detect a rename in a union operation is if both schemas have IDs. Even then, I would say that we should avoid unnecessary modification of the union schema, because we don't know which name is correct (the existing name or the observed name), and using the observed name is more likely to break existing queries for the table.
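The rule described above (never delete, and when both schemas carry IDs, keep the existing name instead of applying a rename) can be sketched as an ID-keyed merge. Assumptions, all hypothetical: schemas are flat maps from field ID to name, `IdAwareUnion` is an illustrative name, and an unknown ID is copied as-is, whereas a real schema update would assign a fresh ID to an added column. Detected renames are returned rather than applied, so a caller could log them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class IdAwareUnion {
  /** Merged fields (ID -> name) plus renames that were detected but deliberately not applied. */
  static final class Result {
    final Map<Integer, String> fields;
    final List<String> ignoredRenames;

    Result(Map<Integer, String> fields, List<String> ignoredRenames) {
      this.fields = fields;
      this.ignoredRenames = ignoredRenames;
    }
  }

  private IdAwareUnion() {
  }

  static Result merge(Map<Integer, String> table, Map<Integer, String> observed) {
    // Start from the table schema, so IDs absent from the observed schema are kept (no deletes).
    Map<Integer, String> fields = new LinkedHashMap<>(table);
    List<String> ignored = new ArrayList<>();
    for (Map.Entry<Integer, String> field : observed.entrySet()) {
      String existingName = fields.get(field.getKey());
      if (existingName == null) {
        // Unknown ID: add the field.
        fields.put(field.getKey(), field.getValue());
      } else if (!existingName.equals(field.getValue())) {
        // Same ID, different name: keep the existing name to avoid breaking existing queries.
        ignored.add(field.getKey() + ": kept " + existingName + ", ignored " + field.getValue());
      }
    }
    return new Result(fields, ignored);
  }
}
```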






[GitHub] [iceberg] rdsr commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r453991404



##########
File path: core/src/test/java/org/apache/iceberg/TestSchemaUpdateSync.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.BinaryType;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.FixedType;
+import org.apache.iceberg.types.Types.FloatType;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.TimeType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.types.Types.UUIDType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestSchemaUpdateSync {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private static List<? extends PrimitiveType> primitiveTypes() {
+    return Lists.newArrayList(StringType.get(),
+        TimeType.get(),
+        TimestampType.withoutZone(),
+        TimestampType.withZone(),
+        UUIDType.get(),
+        DateType.get(),
+        BooleanType.get(),
+        BinaryType.get(),
+        DoubleType.get(),
+        IntegerType.get(),
+        FixedType.ofLength(10),
+        DecimalType.of(10, 2),
+        LongType.get(),
+        FloatType.get()
+    );
+  }
+
+  private static NestedField[] primitiveFields(Integer initialValue, List<? extends PrimitiveType> primitiveTypes) {
+    AtomicInteger i = new AtomicInteger(initialValue);
+    return primitiveTypes.stream()
+        .map(type -> optional(i.incrementAndGet(), type.toString(),
+            Types.fromPrimitiveString(type.toString()))).toArray(NestedField[]::new);
+  }
+
+  @Test
+  public void testAddTopLevelPrimitives() {
+    Schema newSchema = new Schema(primitiveFields(0, primitiveTypes()));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddTopLevelListOfPrimitives() {
+    for(PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aList", Types.ListType.ofOptional(2, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelMapOfPrimitives() {
+    for(PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aMap", Types.MapType.ofOptional(2, 3, primitiveType, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelStructOfPrimitives() {
+    for(PrimitiveType primitiveType : primitiveTypes()) {
+      Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(currentSchema).apply();
+      Assert.assertEquals(currentSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitive() {
+    for(PrimitiveType primitiveType : primitiveTypes()) {
+      Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitives() {
+    Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+    Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+        primitiveFields(1, primitiveTypes()))));
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedLists() {
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2,
+            Types.ListType.ofOptional(3,
+                Types.ListType.ofOptional(4,
+                    Types.ListType.ofOptional(5,
+                        Types.ListType.ofOptional(6,
+                            Types.ListType.ofOptional(7,
+                                Types.ListType.ofOptional(8,
+                                    Types.ListType.ofOptional(9,
+                                        Types.ListType.ofOptional(10,
+                                            DecimalType.of(11, 20))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedStruct() {
+    Schema newSchema = new Schema(optional(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "struct3", Types.StructType.of(
+                    optional(4, "struct4", Types.StructType.of(
+                        optional(5, "struct5", Types.StructType.of(
+                            optional(6, "struct6", Types.StructType.of(
+                                optional(7, "aString", StringType.get()))))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedMaps() {
+    Schema newSchema = new Schema(optional(1, "struct", Types.MapType.ofOptional(
+        2, 3, StringType.get(), Types.MapType.ofOptional(
+            4, 5, StringType.get(), Types.MapType.ofOptional(
+                6, 7, StringType.get(), Types.MapType.ofOptional(
+                    8 ,9, StringType.get(), Types.MapType.ofOptional(
+                        10 ,11, StringType.get(), Types.MapType.ofOptional(
+                            12, 13, StringType.get(), StringType.get()))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelList() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aList.element: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, LongType.get())));
+    new SchemaUpdate(currentSchema, 2).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapValue() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.value: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2,3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2,3, StringType.get(), LongType.get())));
+    Schema apply = new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+    System.out.println(apply.toString());
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapKey() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.key: string -> uuid");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2,3 , StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2,3 , UUIDType.get(), StringType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // int	32-bit signed integers -> Can promote to long
+  public void testTypePromoteIntegerToLong() {
+    Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", LongType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(LongType.get(), applied.asStruct().fields().get(0).type());
+  }
+
+  @Test
+  // float	32-bit IEEE 754 floating point -> Can promote to double
+  public void testTypePromoteFloatToDouble() {
+    Schema currentSchema = new Schema(required(1, "aCol", FloatType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", DoubleType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(DoubleType.get(), applied.asStruct().fields().get(0).type());
+    // Doesn't work Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    // java.lang.AssertionError:
+    // Expected :struct<1: aCol: required double>
+    // Actual   :struct<1: aCol: required double ()>
+  }
+
+  @Test
+  public void testInvalidTypePromoteDoubleToFloat() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aCol: double -> float");
+
+    Schema currentSchema = new Schema(required(1, "aCol", DoubleType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", FloatType.get()));
+
+    new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // decimal(P,S)	Fixed-point decimal; precision P, scale S	-> Scale is fixed [1], precision must be 38 or less
+  public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() {
+    Schema currentSchema = new Schema(required(1, "aCol", DecimalType.of(20, 1)));
+    Schema newSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1)));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddPrimitiveToNestedStruct() {
+    Schema schema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+          optional(2, "struct2", Types.StructType.of(
+                  optional(3, "list", Types.ListType.ofOptional(
+                      4, Types.StructType.of(
+                          optional(5, "value", StringType.get())))))))));
+
+    Schema newSchema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "time", TimeType.get())))))))));

Review comment:
       Yes, that use case makes sense to me! It will work for our use case too, since we do not support deletes.






[GitHub] [iceberg] rdblue commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r464525224



##########
File path: core/src/main/java/org/apache/iceberg/SchemaUpdate.java
##########
@@ -308,6 +314,162 @@ public UpdateSchema moveAfter(String name, String afterName) {
     return this;
   }
 
+  @Override
+  public UpdateSchema upsertSchema(Schema newSchema) {
+    TypeUtil.visit(newSchema, new ApplyUpdates(this, schema));
+    return this;
+  }
+
+  private static class ApplyUpdates extends TypeUtil.SchemaVisitor<Void> {
+    private static final Joiner DOT = Joiner.on(".");
+    private final Deque<String> fieldNames = Lists.newLinkedList();
+    private NestedField currentField = null;
+
+    private final Schema baseSchema;
+    private final UpdateSchema api;
+    private final Map<String, Integer> indexByName;
+
+    private ApplyUpdates(UpdateSchema api, Schema baseSchema) {
+      this.api = api;
+      this.baseSchema = baseSchema;
+      this.indexByName = TypeUtil.indexByName(baseSchema.asStruct());
+    }
+
+    @Override
+    public void beforeListElement(NestedField elementField) {
+      beforeField(elementField);
+    }
+
+    @Override
+    public void afterListElement(NestedField elementField) {
+      afterField(elementField);
+    }
+
+    @Override
+    public void beforeMapKey(Types.NestedField keyField) {
+      beforeField(keyField);
+    }
+
+    @Override
+    public void afterMapKey(Types.NestedField keyField) {
+      afterField(keyField);
+    }
+
+    @Override
+    public void beforeMapValue(Types.NestedField valueField) {
+      beforeField(valueField);
+    }
+
+    @Override
+    public void afterMapValue(Types.NestedField valueField) {
+      afterField(valueField);
+    }
+
+    @Override
+    public void beforeField(NestedField field) {
+      fieldNames.push(field.name()); // we don't expect `element` to show up - it breaks
+      currentField = field;
+    }
+
+    @Override
+    public void afterField(NestedField field) {
+      fieldNames.pop();
+    }
+
+    @Override
+    public Void field(NestedField field, Void fieldResult) {
+      return super.field(field, fieldResult);
+    }
+
+    @Override
+    public Void list(ListType list, Void elementResult) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.ListType.ofOptional(0,   list.elementType()), ancestors());
+      } else if (!field.type().isListType()) {
+        throw new IllegalArgumentException(

Review comment:
       This visitor includes quite a bit of logic for traversing two schemas at once, by keeping track of the current name and looking up the equivalent field in the existing (base) schema. This check is also primarily there to validate structure.
   
   I think it would be good to move the logic that traverses both schemas into a separate visitor class, like we've done with the other "partner" visitors. That way, the union visitor would just be handed the two types at the same position in the schema and could decide whether the observed type needs to be added, used to update the existing type, or requires no action.
   
   In addition, we should be able to construct that visitor so that it tracks the corresponding position in the existing schema, which would remove the need to keep track of ancestors.
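
A minimal sketch of the suggested split between traversal and union logic. It assumes a hypothetical mini type model (`MiniType`, `MiniPrimitive`, `MiniField`, `MiniStruct`) that covers only structs and primitives; lists and maps would need analogous partner lookups for their element, key, and value. `PartnerTraversal` walks the observed type and looks up the partner at the same position by field name, so the callback (`UnionDecisions`, also hypothetical) only decides what to do with each pair and needs no ancestor stack. Struct-vs-primitive mismatches and promotion validity are not checked here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical mini type model (not Iceberg's Types): structs and primitives only.
abstract class MiniType {}

final class MiniPrimitive extends MiniType {
  final String name;

  MiniPrimitive(String name) {
    this.name = name;
  }

  @Override
  public String toString() {
    return name;
  }
}

final class MiniField {
  final String name;
  final MiniType type;

  MiniField(String name, MiniType type) {
    this.name = name;
    this.type = type;
  }
}

final class MiniStruct extends MiniType {
  final List<MiniField> fields;

  MiniStruct(MiniField... fields) {
    this.fields = Arrays.asList(fields);
  }

  // Field with the given name, or null if the struct has none.
  MiniField field(String name) {
    for (MiniField f : fields) {
      if (f.name.equals(name)) {
        return f;
      }
    }
    return null;
  }
}

// Receives the observed type and its partner at the same position;
// existing is null when that position does not exist in the existing schema.
interface PartnerCallback {
  void visit(String path, MiniType observed, MiniType existing);
}

final class PartnerTraversal {
  // Walks the observed type and, in lockstep, the matching position in the existing type.
  static void visit(String path, MiniType observed, MiniType existing, PartnerCallback callback) {
    callback.visit(path, observed, existing);
    if (observed instanceof MiniStruct) {
      MiniStruct partner = existing instanceof MiniStruct ? (MiniStruct) existing : null;
      for (MiniField f : ((MiniStruct) observed).fields) {
        MiniField partnerField = partner == null ? null : partner.field(f.name);
        String childPath = path.isEmpty() ? f.name : path + "." + f.name;
        visit(childPath, f.type, partnerField == null ? null : partnerField.type, callback);
      }
    }
  }
}

// Union logic only: decides what to do with each (observed, existing) pair.
final class UnionDecisions implements PartnerCallback {
  final List<String> actions = new ArrayList<>();
  private final List<String> added = new ArrayList<>();

  @Override
  public void visit(String path, MiniType observed, MiniType existing) {
    if (path.isEmpty()) {
      return; // root struct
    }
    if (existing == null) {
      for (String parent : added) {
        if (path.startsWith(parent + ".")) {
          return; // already covered: adding the parent adds its whole subtree
        }
      }
      actions.add("add " + path);
      added.add(path);
    } else if (observed instanceof MiniPrimitive && existing instanceof MiniPrimitive
        && !observed.toString().equals(existing.toString())) {
      // Records the type change; whether it is a valid promotion is not checked here.
      actions.add("update " + path + ": " + existing + " -> " + observed);
    }
  }
}
```

Because the traversal carries the partner down the tree, the callback never has to rebuild a full name and look it up again; the only state it keeps is which new subtrees it has already decided to add.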






[GitHub] [iceberg] rdblue merged pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177


   




[GitHub] [iceberg] rdblue commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r463887853



##########
File path: api/src/main/java/org/apache/iceberg/UpdateSchema.java
##########
@@ -361,4 +361,14 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin
    *                                  change conflicts with other changes.
    */
   UpdateSchema moveAfter(String name, String afterName);
+
+
+  /**
+   * Applies all the additions and updates [type widening, field documentation]
+   * from the input schema
+   *
+   * @param newSchema - Input schema from which updates are applied
+   * @return this for method chaining
+   */
+  UpdateSchema upsertSchema(Schema newSchema);

Review comment:
       It seems odd to me that this is here. Does this support multiple calls to this method, or do we expect to mix this union operation with the others? If not, then I think we should consider separating the union logic from the API. We try to make sure that all of the configuration methods on table operations can be used together, unless they logically conflict (like rename x -> y, then delete y).
   
   If this is going to be incompatible with, or even just not recommended alongside, the other configuration methods, then we should consider moving it either to its own operation or to a class that helps configure the normal `UpdateSchema` operation.
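
A minimal sketch of the second option, assuming a hypothetical `PendingUpdate` builder interface in place of the real `UpdateSchema` and a flat name -> type model. `UnionHelper` sits outside the update API and only issues ordinary add/update calls, so several observed schemas can be unioned in turn and the same pending update can still be configured further (here, with a rename). The helper keeps its own view of the schema plus the changes it has already requested, which is what makes repeated calls safe. Only int -> long and float -> double are treated as widening in this sketch; other type differences are ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a schema update builder (not Iceberg's UpdateSchema).
interface PendingUpdate {
  PendingUpdate addColumn(String name, String type);

  PendingUpdate updateColumn(String name, String newType);

  PendingUpdate renameColumn(String name, String newName);
}

// Records requested changes instead of applying them.
final class RecordingUpdate implements PendingUpdate {
  final List<String> changes = new ArrayList<>();

  @Override
  public PendingUpdate addColumn(String name, String type) {
    changes.add("add " + name + " " + type);
    return this;
  }

  @Override
  public PendingUpdate updateColumn(String name, String newType) {
    changes.add("update " + name + " " + newType);
    return this;
  }

  @Override
  public PendingUpdate renameColumn(String name, String newName) {
    changes.add("rename " + name + " -> " + newName);
    return this;
  }
}

// Hypothetical helper outside the update API: it only issues ordinary add/update calls.
final class UnionHelper {
  private final PendingUpdate update;
  private final Map<String, String> view; // current schema plus changes already requested

  UnionHelper(PendingUpdate update, Map<String, String> currentSchema) {
    this.update = update;
    this.view = new LinkedHashMap<>(currentSchema);
  }

  UnionHelper unionWith(Map<String, String> observed) {
    for (Map.Entry<String, String> column : observed.entrySet()) {
      String name = column.getKey();
      String type = column.getValue();
      String currentType = view.get(name);
      if (currentType == null) {
        update.addColumn(name, type);
        view.put(name, type);
      } else if (isWidening(currentType, type)) {
        update.updateColumn(name, type);
        view.put(name, type);
      }
      // Any other difference is ignored in this sketch.
    }
    return this;
  }

  private static boolean isWidening(String from, String to) {
    return (from.equals("int") && to.equals("long")) || (from.equals("float") && to.equals("double"));
  }
}
```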






[GitHub] [iceberg] fbocse commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r454374562



##########
File path: core/src/test/java/org/apache/iceberg/TestSchemaUpdateSync.java
##########
@@ -0,0 +1,300 @@
+  @Test
+  public void testAddPrimitiveToNestedStruct() {
+    Schema schema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "value", StringType.get())))))))));
+
+    Schema newSchema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "time", TimeType.get())))))))));

Review comment:
       Dunno really. Do you account for renames in your use-case?
   If so, do you have a plan for pushing those changes down to the Iceberg APIs? Do you know, before committing to Iceberg, which columns you're actually renaming? Or would we rely on Iceberg to perform the renames implicitly, and if so, how would it tell an add apart from a rename?

   In the particular use-case I've described I didn't account for that requirement: observed schemas don't require column renames.
   However, I assume the only way we could reliably implement renames is to reconcile column name changes using the field ids, right?
   Also, in this particular use-case (sub-schemas getting unioned, with no renames), the operations are additive, so they can run out of order: the union schema ends up the same no matter the order in which the observed schemas were committed.
   For things like renames and deletes, though, we'd need to account for order too, right?
   I hate building things that need to account for order, because most of the time I get them wrong.
   So I don't think we can combine the two implementations into a single method...
   Wdyt?
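A minimal sketch of the order-independence argument above, under stated assumptions: a flat schema is modelled as a hypothetical `Map<String, String>` from column name to type name (not Iceberg's `Schema`), the union only ever adds columns, and a column seen with two different types is rejected rather than promoted. Committing the same observed schemas in either order yields the same set of columns and types; only column positions can differ.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a flat schema is an ordered map of column name -> type name.
final class AdditiveUnionSketch {

  // Union by name: keep every existing column and append columns that only
  // the observed schema has. Nothing is renamed or dropped.
  static Map<String, String> union(Map<String, String> current, Map<String, String> observed) {
    Map<String, String> result = new LinkedHashMap<>(current);
    for (Map.Entry<String, String> column : observed.entrySet()) {
      String existingType = result.putIfAbsent(column.getKey(), column.getValue());
      if (existingType != null && !existingType.equals(column.getValue())) {
        // Assumption: conflicting types are rejected; type promotion is out of scope here.
        throw new IllegalArgumentException("Conflicting types for column " + column.getKey());
      }
    }
    return result;
  }

  // Commit observed schemas one after another, starting from an empty table schema.
  static Map<String, String> commitAll(List<Map<String, String>> observedSchemas) {
    Map<String, String> schema = new LinkedHashMap<>();
    for (Map<String, String> observed : observedSchemas) {
      schema = union(schema, observed);
    }
    return schema;
  }

  // Build a schema from alternating name/type arguments.
  static Map<String, String> schemaOf(String... nameTypePairs) {
    if (nameTypePairs.length % 2 != 0) {
      throw new IllegalArgumentException("Expected name/type pairs");
    }
    Map<String, String> schema = new LinkedHashMap<>();
    for (int i = 0; i < nameTypePairs.length; i += 2) {
      schema.put(nameTypePairs[i], nameTypePairs[i + 1]);
    }
    return schema;
  }

  public static void main(String[] args) {
    Map<String, String> a = schemaOf("id", "long", "name", "string");
    Map<String, String> b = schemaOf("id", "long", "ts", "timestamp");
    Map<String, String> ab = commitAll(Arrays.asList(a, b));
    Map<String, String> ba = commitAll(Arrays.asList(b, a));
    // Map equality ignores insertion order, so this compares columns and types only.
    System.out.println(ab.equals(ba));
    // Column positions follow commit order and therefore can differ.
    System.out.println(ab.keySet() + " vs " + ba.keySet());
  }
}
```

A rename or delete does not fit this model at all, since `union` never removes a key; that is where ordering starts to matter.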
   
   






[GitHub] [iceberg] fbocse commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r454374562



##########
File path: core/src/test/java/org/apache/iceberg/TestSchemaUpdateSync.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.BinaryType;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.FixedType;
+import org.apache.iceberg.types.Types.FloatType;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.TimeType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.types.Types.UUIDType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestSchemaUpdateSync {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private static List<? extends PrimitiveType> primitiveTypes() {
+    return Lists.newArrayList(StringType.get(),
+        TimeType.get(),
+        TimestampType.withoutZone(),
+        TimestampType.withZone(),
+        UUIDType.get(),
+        DateType.get(),
+        BooleanType.get(),
+        BinaryType.get(),
+        DoubleType.get(),
+        IntegerType.get(),
+        FixedType.ofLength(10),
+        DecimalType.of(10, 2),
+        LongType.get(),
+        FloatType.get()
+    );
+  }
+
+  private static NestedField[] primitiveFields(int initialValue, List<? extends PrimitiveType> primitiveTypes) {
+    AtomicInteger i = new AtomicInteger(initialValue);
+    return primitiveTypes.stream()
+        .map(type -> optional(i.incrementAndGet(), type.toString(),
+            Types.fromPrimitiveString(type.toString()))).toArray(NestedField[]::new);
+  }
+
+  @Test
+  public void testAddTopLevelPrimitives() {
+    Schema newSchema = new Schema(primitiveFields(0, primitiveTypes()));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddTopLevelListOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aList", Types.ListType.ofOptional(2, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelMapOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aMap", Types.MapType.ofOptional(2, 3, primitiveType, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelStructOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitive() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitives() {
+    Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+    Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+        primitiveFields(1, primitiveTypes()))));
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedLists() {
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2,
+            Types.ListType.ofOptional(3,
+                Types.ListType.ofOptional(4,
+                    Types.ListType.ofOptional(5,
+                        Types.ListType.ofOptional(6,
+                            Types.ListType.ofOptional(7,
+                                Types.ListType.ofOptional(8,
+                                    Types.ListType.ofOptional(9,
+                                        Types.ListType.ofOptional(10,
+                                            DecimalType.of(11, 20))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedStruct() {
+    Schema newSchema = new Schema(optional(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "struct3", Types.StructType.of(
+                    optional(4, "struct4", Types.StructType.of(
+                        optional(5, "struct5", Types.StructType.of(
+                            optional(6, "struct6", Types.StructType.of(
+                                optional(7, "aString", StringType.get()))))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedMaps() {
+    Schema newSchema = new Schema(optional(1, "struct", Types.MapType.ofOptional(
+        2, 3, StringType.get(), Types.MapType.ofOptional(
+            4, 5, StringType.get(), Types.MapType.ofOptional(
+                6, 7, StringType.get(), Types.MapType.ofOptional(
+                    8, 9, StringType.get(), Types.MapType.ofOptional(
+                        10, 11, StringType.get(), Types.MapType.ofOptional(
+                            12, 13, StringType.get(), StringType.get()))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelList() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aList.element: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, LongType.get())));
+    new SchemaUpdate(currentSchema, 2).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapValue() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.value: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), LongType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapKey() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.key: string -> uuid");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, UUIDType.get(), StringType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // int (32-bit signed integer) can be promoted to long
+  public void testTypePromoteIntegerToLong() {
+    Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", LongType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(LongType.get(), applied.asStruct().fields().get(0).type());
+  }
+
+  @Test
+  // float (32-bit IEEE 754 floating point) can be promoted to double
+  public void testTypePromoteFloatToDouble() {
+    Schema currentSchema = new Schema(required(1, "aCol", FloatType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", DoubleType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(DoubleType.get(), applied.asStruct().fields().get(0).type());
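+    // Comparing the whole structs does not work here:
+    //   Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    // fails with java.lang.AssertionError:
+    //   Expected :struct<1: aCol: required double>
+    //   Actual   :struct<1: aCol: required double ()>
+    // The trailing "()" suggests the updated field carries an empty doc string.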
+  }
+
+  @Test
+  public void testInvalidTypePromoteDoubleToFloat() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aCol: double -> float");
+
+    Schema currentSchema = new Schema(required(1, "aCol", DoubleType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", FloatType.get()));
+
+    new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // decimal(P, S): scale is fixed, precision can be widened (up to 38)
+  public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() {
+    Schema currentSchema = new Schema(required(1, "aCol", DecimalType.of(20, 1)));
+    Schema newSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1)));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddPrimitiveToNestedStruct() {
+    Schema schema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "value", StringType.get())))))))));
+
+    Schema newSchema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "time", TimeType.get())))))))));

Review comment:
       Dunno really. Do you need renames in your use-case?
   If so, do you have a plan for pushing those changes down to the Iceberg APIs? Do you know, before committing to Iceberg, which columns you're actually renaming? Or would we rely on Iceberg to perform the renames implicitly, and if so, how would it tell an add apart from a rename?

   In the particular use-case I've described I didn't account for that requirement: observed schemas don't require column renames.
   However, I assume the only way we could reliably implement renames is to reconcile column name changes using the field ids, right?
   Also, in this particular use-case (sub-schemas getting unioned, with no renames), the operations are additive, so they can run out of order: the union schema ends up the same no matter the order in which the observed schemas were committed.
   For things like renames and deletes, though, we'd need to account for order too, right?
   I hate building things that need to account for order, because most of the time I get them wrong.
   So I don't think we can combine the two implementations into a single method...
   Wdyt?
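The comment suggests renames could only be reconciled reliably through field ids. A minimal sketch of why, using a hypothetical flat schema modelled as `Map<Integer, String>` (field id to column name) rather than Iceberg's classes: diffing by id reports a rename, while diffing by name alone sees the same change as a delete plus an add.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: each column has a stable field id and a name.
final class RenameByIdSketch {

  // Key the comparison on field id: a surviving id with a new name is a rename.
  static List<String> diffById(Map<Integer, String> before, Map<Integer, String> after) {
    List<String> changes = new ArrayList<>();
    for (Map.Entry<Integer, String> field : after.entrySet()) {
      String oldName = before.get(field.getKey());
      if (oldName == null) {
        changes.add("add " + field.getValue());
      } else if (!oldName.equals(field.getValue())) {
        changes.add("rename " + oldName + " -> " + field.getValue());
      }
    }
    for (Map.Entry<Integer, String> field : before.entrySet()) {
      if (!after.containsKey(field.getKey())) {
        changes.add("delete " + field.getValue());
      }
    }
    return changes;
  }

  // Key the comparison on name only: a rename is indistinguishable from delete + add.
  static List<String> diffByName(Map<Integer, String> before, Map<Integer, String> after) {
    List<String> changes = new ArrayList<>();
    for (String name : after.values()) {
      if (!before.containsValue(name)) {
        changes.add("add " + name);
      }
    }
    for (String name : before.values()) {
      if (!after.containsValue(name)) {
        changes.add("delete " + name);
      }
    }
    return changes;
  }

  public static void main(String[] args) {
    Map<Integer, String> before = new LinkedHashMap<>();
    before.put(1, "id");
    before.put(2, "ts");
    Map<Integer, String> after = new LinkedHashMap<>();
    after.put(1, "id");
    after.put(2, "event_ts");
    System.out.println(diffById(before, after));   // [rename ts -> event_ts]
    System.out.println(diffByName(before, after)); // [add event_ts, delete ts]
  }
}
```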
   
   






[GitHub] [iceberg] fbocse commented on pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#issuecomment-685740015


   @rdblue @rdsr The PR check in Travis CI seems stuck. I tried to retrigger it by closing and reopening the PR, but that didn't work. Any suggestions on how to get the hook working and the build validated?




[GitHub] [iceberg] fbocse commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r483199490



##########
File path: api/src/main/java/org/apache/iceberg/UpdateSchema.java
##########
@@ -361,4 +361,27 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin
    *                                  change conflicts with other changes.
    */
   UpdateSchema moveAfter(String name, String afterName);
+
+
+  /**
+   * Applies all field additions and updates from the provided new schema to the existing schema so

Review comment:
       @rdblue is this javadoc overdoing it a bit?
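The Javadoc under discussion says the method applies field additions and updates from the new schema. The `testTypePromote*` and `testInvalidTypePromoteDoubleToFloat` cases quoted earlier in this thread pin down which primitive updates are expected to be accepted. Below is a hypothetical sketch of those widening rules that works on type names as plain strings. `TypePromotionSketch`, `isAllowed` and `checkPromotion` are illustrative names, not Iceberg's API. The error text mirrors the message the tests expect.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper mirroring the widening cases covered by the tests; not Iceberg's code.
final class TypePromotionSketch {

  // Matches names such as "decimal(20, 1)" (and "decimal(20,1)").
  private static final Pattern DECIMAL = Pattern.compile("decimal\\((\\d+),\\s*(\\d+)\\)");
  private static final int MAX_DECIMAL_PRECISION = 38;

  static boolean isAllowed(String from, String to) {
    if (from.equals(to)) {
      return true; // unchanged type
    }
    if (from.equals("int") && to.equals("long")) {
      return true;
    }
    if (from.equals("float") && to.equals("double")) {
      return true;
    }
    Matcher fromDecimal = DECIMAL.matcher(from);
    Matcher toDecimal = DECIMAL.matcher(to);
    if (fromDecimal.matches() && toDecimal.matches()) {
      int fromPrecision = Integer.parseInt(fromDecimal.group(1));
      int fromScale = Integer.parseInt(fromDecimal.group(2));
      int toPrecision = Integer.parseInt(toDecimal.group(1));
      int toScale = Integer.parseInt(toDecimal.group(2));
      // Scale must stay the same; precision may not shrink and is capped at 38.
      return fromScale == toScale && toPrecision >= fromPrecision
          && toPrecision <= MAX_DECIMAL_PRECISION;
    }
    return false;
  }

  // Throws with the same message format the tests assert on.
  static void checkPromotion(String column, String from, String to) {
    if (!isAllowed(from, to)) {
      throw new IllegalArgumentException(
          String.format("Cannot change column type: %s: %s -> %s", column, from, to));
    }
  }

  public static void main(String[] args) {
    System.out.println(isAllowed("int", "long"));                      // true
    System.out.println(isAllowed("decimal(20, 1)", "decimal(22, 1)")); // true
    System.out.println(isAllowed("decimal(20, 1)", "decimal(22, 2)")); // false: scale changed
    try {
      checkPromotion("aCol", "double", "float");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // Cannot change column type: aCol: double -> float
    }
  }
}
```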






[GitHub] [iceberg] fbocse commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r482069642



##########
File path: core/src/main/java/org/apache/iceberg/SchemaUpdate.java
##########
@@ -308,6 +309,12 @@ public UpdateSchema moveAfter(String name, String afterName) {
     return this;
   }
 
+  @Override
+  public UpdateSchema unionWithByFieldName(Schema newSchema) {
+    UnionByNameVisitor.visit(this, schema, newSchema);

Review comment:
       @rdblue you're right: the implementation, while a bit more complex than the previous one, feels less complicated.
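The change above delegates to `UnionByNameVisitor.visit(this, schema, newSchema)`. The snippet below is not that visitor. It is a hypothetical, simplified model of the general idea, using its own `Field` class rather than Iceberg's types. Incoming fields are matched to existing ones by name. Structs present on both sides are merged recursively, and fields only the incoming side has are appended with fresh ids after a `lastColumnId` counter, loosely echoing the second constructor argument the tests pass to `SchemaUpdate`. Type changes are simply rejected to keep it short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified model of a nested schema; not Iceberg's Types classes.
final class UnionByNameSketch {

  static final class Field {
    final int id;
    final String name;
    final String primitive;     // type name, or null when the field is a struct
    final List<Field> children; // child fields of a struct, empty for primitives

    private Field(int id, String name, String primitive, List<Field> children) {
      this.id = id;
      this.name = name;
      this.primitive = primitive;
      this.children = children;
    }

    static Field primitive(int id, String name, String type) {
      return new Field(id, name, type, new ArrayList<>());
    }

    static Field struct(int id, String name, List<Field> children) {
      return new Field(id, name, null, children);
    }

    @Override
    public String toString() {
      if (primitive != null) {
        return id + ": " + name + ": " + primitive;
      }
      return id + ": " + name + ": struct<"
          + children.stream().map(Field::toString).collect(Collectors.joining(", ")) + ">";
    }
  }

  private int lastColumnId;

  UnionByNameSketch(int lastColumnId) {
    this.lastColumnId = lastColumnId;
  }

  // Match incoming fields to existing ones by name, recurse into structs that
  // exist on both sides, and append fields that only the incoming side has.
  List<Field> union(List<Field> existing, List<Field> incoming) {
    List<Field> result = new ArrayList<>(existing);
    for (Field in : incoming) {
      int index = indexOfName(existing, in.name);
      if (index < 0) {
        result.add(withFreshIds(in));
        continue;
      }
      Field match = existing.get(index);
      if (match.primitive == null && in.primitive == null) {
        result.set(index, Field.struct(match.id, match.name, union(match.children, in.children)));
      } else if (match.primitive == null || !match.primitive.equals(in.primitive)) {
        // Assumption: type changes are rejected here to keep the sketch short.
        throw new IllegalArgumentException("Type change not handled in this sketch: " + in.name);
      }
    }
    return result;
  }

  private static int indexOfName(List<Field> fields, String name) {
    for (int i = 0; i < fields.size(); i++) {
      if (fields.get(i).name.equals(name)) {
        return i;
      }
    }
    return -1;
  }

  // Newly added fields, including nested children, get ids after the last assigned id.
  private Field withFreshIds(Field field) {
    int id = ++lastColumnId;
    if (field.primitive != null) {
      return Field.primitive(id, field.name, field.primitive);
    }
    List<Field> children = new ArrayList<>();
    for (Field child : field.children) {
      children.add(withFreshIds(child));
    }
    return Field.struct(id, field.name, children);
  }

  public static void main(String[] args) {
    List<Field> current = new ArrayList<>();
    current.add(Field.struct(1, "aStruct", new ArrayList<>()));
    List<Field> nested = new ArrayList<>();
    nested.add(Field.primitive(2, "primitive", "string"));
    List<Field> observed = new ArrayList<>();
    observed.add(Field.struct(1, "aStruct", nested));
    // Prints [1: aStruct: struct<2: primitive: string>]
    System.out.println(new UnionByNameSketch(1).union(current, observed));
  }
}
```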







[GitHub] [iceberg] fbocse commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r482066555



##########
File path: api/src/main/java/org/apache/iceberg/UpdateSchema.java
##########
@@ -361,4 +361,14 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin
    *                                  change conflicts with other changes.
    */
   UpdateSchema moveAfter(String name, String afterName);
+
+
+  /**
+   * Applies all field additions and updates (type widening, field documentation)
+   * from the provided new schema.
+   *
+   * @param newSchema input schema from which updates are applied
+   * @return this for method chaining
+   */
+  UpdateSchema unionWithByFieldName(Schema newSchema);

Review comment:
       @rdblue @rdsr please weigh in on the naming of this new API. I went for something as accurate as possible, but please drop a note if you have a better suggestion.
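The Javadoc above promises type widening for updated fields. The tests quoted earlier in this thread (`testDetectInvalidTopLevelList`, `testDetectInvalidTopLevelMapKey`, `testDetectInvalidTopLevelMapValue`) expect a rejected change to name its nested position, as in `aMap.value: string -> long`. Below is a hypothetical sketch of how such a dotted path can be built while walking lists and maps in parallel. The `Prim`/`ListT`/`MapT` classes are illustrative and are not Iceberg's `Types`. The sketch also applies the same simplified widening rule to map keys and values; the real implementation may treat keys more strictly.

```java
// Hypothetical type model (not Iceberg's Types classes): primitives, lists and maps.
final class NestedPathCheckSketch {

  interface Type {}

  static final class Prim implements Type {
    final String name;

    Prim(String name) {
      this.name = name;
    }
  }

  static final class ListT implements Type {
    final Type element;

    ListT(Type element) {
      this.element = element;
    }
  }

  static final class MapT implements Type {
    final Type key;
    final Type value;

    MapT(Type key, Type value) {
      this.key = key;
      this.value = value;
    }
  }

  // Simplified widening rule: no change, int -> long, or float -> double.
  static boolean canPromote(String from, String to) {
    return from.equals(to)
        || (from.equals("int") && to.equals("long"))
        || (from.equals("float") && to.equals("double"));
  }

  // Walk both types in parallel, extending the dotted path with "element",
  // "key" or "value", and fail on the first primitive that cannot be widened.
  static void check(String path, Type current, Type incoming) {
    if (current instanceof Prim && incoming instanceof Prim) {
      String from = ((Prim) current).name;
      String to = ((Prim) incoming).name;
      if (!canPromote(from, to)) {
        throw new IllegalArgumentException(
            String.format("Cannot change column type: %s: %s -> %s", path, from, to));
      }
    } else if (current instanceof ListT && incoming instanceof ListT) {
      check(path + ".element", ((ListT) current).element, ((ListT) incoming).element);
    } else if (current instanceof MapT && incoming instanceof MapT) {
      MapT currentMap = (MapT) current;
      MapT incomingMap = (MapT) incoming;
      check(path + ".key", currentMap.key, incomingMap.key);
      check(path + ".value", currentMap.value, incomingMap.value);
    } else {
      throw new IllegalArgumentException("Cannot change the kind of nested type at " + path);
    }
  }

  public static void main(String[] args) {
    Type current = new MapT(new Prim("string"), new Prim("string"));
    Type incoming = new MapT(new Prim("string"), new Prim("long"));
    try {
      check("aMap", current, incoming);
    } catch (IllegalArgumentException e) {
      // Cannot change column type: aMap.value: string -> long
      System.out.println(e.getMessage());
    }
  }
}
```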






[GitHub] [iceberg] fbocse closed pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse closed pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177


   




[GitHub] [iceberg] rdsr commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r460583211



##########
File path: core/src/test/java/org/apache/iceberg/TestSchemaUpdateSync.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.BinaryType;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.FixedType;
+import org.apache.iceberg.types.Types.FloatType;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.TimeType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.types.Types.UUIDType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestSchemaUpdateSync {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private static List<? extends PrimitiveType> primitiveTypes() {
+    return Lists.newArrayList(StringType.get(),
+        TimeType.get(),
+        TimestampType.withoutZone(),
+        TimestampType.withZone(),
+        UUIDType.get(),
+        DateType.get(),
+        BooleanType.get(),
+        BinaryType.get(),
+        DoubleType.get(),
+        IntegerType.get(),
+        FixedType.ofLength(10),
+        DecimalType.of(10, 2),
+        LongType.get(),
+        FloatType.get()
+    );
+  }
+
+  private static NestedField[] primitiveFields(int initialValue, List<? extends PrimitiveType> primitiveTypes) {
+    AtomicInteger i = new AtomicInteger(initialValue);
+    return primitiveTypes.stream()
+        .map(type -> optional(i.incrementAndGet(), type.toString(),
+            Types.fromPrimitiveString(type.toString()))).toArray(NestedField[]::new);
+  }
+
+  @Test
+  public void testAddTopLevelPrimitives() {
+    Schema newSchema = new Schema(primitiveFields(0, primitiveTypes()));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddTopLevelListOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aList", Types.ListType.ofOptional(2, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelMapOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aMap", Types.MapType.ofOptional(2, 3, primitiveType, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelStructOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitive() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitives() {
+    Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+    Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+        primitiveFields(1, primitiveTypes()))));
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedLists() {
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2,
+            Types.ListType.ofOptional(3,
+                Types.ListType.ofOptional(4,
+                    Types.ListType.ofOptional(5,
+                        Types.ListType.ofOptional(6,
+                            Types.ListType.ofOptional(7,
+                                Types.ListType.ofOptional(8,
+                                    Types.ListType.ofOptional(9,
+                                        Types.ListType.ofOptional(10,
+                                            DecimalType.of(11, 20))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedStruct() {
+    Schema newSchema = new Schema(optional(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "struct3", Types.StructType.of(
+                    optional(4, "struct4", Types.StructType.of(
+                        optional(5, "struct5", Types.StructType.of(
+                            optional(6, "struct6", Types.StructType.of(
+                                optional(7, "aString", StringType.get()))))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedMaps() {
+    Schema newSchema = new Schema(optional(1, "struct", Types.MapType.ofOptional(
+        2, 3, StringType.get(), Types.MapType.ofOptional(
+            4, 5, StringType.get(), Types.MapType.ofOptional(
+                6, 7, StringType.get(), Types.MapType.ofOptional(
+                    8, 9, StringType.get(), Types.MapType.ofOptional(
+                        10, 11, StringType.get(), Types.MapType.ofOptional(
+                            12, 13, StringType.get(), StringType.get()))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelList() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aList.element: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, LongType.get())));
+    new SchemaUpdate(currentSchema, 2).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapValue() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.value: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), LongType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapKey() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.key: string -> uuid");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, UUIDType.get(), StringType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // int: 32-bit signed integer; can be promoted to long
+  public void testTypePromoteIntegerToLong() {
+    Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", LongType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(LongType.get(), applied.asStruct().fields().get(0).type());
+  }
+
+  @Test
+  // float: 32-bit IEEE 754 floating point; can be promoted to double
+  public void testTypePromoteFloatToDouble() {
+    Schema currentSchema = new Schema(required(1, "aCol", FloatType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", DoubleType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(DoubleType.get(), applied.asStruct().fields().get(0).type());
+    // Doesn't work Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    // java.lang.AssertionError:
+    // Expected :struct<1: aCol: required double>
+    // Actual   :struct<1: aCol: required double ()>
+  }
+
+  @Test
+  public void testInvalidTypePromoteDoubleToFloat() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aCol: double -> float");
+
+    Schema currentSchema = new Schema(required(1, "aCol", DoubleType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", FloatType.get()));
+
+    new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // decimal(P,S): fixed-point decimal with precision P and scale S; scale is fixed, precision must be 38 or less
+  public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() {
+    Schema currentSchema = new Schema(required(1, "aCol", DecimalType.of(20, 1)));
+    Schema newSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1)));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddPrimitiveToNestedStruct() {
+    Schema schema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "value", StringType.get())))))))));
+
+    Schema newSchema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "time", TimeType.get())))))))));

Review comment:
       Yeah, @rdblue had some thoughts on renames as well: https://github.com/apache/iceberg/pull/759#discussion_r382871744. I'm not sure whether that thinking evolved at the last meetup.
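To make the rename question concrete: in the test above, field ID 5 is named `value` (string) in the existing schema and `time` (time) in the new one. The following minimal sketch uses a hypothetical flat `Col` class instead of Iceberg's `Types.NestedField` and contrasts two matching strategies. Matching by field ID reads the change as a rename plus a type change, while matching by name (the union-by-name approach) ignores the incoming ID and simply treats `time` as a new column. It only illustrates the two strategies and is not how `SchemaUpdate` is implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RenameVsUnion {
  // Hypothetical flattened column: field ID, leaf name, and type name.
  static final class Col {
    final int id;
    final String name;
    final String type;

    Col(int id, String name, String type) {
      this.id = id;
      this.name = name;
      this.type = type;
    }
  }

  // Match by field ID: a different name under the same ID is reported as a rename.
  static List<String> diffById(List<Col> existing, List<Col> incoming) {
    Map<Integer, Col> byId = new LinkedHashMap<>();
    for (Col c : existing) {
      byId.put(c.id, c);
    }
    List<String> changes = new ArrayList<>();
    for (Col c : incoming) {
      Col old = byId.get(c.id);
      if (old == null) {
        changes.add("add " + c.name);
        continue;
      }
      if (!old.name.equals(c.name)) {
        changes.add("rename " + old.name + " -> " + c.name);
      }
      if (!old.type.equals(c.type)) {
        changes.add("change type " + c.name + ": " + old.type + " -> " + c.type);
      }
    }
    return changes;
  }

  // Match by name: incoming IDs are ignored, so an unknown name is always an addition.
  static List<String> diffByName(List<Col> existing, List<Col> incoming) {
    Map<String, Col> byName = new LinkedHashMap<>();
    for (Col c : existing) {
      byName.put(c.name, c);
    }
    List<String> changes = new ArrayList<>();
    for (Col c : incoming) {
      Col old = byName.get(c.name);
      if (old == null) {
        changes.add("add " + c.name);
      } else if (!old.type.equals(c.type)) {
        changes.add("change type " + c.name + ": " + old.type + " -> " + c.type);
      }
    }
    return changes;
  }

  public static void main(String[] args) {
    List<Col> existing = List.of(new Col(5, "value", "string"));
    List<Col> incoming = List.of(new Col(5, "time", "time"));
    System.out.println(diffById(existing, incoming));   // [rename value -> time, change type time: string -> time]
    System.out.println(diffByName(existing, incoming)); // [add time]
  }
}
```

With name matching, the existing `value` column is left untouched, so the result is the union of both schemas rather than a rename.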
   
   






[GitHub] [iceberg] rdblue commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r485078335



##########
File path: api/src/main/java/org/apache/iceberg/UpdateSchema.java
##########
@@ -361,4 +361,27 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin
    *                                  change conflicts with other changes.
    */
   UpdateSchema moveAfter(String name, String afterName);
+
+
+  /**
+   * Applies all field additions and updates from the provided new schema to the existing schema so

Review comment:
       Looks good to me.






[GitHub] [iceberg] rdblue commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r482500311



##########
File path: core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.schema;
+
+import java.util.List;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Visitor class that accumulates the set of changes needed to evolve an existing schema into the union of the
+ * existing and a new schema. Changes are added to an {@link UpdateSchema} operation.
+ */
+public class UnionByNameVisitor extends SchemaWithPartnerVisitor<Integer, Boolean> {
+
+  private final UpdateSchema api;
+  private final Schema partnerSchema;
+
+  private UnionByNameVisitor(UpdateSchema api, Schema partnerSchema) {
+    this.api = api;
+    this.partnerSchema = partnerSchema;
+  }
+
+  /**
+   * Adds changes needed to produce a union of two schemas to an {@link UpdateSchema} operation.
+   * <p>
+   * Changes are accumulated to evolve the existingSchema into a union with newSchema.
+   *
+   * @param api an UpdateSchema for adding changes
+   * @param existingSchema an existing schema
+   * @param newSchema a new schema to compare with the existing
+   */
+  public static void visit(UpdateSchema api, Schema existingSchema, Schema newSchema) {
+    visit(newSchema, -1, new UnionByNameVisitor(api, existingSchema), new PartnerIdByNameAccessors(existingSchema));
+  }
+
+  @Override
+  public Boolean struct(Types.StructType struct, Integer partnerId, List<Boolean> missingPositions) {
+    if (partnerId == null) {
+      return true;
+    }
+
+    List<Types.NestedField> fields = struct.fields();
+    Types.StructType partnerStruct = findFieldType(partnerId).asStructType();
+    IntStream.range(0, missingPositions.size())
+        .forEach(pos -> {
+          Boolean isMissing = missingPositions.get(pos);
+          Types.NestedField field = fields.get(pos);
+          if (isMissing) {
+            addColumn(partnerId, field);
+          } else {
+            updateColumn(field, partnerStruct.field(field.name()));
+          }
+        });
+
+    return false;
+  }
+
+  @Override
+  public Boolean field(Types.NestedField field, Integer partnerId, Boolean isFieldMissing) {
+    return partnerId == null;
+  }
+
+  @Override
+  public Boolean list(Types.ListType list, Integer partnerId, Boolean isElementMissing) {
+    if (partnerId == null) {
+      return true;
+    }
+
+    Preconditions.checkState(!isElementMissing, "Error traversing schemas: element is missing, but list is present");
+
+    Types.ListType partnerList = findFieldType(partnerId).asListType();
+    updateColumn(list.fields().get(0), partnerList.fields().get(0));
+
+    return false;
+  }
+
+  @Override
+  public Boolean map(Types.MapType map, Integer partnerId, Boolean isKeyMissing, Boolean isValueMissing) {
+    if (partnerId == null) {
+      return true;
+    }
+
+    Preconditions.checkState(!isKeyMissing, "Error traversing schemas: key is missing, but map is present");
+    Preconditions.checkState(!isValueMissing, "Error traversing schemas: value is missing, but map is present");
+
+    Types.MapType partnerMap = findFieldType(partnerId).asMapType();
+    updateColumn(map.fields().get(0), partnerMap.fields().get(0));
+    updateColumn(map.fields().get(1), partnerMap.fields().get(1));
+
+    return false;
+  }
+
+  @Override
+  public Boolean primitive(Type.PrimitiveType primitive, Integer partnerId) {
+    return partnerId == null;
+  }
+
+  private Type findFieldType(int fieldId) {
+    if (fieldId == -1) {
+      return partnerSchema.asStruct();
+    } else {
+      return partnerSchema.findField(fieldId).type();
+    }
+  }
+
+  private void addColumn(int parentId, Types.NestedField field) {
+    String parentName = partnerSchema.findColumnName(parentId);
+    api.addColumn(parentName, field.name(), field.type(), field.doc());
+  }
+
+  private void updateColumn(Types.NestedField field, Types.NestedField existingField) {
+    String fullName = partnerSchema.findColumnName(existingField.fieldId());
+
+    boolean needsOptionalUpdate = field.isOptional() && existingField.isRequired();
+    boolean needsTypeUpdate = field.type().isPrimitiveType() && !field.type().equals(existingField.type());
+    boolean needsDocUpdate = field.doc() != null && !field.doc().equals(existingField.doc());
+
+    if (needsOptionalUpdate) {
+      api.makeColumnOptional(fullName);
+    }
+
+    if (needsTypeUpdate) {
+      api.updateColumn(fullName, field.type().asPrimitiveType());
+    }
+
+    if (needsDocUpdate) {
+      api.updateColumnDoc(fullName, field.doc());
+    }
+  }
+
+  private static class PartnerIdByNameAccessors implements PartnerAccessors<Integer> {
+    private final Schema partnerSchema;
+
+    private PartnerIdByNameAccessors(Schema partnerSchema) {
+      this.partnerSchema = partnerSchema;
+    }
+
+    @Override
+    public Integer fieldPartner(Integer partnerFieldId, int fieldId, String name) {
+      Types.StructType struct;
+      if (partnerFieldId == -1) {

Review comment:
       I think this is correct.
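The `-1` partner ID checked here is the sentinel that `visit` passes for the root struct (compare `findFieldType`, which maps `-1` to `partnerSchema.asStruct()`). For fields that are matched, `updateColumn` only ever widens the existing field. The following minimal sketch restates its three checks with a hypothetical `Field` class in place of `Types.NestedField`; the returned strings merely name the `UpdateSchema` calls the visitor would make and are not real API output.

```java
import java.util.ArrayList;
import java.util.List;

public class UnionFieldUpdates {
  // Hypothetical stand-in for a field: optionality, primitive type name (null for nested types), and doc.
  static final class Field {
    final boolean optional;
    final String primitiveType;
    final String doc;

    Field(boolean optional, String primitiveType, String doc) {
      this.optional = optional;
      this.primitiveType = primitiveType;
      this.doc = doc;
    }
  }

  // Mirrors the needsOptionalUpdate / needsTypeUpdate / needsDocUpdate checks in updateColumn.
  static List<String> changesFor(String fullName, Field incoming, Field existing) {
    List<String> changes = new ArrayList<>();
    // A required column may become optional; an optional column is never made required.
    if (incoming.optional && !existing.optional) {
      changes.add("makeColumnOptional(" + fullName + ")");
    }
    // Only primitive types are compared here; nested types are handled when their children are visited.
    if (incoming.primitiveType != null && !incoming.primitiveType.equals(existing.primitiveType)) {
      changes.add("updateColumn(" + fullName + ", " + incoming.primitiveType + ")");
    }
    // A missing (null) doc never clears an existing doc.
    if (incoming.doc != null && !incoming.doc.equals(existing.doc)) {
      changes.add("updateColumnDoc(" + fullName + ", " + incoming.doc + ")");
    }
    return changes;
  }

  public static void main(String[] args) {
    Field existing = new Field(false, "int", "old doc");
    Field incoming = new Field(true, "long", null);
    System.out.println(changesFor("aCol", incoming, existing)); // [makeColumnOptional(aCol), updateColumn(aCol, long)]
  }
}
```

Whether a type change is actually legal (for example int to long) is left to `UpdateSchema.updateColumn`, which is why the tests expect `IllegalArgumentException` for narrowing changes.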






[GitHub] [iceberg] fbocse commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r451755790



##########
File path: core/src/test/java/org/apache/iceberg/TestSchemaUpdateSync.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.BinaryType;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.FixedType;
+import org.apache.iceberg.types.Types.FloatType;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.TimeType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.types.Types.UUIDType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestSchemaUpdateSync {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private static List<? extends PrimitiveType> primitiveTypes() {
+    return Lists.newArrayList(StringType.get(),
+        TimeType.get(),
+        TimestampType.withoutZone(),
+        TimestampType.withZone(),
+        UUIDType.get(),
+        DateType.get(),
+        BooleanType.get(),
+        BinaryType.get(),
+        DoubleType.get(),
+        IntegerType.get(),
+        FixedType.ofLength(10),
+        DecimalType.of(10, 2),
+        LongType.get(),
+        FloatType.get()
+    );
+  }
+
+  private static NestedField[] primitiveFields(Integer initialValue, List<? extends PrimitiveType> primitiveTypes) {
+    AtomicInteger i = new AtomicInteger(initialValue);
+    return primitiveTypes.stream()
+        .map(type -> optional(i.incrementAndGet(), type.toString(),
+            Types.fromPrimitiveString(type.toString()))).toArray(NestedField[]::new);
+  }
+
+  @Test
+  public void testAddTopLevelPrimitives() {
+    Schema newSchema = new Schema(primitiveFields(0, primitiveTypes()));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddTopLevelListOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aList", Types.ListType.ofOptional(2, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelMapOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema newSchema = new Schema(optional(1, "aMap", Types.MapType.ofOptional(2, 3, primitiveType, primitiveType)));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddTopLevelStructOfPrimitives() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(currentSchema).apply();
+      Assert.assertEquals(currentSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitive() {
+    for (PrimitiveType primitiveType : primitiveTypes()) {
+      Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+      Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+          optional(2, "primitive", primitiveType))));
+      Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+      Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    }
+  }
+
+  @Test
+  public void testAddNestedPrimitives() {
+    Schema currentSchema = new Schema(optional(1, "aStruct", Types.StructType.of()));
+    Schema newSchema = new Schema(optional(1, "aStruct", Types.StructType.of(
+        primitiveFields(1, primitiveTypes()))));
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedLists() {
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2,
+            Types.ListType.ofOptional(3,
+                Types.ListType.ofOptional(4,
+                    Types.ListType.ofOptional(5,
+                        Types.ListType.ofOptional(6,
+                            Types.ListType.ofOptional(7,
+                                Types.ListType.ofOptional(8,
+                                    Types.ListType.ofOptional(9,
+                                        Types.ListType.ofOptional(10,
+                                            DecimalType.of(11, 20))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedStruct() {
+    Schema newSchema = new Schema(optional(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "struct3", Types.StructType.of(
+                    optional(4, "struct4", Types.StructType.of(
+                        optional(5, "struct5", Types.StructType.of(
+                            optional(6, "struct6", Types.StructType.of(
+                                optional(7, "aString", StringType.get()))))))))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddNestedMaps() {
+    Schema newSchema = new Schema(optional(1, "struct", Types.MapType.ofOptional(
+        2, 3, StringType.get(), Types.MapType.ofOptional(
+            4, 5, StringType.get(), Types.MapType.ofOptional(
+                6, 7, StringType.get(), Types.MapType.ofOptional(
+                    8, 9, StringType.get(), Types.MapType.ofOptional(
+                        10, 11, StringType.get(), Types.MapType.ofOptional(
+                            12, 13, StringType.get(), StringType.get()))))))));
+    Schema applied = new SchemaUpdate(new Schema(), 0).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelList() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aList.element: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aList",
+        Types.ListType.ofOptional(2, LongType.get())));
+    new SchemaUpdate(currentSchema, 2).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapValue() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.value: string -> long");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), LongType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  public void testDetectInvalidTopLevelMapKey() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aMap.key: string -> uuid");
+
+    Schema currentSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, StringType.get(), StringType.get())));
+    Schema newSchema = new Schema(optional(1, "aMap",
+        Types.MapType.ofOptional(2, 3, UUIDType.get(), StringType.get())));
+    new SchemaUpdate(currentSchema, 3).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // int: 32-bit signed integer; can be promoted to long
+  public void testTypePromoteIntegerToLong() {
+    Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", LongType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(LongType.get(), applied.asStruct().fields().get(0).type());
+  }
+
+  @Test
+  // float: 32-bit IEEE 754 floating point; can be promoted to double
+  public void testTypePromoteFloatToDouble() {
+    Schema currentSchema = new Schema(required(1, "aCol", FloatType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", DoubleType.get()));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(1, applied.asStruct().fields().size());
+    Assert.assertEquals(DoubleType.get(), applied.asStruct().fields().get(0).type());
+    // Doesn't work Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+    // java.lang.AssertionError:
+    // Expected :struct<1: aCol: required double>
+    // Actual   :struct<1: aCol: required double ()>
+  }
+
+  @Test
+  public void testInvalidTypePromoteDoubleToFloat() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot change column type: aCol: double -> float");
+
+    Schema currentSchema = new Schema(required(1, "aCol", DoubleType.get()));
+    Schema newSchema = new Schema(required(1, "aCol", FloatType.get()));
+
+    new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+  }
+
+  @Test
+  // decimal(P,S): fixed-point decimal with precision P and scale S; scale is fixed, precision must be 38 or less
+  public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() {
+    Schema currentSchema = new Schema(required(1, "aCol", DecimalType.of(20, 1)));
+    Schema newSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1)));
+
+    Schema applied = new SchemaUpdate(currentSchema, 1).upsertSchema(newSchema).apply();
+    Assert.assertEquals(newSchema.asStruct(), applied.asStruct());
+  }
+
+  @Test
+  public void testAddPrimitiveToNestedStruct() {
+    Schema schema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "value", StringType.get())))))))));
+
+    Schema newSchema = new Schema(
+        required(1, "struct1", Types.StructType.of(
+            optional(2, "struct2", Types.StructType.of(
+                optional(3, "list", Types.ListType.ofOptional(
+                    4, Types.StructType.of(
+                        optional(5, "time", TimeType.get())))))))));

Review comment:
       Right, nor should it. The main use case for this API is to accommodate schema unions, as @rdblue summed it up during our chat in the Iceberg sync-up today.
   The use case is as follows:
   
   1. Think of a dataset with a large schema; call it the `effective schema`.
   2. We receive data that contains only particular columns, which make up a subset of the large schema; call it the `observed schema`.
   3. As these chunks of data and their respective observed schemas land in the Iceberg table, we want to keep the table schema on par with the union of these observed schemas.
   
   So in this particular test, `expected` is the union of `schema` and `newSchema`.
   Does this make sense?
   Does this resemble your use case as well?
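The effective/observed workflow described above can be sketched as repeatedly folding each observed schema into the effective one. This is a minimal sketch, assuming flat schemas modeled as a hypothetical map from column name to type name (not Iceberg's `Schema` API). Columns absent from an observed schema are kept, new columns are appended, and type changes are accepted only for the promotions exercised by the tests in this PR (int to long, float to double, and decimal precision widening with a fixed scale, up to 38). Anything else is rejected with a message shaped like the tests' expected messages.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SchemaUnion {
  // Matches decimal type strings such as "decimal(20, 1)".
  private static final Pattern DECIMAL = Pattern.compile("decimal\\((\\d+),\\s*(\\d+)\\)");

  // Allowed changes in this sketch: identity, int -> long, float -> double,
  // and decimal precision widening with the same scale, capped at 38.
  static boolean canPromote(String from, String to) {
    if (from.equals(to)) {
      return true;
    }
    if (from.equals("int") && to.equals("long")) {
      return true;
    }
    if (from.equals("float") && to.equals("double")) {
      return true;
    }
    Matcher f = DECIMAL.matcher(from);
    Matcher t = DECIMAL.matcher(to);
    if (f.matches() && t.matches()) {
      int fromPrecision = Integer.parseInt(f.group(1));
      int fromScale = Integer.parseInt(f.group(2));
      int toPrecision = Integer.parseInt(t.group(1));
      int toScale = Integer.parseInt(t.group(2));
      return fromScale == toScale && toPrecision >= fromPrecision && toPrecision <= 38;
    }
    return false;
  }

  // Returns the union of the effective schema and one observed schema, matched by column name.
  static Map<String, String> union(Map<String, String> effective, Map<String, String> observed) {
    Map<String, String> result = new LinkedHashMap<>(effective); // existing columns are never dropped
    for (Map.Entry<String, String> entry : observed.entrySet()) {
      String name = entry.getKey();
      String newType = entry.getValue();
      String existingType = result.get(name);
      if (existingType == null) {
        result.put(name, newType); // new column, appended at the end
      } else if (canPromote(existingType, newType)) {
        result.put(name, newType); // same or wider type; the column keeps its position
      } else {
        throw new IllegalArgumentException(
            "Cannot change column type: " + name + ": " + existingType + " -> " + newType);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> effective = new LinkedHashMap<>();
    effective.put("id", "int");
    effective.put("price", "decimal(20, 1)");

    Map<String, String> observedA = new LinkedHashMap<>();
    observedA.put("id", "long");
    observedA.put("name", "string");

    Map<String, String> observedB = new LinkedHashMap<>();
    observedB.put("price", "decimal(22, 1)");

    effective = union(effective, observedA);
    effective = union(effective, observedB);
    System.out.println(effective); // {id=long, price=decimal(22, 1), name=string}
  }
}
```

Because each step only adds columns or widens types, the order in which observed schemas arrive affects column order but not which columns or types end up in the effective schema.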






[GitHub] [iceberg] rdblue commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r464520655



##########
File path: core/src/main/java/org/apache/iceberg/SchemaUpdate.java
##########
@@ -308,6 +314,162 @@ public UpdateSchema moveAfter(String name, String afterName) {
     return this;
   }
 
+  @Override
+  public UpdateSchema upsertSchema(Schema newSchema) {
+    TypeUtil.visit(newSchema, new ApplyUpdates(this, schema));
+    return this;
+  }
+
+  private static class ApplyUpdates extends TypeUtil.SchemaVisitor<Void> {
+    private static final Joiner DOT = Joiner.on(".");
+    private final Deque<String> fieldNames = Lists.newLinkedList();
+    private NestedField currentField = null;
+
+    private final Schema baseSchema;
+    private final UpdateSchema api;
+    private final Map<String, Integer> indexByName;
+
+    private ApplyUpdates(UpdateSchema api, Schema baseSchema) {
+      this.api = api;
+      this.baseSchema = baseSchema;
+      this.indexByName = TypeUtil.indexByName(baseSchema.asStruct());
+    }
+
+    @Override
+    public void beforeListElement(NestedField elementField) {
+      beforeField(elementField);
+    }
+
+    @Override
+    public void afterListElement(NestedField elementField) {
+      afterField(elementField);
+    }
+
+    @Override
+    public void beforeMapKey(Types.NestedField keyField) {
+      beforeField(keyField);
+    }
+
+    @Override
+    public void afterMapKey(Types.NestedField keyField) {
+      afterField(keyField);
+    }
+
+    @Override
+    public void beforeMapValue(Types.NestedField valueField) {
+      beforeField(valueField);
+    }
+
+    @Override
+    public void afterMapValue(Types.NestedField valueField) {
+      afterField(valueField);
+    }
+
+    @Override
+    public void beforeField(NestedField field) {
+      fieldNames.push(field.name()); // we don't expect `element` to show up - it breaks
+      currentField = field;
+    }
+
+    @Override
+    public void afterField(NestedField field) {
+      fieldNames.pop();
+    }
+
+    @Override
+    public Void field(NestedField field, Void fieldResult) {
+      return super.field(field, fieldResult);
+    }
+
+    @Override
+    public Void list(ListType list, Void elementResult) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.ListType.ofOptional(0, list.elementType()), ancestors());
+      } else if (!field.type().isListType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field
+                .type() + " to type list");
+      }
+      return null;
+    }
+
+    @Override
+    public Void map(MapType map, Void keyResult, Void valueResult) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.MapType.ofOptional(0, 1, map.keyType(), map.valueType()), ancestors());
+      } else if (!field.type().isMapType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field
+                .type() + " to type map");
+      }
+      return null;
+    }
+
+    @Override
+    public Void struct(Types.StructType struct, List<Void> fieldResults) {
+      if (fieldNames.isEmpty()) {
+        return null; // this is the root struct
+      }
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.StructType.of(struct.fields()), ancestors());
+      } else if (!field.type().isStructType()) {
+        throw new IllegalArgumentException("Cannot update existing field: " + fullName + " of type: " + field.type() +
+                " to type struct");
+      }
+      return null;
+    }
+
+    @Override
+    public Void primitive(PrimitiveType primitive) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      PrimitiveType newFieldType = Types.fromPrimitiveString(primitive.toString());
+      if (field == null) {
+        addColumn(currentField.name(), Types.fromPrimitiveString(primitive.toString()), ancestors());
+      } else if (!field.type().isPrimitiveType()) {
+        throw new IllegalArgumentException("Cannot update existing field: " + field.name() + " of type: " +
+                field.type() + " to primitive type: " + primitive.typeId().name());
+      } else if (!newFieldType.equals(field.type())) {
+        updateColumn(field.type().asPrimitiveType(), fullName, field.doc(), newFieldType, currentField.doc());
+      }
+      return null;
+    }
+
+    private String ancestors() {
+      if (fieldNames.isEmpty()) {
+        return "";
+      }
+      String head = fieldNames.removeFirst();
+      String join = DOT.join(fieldNames.descendingIterator());
+      fieldNames.addFirst(head);
+      return join;
+    }
+
+    private void updateColumn(PrimitiveType fieldType, String fullName, String fieldDoc, PrimitiveType newFieldType,
+                              String newDoc) {
+      if (!fieldType.equals(newFieldType)) {
+        api.updateColumn(fullName, newFieldType.asPrimitiveType());
+      } else if (newDoc != null && !newDoc.equals(fieldDoc)) {
+        api.updateColumnDoc(fullName, newDoc);
+      }
+    }
+
+    private void addColumn(String name, Type type, String ancestors) {
+      if (ancestors.isEmpty()) {
+        api.addColumn(null, name, type);
+      } else if (indexByName.containsKey(ancestors)) {
+        api.addColumn(ancestors, name, type);
+      }
+      // At this point the parent of this column hasn't been added to the schema, not yet visited

Review comment:
       I had a hard time following the code and ended up using a debugger to figure out that it wasn't trying to add columns in missing structs. It was good to see that there are test cases for this, but I think we can avoid calling `addColumn` for columns that should not be added by using the visitor's return values.
   
   I prototyped a change in which each visitor method returns a Boolean that indicates whether a column is missing. Each method checks whether its counterpart exists in the other struct. If it does not, the method simply returns true, because the whole type should be added. If it does, container types can update the type, doc, and required flag. I think that's easier to follow, so I'll post the changes for you.
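   The Boolean-return idea described in the comment above can be sketched as follows. This is a minimal illustration only, assuming a hypothetical type model (`Node`, `MissingFieldVisitor`) that stands in for Iceberg's `Types` classes and visitor API; it records `addColumn` calls as strings instead of calling `UpdateSchema`, and it models only structs and primitives.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified type model for this sketch (not Iceberg's Types API).
final class Node {
  final String primitive;          // non-null for primitives, e.g. "int"
  final Map<String, Node> fields;  // non-null for structs, in declaration order

  private Node(String primitive, Map<String, Node> fields) {
    this.primitive = primitive;
    this.fields = fields;
  }

  static Node prim(String type) {
    return new Node(type, null);
  }

  static Node struct(Object... nameTypePairs) {
    Map<String, Node> fields = new LinkedHashMap<>();
    for (int i = 0; i < nameTypePairs.length; i += 2) {
      fields.put((String) nameTypePairs[i], (Node) nameTypePairs[i + 1]);
    }
    return new Node(null, fields);
  }

  boolean isStruct() {
    return fields != null;
  }
}

final class MissingFieldVisitor {
  // Stand-in for UpdateSchema.addColumn: each call is recorded as a string.
  final List<String> addedColumns = new ArrayList<>();

  // Returns true when `partner` (the existing field) is missing, meaning the
  // caller must add the whole incoming type in a single call.
  boolean visit(String path, Node type, Node partner) {
    if (partner == null) {
      return true; // do not recurse: children of a missing field are added with it
    }
    if (type.isStruct() != partner.isStruct()) {
      throw new IllegalArgumentException("Cannot change the kind of existing field: " + path);
    }
    if (type.isStruct()) {
      for (Map.Entry<String, Node> field : type.fields.entrySet()) {
        String name = field.getKey();
        String childPath = path.isEmpty() ? name : path + "." + name;
        if (visit(childPath, field.getValue(), partner.fields.get(name))) {
          // The parent exists (this struct), so adding the child here is always valid.
          addedColumns.add("addColumn(" + (path.isEmpty() ? "null" : path) + ", " + name + ")");
        }
      }
    }
    // A present primitive could compare its type and doc with the partner here (omitted).
    return false;
  }
}
```

   Visiting an incoming schema that adds a whole `location` struct (with `lat` and `lon`) produces a single `addColumn(null, location)` entry: the missing struct's children are never visited on their own, so no call is ever attempted for a column whose parent does not exist yet.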

[GitHub] [iceberg] fbocse commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r482061859



##########
File path: core/src/main/java/org/apache/iceberg/SchemaUpdate.java
##########
@@ -308,6 +314,162 @@ public UpdateSchema moveAfter(String name, String afterName) {
     return this;
   }
 
+  @Override
+  public UpdateSchema upsertSchema(Schema newSchema) {
+    TypeUtil.visit(newSchema, new ApplyUpdates(this, schema));
+    return this;
+  }
+
+  private static class ApplyUpdates extends TypeUtil.SchemaVisitor<Void> {
+    private static final Joiner DOT = Joiner.on(".");
+    private final Deque<String> fieldNames = Lists.newLinkedList();
+    private NestedField currentField = null;
+
+    private final Schema baseSchema;
+    private final UpdateSchema api;
+    private final Map<String, Integer> indexByName;
+
+    private ApplyUpdates(UpdateSchema api, Schema baseSchema) {
+      this.api = api;
+      this.baseSchema = baseSchema;
+      this.indexByName = TypeUtil.indexByName(baseSchema.asStruct());
+    }
+
+    @Override
+    public void beforeListElement(NestedField elementField) {
+      beforeField(elementField);
+    }
+
+    @Override
+    public void afterListElement(NestedField elementField) {
+      afterField(elementField);
+    }
+
+    @Override
+    public void beforeMapKey(Types.NestedField keyField) {
+      beforeField(keyField);
+    }
+
+    @Override
+    public void afterMapKey(Types.NestedField keyField) {
+      afterField(keyField);
+    }
+
+    @Override
+    public void beforeMapValue(Types.NestedField valueField) {
+      beforeField(valueField);
+    }
+
+    @Override
+    public void afterMapValue(Types.NestedField valueField) {
+      afterField(valueField);
+    }
+
+    @Override
+    public void beforeField(NestedField field) {
+      fieldNames.push(field.name()); // we don't expect `element` to show up - it breaks
+      currentField = field;
+    }
+
+    @Override
+    public void afterField(NestedField field) {
+      fieldNames.pop();
+    }
+
+    @Override
+    public Void field(NestedField field, Void fieldResult) {
+      return super.field(field, fieldResult);
+    }
+
+    @Override
+    public Void list(ListType list, Void elementResult) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.ListType.ofOptional(0, list.elementType()), ancestors());
+      } else if (!field.type().isListType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field.type() +
+                " to type list");
+      }
+      return null;
+    }
+
+    @Override
+    public Void map(MapType map, Void keyResult, Void valueResult) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.MapType.ofOptional(0, 1, map.keyType(), map.valueType()), ancestors());
+      } else if (!field.type().isMapType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field.type() +
+                " to type map");
+      }
+      return null;
+    }
+
+    @Override
+    public Void struct(Types.StructType struct, List<Void> fieldResults) {
+      if (fieldNames.isEmpty()) {
+        return null; // this is the root struct
+      }
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.StructType.of(struct.fields()), ancestors());
+      } else if (!field.type().isStructType()) {
+        throw new IllegalArgumentException("Cannot update existing field: " + fullName + " of type: " + field.type() +
+                " to type struct");
+      }
+      return null;
+    }
+
+    @Override
+    public Void primitive(PrimitiveType primitive) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      PrimitiveType newFieldType = Types.fromPrimitiveString(primitive.toString());
+      if (field == null) {
+        addColumn(currentField.name(), Types.fromPrimitiveString(primitive.toString()), ancestors());
+      } else if (!field.type().isPrimitiveType()) {
+        throw new IllegalArgumentException("Cannot update existing field: " + field.name() + " of type: " +
+                field.type() + " to primitive type: " + primitive.typeId().name());
+      } else if (!newFieldType.equals(field.type())) {
+        updateColumn(field.type().asPrimitiveType(), fullName, field.doc(), newFieldType, currentField.doc());
+      }
+      return null;
+    }
+
+    private String ancestors() {
+      if (fieldNames.isEmpty()) {
+        return "";
+      }
+      String head = fieldNames.removeFirst();
+      String join = DOT.join(fieldNames.descendingIterator());
+      fieldNames.addFirst(head);
+      return join;
+    }
+
+    private void updateColumn(PrimitiveType fieldType, String fullName, String fieldDoc, PrimitiveType newFieldType,
+                              String newDoc) {
+      if (!fieldType.equals(newFieldType)) {
+        api.updateColumn(fullName, newFieldType.asPrimitiveType());
+      } else if (newDoc != null && !newDoc.equals(fieldDoc)) {
+        api.updateColumnDoc(fullName, newDoc);
+      }
+    }
+
+    private void addColumn(String name, Type type, String ancestors) {
+      if (ancestors.isEmpty()) {
+        api.addColumn(null, name, type);
+      } else if (indexByName.containsKey(ancestors)) {
+        api.addColumn(ancestors, name, type);
+      }
+      // At this point the parent of this column hasn't been added to the schema, not yet visited

Review comment:
       Indeed, this aspect of the implementation will greatly improve efficiency when taking the union of distinct schemas.
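   The quoted `ApplyUpdates` visitor tracks its position with a `Deque` of field names: `beforeField` pushes, `afterField` pops, `descendingIterator()` yields the full dotted name, and `ancestors()` temporarily removes the innermost name to get the parent path. That bookkeeping can be isolated as below. This is a sketch: `FieldPath` is a hypothetical name, and it swaps Guava's `Joiner` and `Lists.newLinkedList()` for the JDK's `StringJoiner` and `ArrayDeque`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.StringJoiner;

// Hypothetical helper mirroring the name tracking in the quoted visitor.
// Names are pushed onto the head, so the head is always the innermost field.
final class FieldPath {
  private final Deque<String> fieldNames = new ArrayDeque<>();

  void enter(String name) {  // corresponds to beforeField
    fieldNames.push(name);
  }

  void exit() {              // corresponds to afterField
    fieldNames.pop();
  }

  // Full dotted name, outermost first: iterate from the tail (oldest) to the head.
  String fullName() {
    return join(fieldNames.descendingIterator());
  }

  // Parent path: drop the innermost name, join the rest, then restore the deque.
  String ancestors() {
    if (fieldNames.isEmpty()) {
      return "";
    }
    String head = fieldNames.removeFirst();
    try {
      return join(fieldNames.descendingIterator());
    } finally {
      fieldNames.addFirst(head);
    }
  }

  private static String join(Iterator<String> names) {
    StringJoiner joiner = new StringJoiner(".");
    names.forEachRemaining(joiner::add);
    return joiner.toString();
  }
}
```

   After entering `location` and then `lat`, `fullName()` is `location.lat` and `ancestors()` is `location`; a top-level field has an empty ancestor path, which is the case the quoted `addColumn` helper maps to a `null` parent.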

[GitHub] [iceberg] fbocse commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r482809860



##########
File path: api/src/main/java/org/apache/iceberg/UpdateSchema.java
##########
@@ -361,4 +361,14 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin
    *                                  change conflicts with other changes.
    */
   UpdateSchema moveAfter(String name, String afterName);
+
+
+  /**
+   * Applies all the additions and updates [type widening, field documentation]
+   * from the provided new schema
+   *
+   * @param newSchema - Input schema from which updates are applied
+   * @return this for method chaining
+   */
+  UpdateSchema unionWithByFieldName(Schema newSchema);

Review comment:
       Sure, I like either one. @rdsr, you make the call please :)
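   The Javadoc quoted above describes the operation as applying additions and updates (type widening and field documentation) from the new schema. A minimal sketch of those by-name semantics over a flat list of columns follows. `Column`, `UnionByName`, and the string-encoded operations are hypothetical, and only two widenings (`int` to `long`, `float` to `double`) are modeled; Iceberg's type promotion rules also cover cases such as increasing decimal precision.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical flat column model: a primitive type name plus an optional doc.
final class Column {
  final String type;
  final String doc;

  Column(String type, String doc) {
    this.type = type;
    this.doc = doc;
  }
}

final class UnionByName {
  // Widenings assumed safe for this sketch only.
  static boolean canWiden(String from, String to) {
    return (from.equals("int") && to.equals("long"))
        || (from.equals("float") && to.equals("double"));
  }

  // Returns the operations needed to bring `existing` up to date with `incoming`.
  // Columns that appear only in `existing` are left alone: a union never drops columns.
  static List<String> plan(Map<String, Column> existing, Map<String, Column> incoming) {
    List<String> ops = new ArrayList<>();
    for (Map.Entry<String, Column> entry : incoming.entrySet()) {
      String name = entry.getKey();
      Column next = entry.getValue();
      Column current = existing.get(name);
      if (current == null) {
        ops.add("add " + name + " " + next.type);
        continue;
      }
      if (!current.type.equals(next.type)) {
        if (!canWiden(current.type, next.type)) {
          throw new IllegalArgumentException(
              "Cannot change " + name + " from " + current.type + " to " + next.type);
        }
        ops.add("widen " + name + " to " + next.type);
      }
      // Checked independently of the type, so a doc-only change is still applied.
      if (next.doc != null && !next.doc.equals(current.doc)) {
        ops.add("doc " + name);
      }
    }
    return ops;
  }
}
```

   Checking the doc separately from the type is a deliberate choice in this sketch: in the quoted `primitive()` method, `updateColumn` is only reached when the types differ, so its doc-only branch does not appear to be reachable from that call site.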

[GitHub] [iceberg] fbocse commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r482069642



##########
File path: core/src/main/java/org/apache/iceberg/SchemaUpdate.java
##########
@@ -308,6 +309,12 @@ public UpdateSchema moveAfter(String name, String afterName) {
     return this;
   }
 
+  @Override
+  public UpdateSchema unionWithByFieldName(Schema newSchema) {
+    UnionByNameVisitor.visit(this, schema, newSchema);

Review comment:
       @rdblue, you're right: the new implementation, while a bit longer than the previous one, is easier to follow.

[GitHub] [iceberg] rdblue commented on pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#issuecomment-686151932


   @fbocse, this looks good to me! Once the public method name and Javadoc are fixed, I'm +1. It would also be nice to merge `TypeVisitor` back into `SchemaWithPartnerVisitor` unless there's some independent use for it that I'm missing.
   
   @rdsr, what do you think?