Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/09/24 18:19:04 UTC

[GitHub] [iceberg] wmoustafa commented on a diff in pull request #5704: Support non-optional union types and column projection in complex union for Avro

wmoustafa commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r977084204


##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Map<String, Integer> fieldNameToId =
+          (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          String name =
+              branch.getType().equals(Schema.Type.RECORD)
+                  ? branch.getName()
+                  : branch.getType().getName();

Review Comment:
   You might add a comment explaining the logic here.



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java:
##########
@@ -78,6 +78,10 @@ public static Type convert(Schema schema) {
     return AvroSchemaVisitor.visit(schema, new SchemaToType(schema));
   }
 
+  public static Type convertToDeriveNameMapping(Schema schema) {

Review Comment:
   For consistency with the other APIs, should we rename this to `visit` and add a flag indicating whether to derive the name mapping?



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Map<String, Integer> fieldNameToId =
+          (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          String name =
+              branch.getType().equals(Schema.Type.RECORD)
+                  ? branch.getName()
+                  : branch.getType().getName();
+          if (fieldNameToId.containsKey(name)) {
+            int fieldId = fieldNameToId.get(name);
+            Types.NestedField branchType = type.asStructType().field(fieldId);
+            if (branchType != null) {
+              options.add(visit(branchType.type(), branch, visitor));
+            } else {
+              Type pseudoBranchType = AvroSchemaUtil.convert(branch);
+              options.add(visit(pseudoBranchType, branch, visitor));
+            }
+          } else {
+            options.add(visit((Type) null, branch, visitor));
+          }

Review Comment:
   You might add a comment explaining how one could end up here.



##########
core/src/main/java/org/apache/iceberg/avro/SchemaToType.java:
##########
@@ -87,30 +97,63 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
       this.nextId = 0;
     }
 
+    Map<String, Integer> fieldNameToId = Maps.newHashMap();
     for (int i = 0; i < fields.size(); i += 1) {
       Schema.Field field = fields.get(i);
       Type fieldType = fieldTypes.get(i);
       int fieldId = getId(field);
 
-      if (AvroSchemaUtil.isOptionSchema(field.schema())) {
+      if (AvroSchemaUtil.isOptionSchema(field.schema())
+          || AvroSchemaUtil.isOptionalComplexUnion(field.schema())) {
         newFields.add(Types.NestedField.optional(fieldId, field.name(), fieldType, field.doc()));
       } else {
         newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType, field.doc()));
       }
+      fieldNameToId.put(field.name(), fieldId);
+    }
+
+    if (deriveNameMapping && record.getObjectProp(AVRO_FIELD_NAME_TO_ICEBERG_ID) == null) {
+      record.addProp(AVRO_FIELD_NAME_TO_ICEBERG_ID, fieldNameToId);

Review Comment:
   Should we add a map to the entire union type field, or a prop to each branch, similar to how records/structs work?



##########
core/src/main/java/org/apache/iceberg/avro/SchemaToType.java:
##########
@@ -38,6 +43,11 @@ class SchemaToType extends AvroSchemaVisitor<Type> {
     }
   }
 
+  SchemaToType(Schema root, boolean deriveNameMapping) {

Review Comment:
   I think it might be cleaner to extract the name mapping injection into another class that extends either `AvroSchemaVisitor` or `SchemaToType`.
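
   A minimal sketch of one way to do that split, assuming a hook-based design: the base class does the conversion and exposes a no-op hook, and a subclass collects the name mapping. `BaseUnionConverter`, `NameMappingCollector`, and `onBranchId` are hypothetical stand-ins, not Iceberg or Avro APIs, and union branches are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical base converter (not an Iceberg class): assigns field ids to union
// branches, modeled as plain strings, and calls a hook for every assignment.
class BaseUnionConverter {
  private int nextId = 0;

  protected int allocateId() {
    return nextId++;
  }

  List<Integer> convertUnion(List<String> branchNames) {
    List<Integer> ids = new ArrayList<>();
    for (String name : branchNames) {
      int id = allocateId();
      ids.add(id);
      onBranchId(name, id);
    }
    return ids;
  }

  // Hook for subclasses; the base conversion records nothing.
  protected void onBranchId(String branchName, int fieldId) {}
}

// Hypothetical subclass that adds only the name-mapping bookkeeping.
class NameMappingCollector extends BaseUnionConverter {
  private final Map<String, Integer> nameToId = new HashMap<>();

  @Override
  protected void onBranchId(String branchName, int fieldId) {
    nameToId.put(branchName, fieldId);
  }

  Map<String, Integer> nameToId() {
    return nameToId;
  }
}
```

   With this shape, plain schema conversion stays free of name-mapping side effects; only callers that need the mapping instantiate the subclass.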



##########
core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java:
##########
@@ -60,6 +60,7 @@ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
   @Override
   public void setSchema(Schema newFileSchema) {
     this.fileSchema = newFileSchema;
+    AvroSchemaUtil.convertToDeriveNameMapping(this.fileSchema);

Review Comment:
   I think this line should change/go away after addressing some of the other comments.



##########
core/src/main/java/org/apache/iceberg/avro/SchemaToType.java:
##########
@@ -87,30 +97,63 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
       this.nextId = 0;
     }
 
+    Map<String, Integer> fieldNameToId = Maps.newHashMap();
     for (int i = 0; i < fields.size(); i += 1) {
       Schema.Field field = fields.get(i);
       Type fieldType = fieldTypes.get(i);
       int fieldId = getId(field);
 
-      if (AvroSchemaUtil.isOptionSchema(field.schema())) {
+      if (AvroSchemaUtil.isOptionSchema(field.schema())
+          || AvroSchemaUtil.isOptionalComplexUnion(field.schema())) {
         newFields.add(Types.NestedField.optional(fieldId, field.name(), fieldType, field.doc()));
       } else {
         newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType, field.doc()));
       }
+      fieldNameToId.put(field.name(), fieldId);
+    }
+
+    if (deriveNameMapping && record.getObjectProp(AVRO_FIELD_NAME_TO_ICEBERG_ID) == null) {
+      record.addProp(AVRO_FIELD_NAME_TO_ICEBERG_ID, fieldNameToId);
     }
 
     return Types.StructType.of(newFields);
   }
 
   @Override
   public Type union(Schema union, List<Type> options) {
-    Preconditions.checkArgument(
-        AvroSchemaUtil.isOptionSchema(union), "Unsupported type: non-option union: %s", union);
-    // records, arrays, and maps will check nullability later
-    if (options.get(0) == null) {
-      return options.get(1);
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      // Optional simple union
+      // records, arrays, and maps will check nullability later
+      if (options.get(0) == null) {
+        return options.get(1);
+      } else {
+        return options.get(0);
+      }
     } else {
-      return options.get(0);
+      // Complex union
+      Map<String, Integer> fieldNameToId = Maps.newHashMap();
+      List<Types.NestedField> newFields = Lists.newArrayList();
+      newFields.add(Types.NestedField.required(allocateId(), "tag", Types.IntegerType.get()));
+
+      int tagIndex = 0;
+      for (Type type : options) {
+        if (type != null) {
+          int fieldId = allocateId();
+          Schema schema = union.getTypes().get(tagIndex);
+          newFields.add(Types.NestedField.optional(fieldId, "field" + tagIndex++, type));
+          String name =
+              schema.getType().equals(Schema.Type.RECORD)
+                  ? schema.getName()
+                  : schema.getType().getName();
+          fieldNameToId.put(name, fieldId);
+        }
+      }
+
+      if (deriveNameMapping && union.getObjectProp(AVRO_FIELD_NAME_TO_ICEBERG_ID) == null) {
+        union.addProp(AVRO_FIELD_NAME_TO_ICEBERG_ID, fieldNameToId);

Review Comment:
   This class is not expected to change the input schema.
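
   A sketch of a non-mutating alternative, assuming the converter returns the branch-to-id table next to the converted struct instead of calling `addProp` on the input union. It follows the layout in the diff (a required `tag` int, then an optional `fieldN` per non-null branch), but `Branch`, `Field`, `UnionConversion`, and `ComplexUnionConverter` are hypothetical stand-ins rather than Avro or Iceberg types, and types are plain strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins, not Avro or Iceberg types. A branch has a key (type name or
// record name) and an already-converted type; the key "null" marks the null branch.
record Branch(String key, String type) {
  boolean isNull() {
    return "null".equals(key);
  }
}

record Field(int id, String name, String type, boolean optional) {}

// Converted struct fields plus a side table of branch key -> field id.
record UnionConversion(List<Field> fields, Map<String, Integer> branchKeyToId) {}

final class ComplexUnionConverter {
  private int nextId;

  ComplexUnionConverter(int firstId) {
    this.nextId = firstId;
  }

  // Reads the branches without modifying them; the mapping is returned, not attached as a prop.
  UnionConversion convert(List<Branch> branches) {
    List<Field> fields = new ArrayList<>();
    Map<String, Integer> keyToId = new LinkedHashMap<>();
    fields.add(new Field(nextId++, "tag", "int", false)); // required tag field

    int fieldIndex = 0;
    for (Branch branch : branches) { // key and type come from the same branch object
      if (branch.isNull()) {
        continue; // the null branch gets no struct field
      }
      int id = nextId++;
      fields.add(new Field(id, "field" + fieldIndex++, branch.type(), true));
      keyToId.put(branch.key(), id);
    }
    return new UnionConversion(
        Collections.unmodifiableList(fields), Collections.unmodifiableMap(keyToId));
  }
}
```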



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Map<String, Integer> fieldNameToId =
+          (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          String name =
+              branch.getType().equals(Schema.Type.RECORD)
+                  ? branch.getName()
+                  : branch.getType().getName();
+          if (fieldNameToId.containsKey(name)) {
+            int fieldId = fieldNameToId.get(name);
+            Types.NestedField branchType = type.asStructType().field(fieldId);
+            if (branchType != null) {
+              options.add(visit(branchType.type(), branch, visitor));
+            } else {
+              Type pseudoBranchType = AvroSchemaUtil.convert(branch);
+              options.add(visit(pseudoBranchType, branch, visitor));
+            }

Review Comment:
   It would be good to add comments to the if/else branches.
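
   As a sketch of what those comments could say, the handling of a non-null branch reduces to three cases. `BranchResolver` and `BranchResolution` are hypothetical names, the two maps stand in for the union's name-to-id prop and the expected struct's fields, and the reason given for the not-projected case is an assumption, not something the diff states.

```java
import java.util.Map;

// Hypothetical labels for how a non-null union branch is visited.
enum BranchResolution {
  PROJECTED,      // branch has an id and the expected struct contains that field
  NOT_PROJECTED,  // branch has an id but the field is absent from the expected struct
  UNKNOWN_BRANCH  // branch key is missing from the name -> id map
}

final class BranchResolver {
  private BranchResolver() {}

  static BranchResolution resolve(
      String branchKey, Map<String, Integer> keyToId, Map<Integer, String> expectedFieldsById) {
    Integer fieldId = keyToId.get(branchKey);
    if (fieldId == null) {
      // No id recorded for this branch: the diff visits it with a null Iceberg type.
      return BranchResolution.UNKNOWN_BRANCH;
    }
    if (expectedFieldsById.containsKey(fieldId)) {
      // The read schema keeps this branch: visit it with the projected field's type.
      return BranchResolution.PROJECTED;
    }
    // Present in the file but projected out: the diff visits it with a type converted
    // from the branch itself (assumed: so values of this branch can still be decoded).
    return BranchResolution.NOT_PROJECTED;
  }
}
```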



##########
core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java:
##########
@@ -60,6 +60,7 @@ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
   @Override
   public void setSchema(Schema newFileSchema) {
     this.fileSchema = newFileSchema;
+    AvroSchemaUtil.convertToDeriveNameMapping(this.fileSchema);
     if (nameMapping == null && !AvroSchemaUtil.hasIds(fileSchema)) {
       nameMapping = MappingUtil.create(expectedSchema);

Review Comment:
   I was under the impression that we would add name/type-name information to this map, for example ("int" -> 2, "com.my.namespace.MyRecord" -> 3). @rdblue, what do you think?
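
   To illustrate the keying in this example: the Avro specification allows a union to contain several named types (record, enum, fixed) as long as their full names differ, so keying named branches by full name keeps, say, `a.R` and `b.R` apart, whereas Avro Java's `Schema.getName()` returns only the simple name (`getFullName()` includes the namespace). The sketch uses a hypothetical `AvroBranch` stand-in instead of `org.apache.avro.Schema` and assumes ids are assigned to non-null branches in order and that duplicate keys are rejected.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an Avro union branch (not an Avro class): its type name,
// plus namespace and name when it is a named type.
record AvroBranch(String type, String namespace, String name) {
  static AvroBranch primitive(String type) {
    return new AvroBranch(type, null, null);
  }

  static AvroBranch named(String type, String namespace, String name) {
    return new AvroBranch(type, namespace, name);
  }

  // Map key: full name for named types, type name ("int", "string", ...) otherwise.
  String key() {
    if (name == null) {
      return type;
    }
    return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
  }
}

final class BranchKeyMap {
  private BranchKeyMap() {}

  // Assigns consecutive ids to non-null branches; rejects two branches with the same key.
  static Map<String, Integer> build(List<AvroBranch> branches, int firstId) {
    Map<String, Integer> keyToId = new LinkedHashMap<>();
    int id = firstId;
    for (AvroBranch branch : branches) {
      if (branch.type().equals("null")) {
        continue;
      }
      Integer previous = keyToId.putIfAbsent(branch.key(), id);
      if (previous != null) {
        throw new IllegalArgumentException("Duplicate branch key: " + branch.key());
      }
      id++;
    }
    return keyToId;
  }
}
```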



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Map<String, Integer> fieldNameToId =
+          (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          String name =
+              branch.getType().equals(Schema.Type.RECORD)
+                  ? branch.getName()
+                  : branch.getType().getName();
+          if (fieldNameToId.containsKey(name)) {
+            int fieldId = fieldNameToId.get(name);
+            Types.NestedField branchType = type.asStructType().field(fieldId);

Review Comment:
   The output here does not seem to be a branch type, correct? It is a field. You might consider renaming it to something like `expectedSchemaStructField`. You might also fold the previous line into this one.



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();

Review Comment:
   The terminology is a bit confusing (types, branches, options, etc.). I would try to qualify each term by its source, e.g., renaming `types` to `avroUnionBranches`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org