Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/08/03 16:23:18 UTC

[GitHub] [iceberg] rdblue commented on a change in pull request #1177: Add schema upsert API for adding new fields and updating type (where applicable) using an input schema

rdblue commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r464520655



##########
File path: core/src/main/java/org/apache/iceberg/SchemaUpdate.java
##########
@@ -308,6 +314,162 @@ public UpdateSchema moveAfter(String name, String afterName) {
     return this;
   }
 
+  @Override
+  public UpdateSchema upsertSchema(Schema newSchema) {
+    TypeUtil.visit(newSchema, new ApplyUpdates(this, schema));
+    return this;
+  }
+
+  private static class ApplyUpdates extends TypeUtil.SchemaVisitor<Void> {
+    private static final Joiner DOT = Joiner.on(".");
+    private final Deque<String> fieldNames = Lists.newLinkedList();
+    private NestedField currentField = null;
+
+    private final Schema baseSchema;
+    private final UpdateSchema api;
+    private final Map<String, Integer> indexByName;
+
+    private ApplyUpdates(UpdateSchema api, Schema baseSchema) {
+      this.api = api;
+      this.baseSchema = baseSchema;
+      this.indexByName = TypeUtil.indexByName(baseSchema.asStruct());
+    }
+
+    @Override
+    public void beforeListElement(NestedField elementField) {
+      beforeField(elementField);
+    }
+
+    @Override
+    public void afterListElement(NestedField elementField) {
+      afterField(elementField);
+    }
+
+    @Override
+    public void beforeMapKey(Types.NestedField keyField) {
+      beforeField(keyField);
+    }
+
+    @Override
+    public void afterMapKey(Types.NestedField keyField) {
+      afterField(keyField);
+    }
+
+    @Override
+    public void beforeMapValue(Types.NestedField valueField) {
+      beforeField(valueField);
+    }
+
+    @Override
+    public void afterMapValue(Types.NestedField valueField) {
+      afterField(valueField);
+    }
+
+    @Override
+    public void beforeField(NestedField field) {
+      fieldNames.push(field.name()); // we don't expect `element` to show up - it breaks
+      currentField = field;
+    }
+
+    @Override
+    public void afterField(NestedField field) {
+      fieldNames.pop();
+    }
+
+    @Override
+    public Void field(NestedField field, Void fieldResult) {
+      return super.field(field, fieldResult);
+    }
+
+    @Override
+    public Void list(ListType list, Void elementResult) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.ListType.ofOptional(0, list.elementType()), ancestors());
+      } else if (!field.type().isListType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field
+                .type() + " to type list");
+      }
+      return null;
+    }
+
+    @Override
+    public Void map(MapType map, Void keyResult, Void valueResult) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.MapType.ofOptional(0, 1, map.keyType(), map.valueType()), ancestors());
+      } else if (!field.type().isMapType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field
+                .type() + " to type map");
+      }
+      return null;
+    }
+
+    @Override
+    public Void struct(Types.StructType struct, List<Void> fieldResults) {
+      if (fieldNames.isEmpty()) {
+        return null; // this is the root struct
+      }
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.StructType.of(struct.fields()), ancestors());
+      } else if (!field.type().isStructType()) {
+        throw new IllegalArgumentException("Cannot update existing field: " + fullName + " of type: " + field.type() +
+                " to type struct");
+      }
+      return null;
+    }
+
+    @Override
+    public Void primitive(PrimitiveType primitive) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      PrimitiveType newFieldType = Types.fromPrimitiveString(primitive.toString());
+      if (field == null) {
+        addColumn(currentField.name(), Types.fromPrimitiveString(primitive.toString()), ancestors());
+      } else if (!field.type().isPrimitiveType()) {
+        throw new IllegalArgumentException("Cannot update existing field: " + field.name() + " of type: " +
+                field.type() + " to primitive type: " + primitive.typeId().name());
+      } else if (!newFieldType.equals(field.type())) {
+        updateColumn(field.type().asPrimitiveType(), fullName, field.doc(), newFieldType, currentField.doc());
+      }
+      return null;
+    }
+
+    private String ancestors() {
+      if (fieldNames.isEmpty()) {
+        return "";
+      }
+      String head = fieldNames.removeFirst();
+      String join = DOT.join(fieldNames.descendingIterator());
+      fieldNames.addFirst(head);
+      return join;
+    }
+
+    private void updateColumn(PrimitiveType fieldType, String fullName, String fieldDoc, PrimitiveType newFieldType,
+                              String newDoc) {
+      if (!fieldType.equals(newFieldType)) {
+        api.updateColumn(fullName, newFieldType.asPrimitiveType());
+      } else if (newDoc != null && !newDoc.equals(fieldDoc)) {
+        api.updateColumnDoc(fullName, newDoc);
+      }
+    }
+
+    private void addColumn(String name, Type type, String ancestors) {
+      if (ancestors.isEmpty()) {
+        api.addColumn(null, name, type);
+      } else if (indexByName.containsKey(ancestors)) {
+        api.addColumn(ancestors, name, type);
+      }
+      // At this point the parent of this column hasn't been added to the schema, not yet visited

Review comment:
   I had a hard time following the code and ended up using a debugger to figure out that it wasn't trying to add columns to missing structs. It was good to see that there are test cases for this, but I think we can avoid calling `addColumn` for columns that should not be added by using the visitor's return values.
   
   I prototyped a change where each visitor method returns a Boolean that indicates whether a column is missing. Each method checks whether its equivalent exists in the other struct. If it does not, the method simply returns true, because the whole type should be added. If it is present, containers may update the type, doc, and required flag. I think that's easier to follow, so I'll post the changes for you.
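   A minimal sketch of the Boolean-returning approach, assuming a simplified, hypothetical schema model: `UpsertSketch`, its nested `Type`, `visit`, and the string-valued `changes` log are illustrative names, not Iceberg's `TypeUtil.SchemaVisitor` or `UpdateSchema` API. Only structs and primitives are modeled. Lists, maps, docs, the required flag, and type-promotion validation are deliberately left out. The point it illustrates is that a missing field returns `true` and its parent (which is known to exist) records a single add for the whole subtree, so no child ever tries to add itself under a struct that is not in the base schema yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified schema model for illustration only; NOT Iceberg's Types/TypeUtil API.
final class UpsertSketch {

  // A type is either a named primitive (e.g. "int") or a struct of named fields.
  static final class Type {
    final String primitive;          // non-null for primitives
    final Map<String, Type> fields;  // non-null for structs

    private Type(String primitive, Map<String, Type> fields) {
      this.primitive = primitive;
      this.fields = fields;
    }

    static Type primitive(String name) {
      return new Type(name, null);
    }

    static Type struct(Map<String, Type> fields) {
      return new Type(null, fields);
    }

    boolean isStruct() {
      return fields != null;
    }
  }

  // Changes that a real implementation would send to an update API; recorded as strings here.
  final List<String> changes = new ArrayList<>();

  /**
   * Compares {@code update} with its counterpart {@code existing} (null if absent) at {@code path}.
   * Returns true when the whole type is missing, so the caller adds it in one step.
   */
  boolean visit(String path, Type update, Type existing) {
    if (existing == null) {
      return true;  // missing: the parent adds this entire subtree, children are not visited
    }
    if (update.isStruct()) {
      if (!existing.isStruct()) {
        throw new IllegalArgumentException("Cannot change " + path + " to a struct");
      }
      for (Map.Entry<String, Type> entry : update.fields.entrySet()) {
        String childPath = path.isEmpty() ? entry.getKey() : path + "." + entry.getKey();
        Type childExisting = existing.fields.get(entry.getKey());
        if (visit(childPath, entry.getValue(), childExisting)) {
          // This struct exists in the base schema, so adding the missing child here is safe.
          changes.add("add " + childPath);
        }
      }
    } else if (existing.isStruct()) {
      throw new IllegalArgumentException("Cannot change " + path + " to a primitive");
    } else if (!update.primitive.equals(existing.primitive)) {
      // This sketch does not check whether the promotion is actually allowed.
      changes.add("update " + path + " to " + update.primitive);
    }
    return false;
  }
}
```

   For example, visiting a new schema `{id: long, location: {lat: double}}` against a base schema `{id: int}` records an update for `id` and a single add for `location`; `location.lat` is never added on its own because its parent reported it as missing.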



