You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/09/04 17:40:02 UTC

[GitHub] [iceberg] yiqiangin opened a new pull request, #5704: Support non-optional union types and column projection in complex union for Avro

yiqiangin opened a new pull request, #5704:
URL: https://github.com/apache/iceberg/pull/5704

   This PR consists of two parts
   - the support for non-optional union types which is cherry picked from the unmerged PR https://github.com/apache/iceberg/pull/4242
   - the support for column projection in complex union which is an extension work of the previous PR
   
   In Iceberg, the complex union is represented by a struct with multiple fields. Without schema pruning caused by the column projection in the query, the number of fields equals to the number of types in the union plus one (for the tag field). When the column projection happens, the union schema of Iceberg is pruned and there are only a part of the fields in the struct according to the definition of column projection.
   In contrast, the union schema of Avro schema is not pruned in case of column projection, as the full union schema is needed to read the data from Avro file successfully.
   Also the readers to read the data of the union from Avro file are created based on the type schema from both Avro schema and Iceberg schema. The major problem to be solved here is to correlate the type in Avro schema with the type in Iceberg schema, especially in case that only a part of types exist in Iceberg schema with column projection.
   
   The main idea of the solution is as follows:
   - Build the mapping from the type name in Avro schema to the id of the corresponding field in Iceberg schema
   - When value readers are created, find the corresponding field in Iceberg schema for a type of Avro schema with the id stored in the mapping which key is the name of the type of Avro schema.
   
   The details of the implementation are as follows:
   - The mapping from the field name in Avro schema to the field id in Iceberg schema is derived during the conversion from Avro schema to Iceberg schema in the function of AvroSchemaUtil.convertToDeriveNameMapping and the class of SchemaToType.
   - The mapping of direct child fields of an Avro schema field is stored as a property named AvroFieldNameToIcebergId in this Avro schema field, therefore it can be easily accessed when Avro schema is traversed to generate the correspond readers to read Avro data file.
   - In case of union, the key of the mapping is the name of the branch in the union.
   - In case of complex union, the code of AvroSchemaWithTypeVisitor.visitUnion() first gets the mapping from the property of Avro schema, then get the field id in Iceberg schema using the type name in Avro schema, finally it uses the field id to get the field type in Iceberg schema:
      - if the corresponding field in Iceberg schema exists, the field is used to create the reader together with Avro schema node;
      - if the field for the given field id does not exist in Iceberg schema (which means this field is not projected in Iceberg schema), a pseudo branch type is created based on the corresponding Avro schema node to faciltate the creation of the reader.
   - In the class of UnionReader, the rows read from Avro data file are filtered according to the fields existing in Iceberg schema.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] yiqiangin commented on a diff in pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
yiqiangin commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r990346173


##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Map<String, Integer> fieldNameToId =
+          (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          String name =
+              branch.getType().equals(Schema.Type.RECORD)
+                  ? branch.getName()
+                  : branch.getType().getName();
+          if (fieldNameToId.containsKey(name)) {
+            int fieldId = fieldNameToId.get(name);
+            Types.NestedField branchType = type.asStructType().field(fieldId);
+            if (branchType != null) {
+              options.add(visit(branchType.type(), branch, visitor));
+            } else {
+              Type pseudoBranchType = AvroSchemaUtil.convert(branch);
+              options.add(visit(pseudoBranchType, branch, visitor));
+            }
+          } else {
+            options.add(visit((Type) null, branch, visitor));
+          }
+        }
       }
     }
     return visitor.union(type, union, options);

Review Comment:
   I am afraid not. The readers for all branch types need to pass into `UnionReader`, as `UnionReader` needs to read all types of data from the union in Avro file to read the all the records successfully. Filtering the data based on the types projected in expected Iceberg schema can only happens after the data are read from Avro file  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wmoustafa commented on a diff in pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
wmoustafa commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r979466542


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;
+      for (int i = 0; i < this.schema.getTypes().size(); i++) {
+        Schema alt = this.schema.getTypes().get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          this.nullTypeIndex = i;
+          break;
+        }
+      }
+
+      // Creating an integer array to track the mapping between the index of fields to be projected
+      // and the index of the value for the field stored in the returned row,
+      // if the value for a field equals to -1, it means the value of this field should not be
+      // stored
+      // in the returned row
+      int numberOfTypes =
+          this.nullTypeIndex == -1
+              ? this.schema.getTypes().size()
+              : this.schema.getTypes().size() - 1;
+      this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+      Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+      this.numOfFieldsInReturnedRow = 0;
+      this.isTagFieldProjected = false;
+      for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+        String fieldName = expectedStructField.name();
+        if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+          this.isTagFieldProjected = true;
+          this.numOfFieldsInReturnedRow++;
+          continue;
+        }
+        int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+        this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+            this.numOfFieldsInReturnedRow++;
+      }
+    }
+
+    @Override
+    public InternalRow read(Decoder decoder, Object reuse) throws IOException {

Review Comment:
   Should not the logic here be:
   * Iterate on the Avro schema. For each branch, get the field ID from the Avro schema annotation. 
   * The assumption is Avro schema union preserves all the union branches even if some are not projected. So we still need to figure out if a field is projected or not. This can be achieved by looking up the field ID from the step above in the expected Iceberg schema. If the field ID is projected, populate the InternalRow index using the next suitable reader (hopefully reader order is preset properly in `AvroSchemaWithTypeVisitor` to match the expected projection).
   * If the field ID is not projected, skip.
   The above logic can be split/refactored between the constructor and the `read` method for efficiency.
   @rdblue Let me know if this matches your understanding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wmoustafa commented on a diff in pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
wmoustafa commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r979466542


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;
+      for (int i = 0; i < this.schema.getTypes().size(); i++) {
+        Schema alt = this.schema.getTypes().get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          this.nullTypeIndex = i;
+          break;
+        }
+      }
+
+      // Creating an integer array to track the mapping between the index of fields to be projected
+      // and the index of the value for the field stored in the returned row,
+      // if the value for a field equals to -1, it means the value of this field should not be
+      // stored
+      // in the returned row
+      int numberOfTypes =
+          this.nullTypeIndex == -1
+              ? this.schema.getTypes().size()
+              : this.schema.getTypes().size() - 1;
+      this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+      Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+      this.numOfFieldsInReturnedRow = 0;
+      this.isTagFieldProjected = false;
+      for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+        String fieldName = expectedStructField.name();
+        if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+          this.isTagFieldProjected = true;
+          this.numOfFieldsInReturnedRow++;
+          continue;
+        }
+        int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+        this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+            this.numOfFieldsInReturnedRow++;
+      }
+    }
+
+    @Override
+    public InternalRow read(Decoder decoder, Object reuse) throws IOException {

Review Comment:
   Should not the logic here be:
   * Iterate on the Avro schema. For each branch, get the field ID from the Avro schema annotation. 
   * The assumption is Avro schema union preserves all the union branches even if some are not projected. So we still need to figure out if a field is projected or not. This can be achieved by checking the field ID from the step above in the expected Iceberg schema. If the field ID is projected, populate the InternalRow index using the next suitable reader (hopefully reader order is preset properly in `AvroSchemaWithTypeVisitor` to match the expected projection).
   * If the field ID is not projected, skip.
   The above logic can be split/refactored between the constructor and the `read` method for efficiency.
   @rdblue Let me know if this matches your understanding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r985305158


##########
core/src/main/java/org/apache/iceberg/avro/PruneColumns.java:
##########
@@ -345,4 +345,27 @@ private static boolean isOptionSchemaWithNonNullFirstOption(Schema schema) {
     return AvroSchemaUtil.isOptionSchema(schema)
         && schema.getTypes().get(0).getType() != Schema.Type.NULL;
   }
+
+  // for primitive types, the visitResult will be null, we want to reuse the primitive types from
+  // the original
+  // schema, while for nested types, we want to use the visitResult because they have content from
+  // the previous
+  // recursive calls.

Review Comment:
   Can you fix line wrapping? Looks like this was auto-formatted.



##########
core/src/main/java/org/apache/iceberg/avro/PruneColumns.java:
##########
@@ -345,4 +345,27 @@ private static boolean isOptionSchemaWithNonNullFirstOption(Schema schema) {
     return AvroSchemaUtil.isOptionSchema(schema)
         && schema.getTypes().get(0).getType() != Schema.Type.NULL;
   }
+
+  // for primitive types, the visitResult will be null, we want to reuse the primitive types from
+  // the original
+  // schema, while for nested types, we want to use the visitResult because they have content from
+  // the previous
+  // recursive calls.
+  private static Schema copyUnion(Schema record, List<Schema> visitResults) {

Review Comment:
   Is there a better name for this? Maybe `pruneComplexUnion`?



##########
core/src/main/java/org/apache/iceberg/avro/PruneColumns.java:
##########
@@ -345,4 +345,27 @@ private static boolean isOptionSchemaWithNonNullFirstOption(Schema schema) {
     return AvroSchemaUtil.isOptionSchema(schema)
         && schema.getTypes().get(0).getType() != Schema.Type.NULL;
   }
+
+  // for primitive types, the visitResult will be null, we want to reuse the primitive types from
+  // the original
+  // schema, while for nested types, we want to use the visitResult because they have content from
+  // the previous
+  // recursive calls.
+  private static Schema copyUnion(Schema record, List<Schema> visitResults) {
+    List<Schema> branches = Lists.newArrayListWithExpectedSize(visitResults.size());
+    for (int i = 0; i < visitResults.size(); i++) {
+      if (visitResults.get(i) == null) {
+        branches.add(record.getTypes().get(i));

Review Comment:
   It looks like `record` is actually a `union` and not a record.



##########
core/src/main/java/org/apache/iceberg/avro/SchemaToType.java:
##########
@@ -38,6 +43,11 @@ class SchemaToType extends AvroSchemaVisitor<Type> {
     }
   }
 
+  SchemaToType(Schema root, boolean deriveNameMapping) {
+    this(root);
+    this.deriveNameMapping = deriveNameMapping;

Review Comment:
   I don't think this PR should build a name mapping. That can be added in a later PR, and it should not use a custom Avro property.
   
   Where possible, we avoid mixing jobs in the Iceberg project. This class converts a schema from Avro to Iceberg and should do only that. If you want to derive a mapping, I'd recommend building a visitor to do that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r985309872


##########
core/src/main/java/org/apache/iceberg/avro/PruneColumns.java:
##########
@@ -345,4 +345,27 @@ private static boolean isOptionSchemaWithNonNullFirstOption(Schema schema) {
     return AvroSchemaUtil.isOptionSchema(schema)
         && schema.getTypes().get(0).getType() != Schema.Type.NULL;
   }
+
+  // for primitive types, the visitResult will be null, we want to reuse the primitive types from
+  // the original
+  // schema, while for nested types, we want to use the visitResult because they have content from
+  // the previous
+  // recursive calls.
+  private static Schema copyUnion(Schema record, List<Schema> visitResults) {
+    List<Schema> branches = Lists.newArrayListWithExpectedSize(visitResults.size());
+    for (int i = 0; i < visitResults.size(); i++) {
+      if (visitResults.get(i) == null) {
+        branches.add(record.getTypes().get(i));
+      } else {
+        branches.add(visitResults.get(i));
+      }
+    }
+    Schema schema = Schema.createUnion(branches);
+    if (record.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID) != null) {

Review Comment:
   @yiqiangin, @wmoustafa, I would expect this to apply the name mapping, but instead it passes on a custom schema property. I don't think that this approach is correct.
   
   The Avro implementation for name mapping is a little odd. For Parquet and ORC, there's a class that rewrites the schema and adds IDs. It looks like instead of taking that approach, the Avro implementer added name mapping to this class. That's okay, but that means that the name mapping should be applied here for the union work.
   
   We want to create guarantees that we can rely on to simplify other code. In this case, once `PruneColumns` is done, we're guaranteed to have an Avro schema with the correct field IDs annotated throughout.
   
   To do that, I think the field ID should be added to each Schema that is a branch of the union:
   
   ```java
     List<Schema> unionTypes = union.getTypes();
     for (int ind = 0; ind < branches.size(); ind += 1) {
       Schema branchSchema = visitResults.get(ind);
       if (branchSchema == null) {
         branchSchema = unionTypes.get(ind);
       }
   
       Integer branchId = AvroSchemaUtil.getBranchId(branchSchema, nameMapping, fieldNames());
       if (branchId != null) {
         optionSchema.addProp(AvroSchemaUtil.BRANCH_ID_PROP, String.valueOf(branchId));
       }
   
       branches.add(branchSchema)
     }
   
     return Schema.createUnion(branches);
   
   // AvroSchemaUtil additions:
   
     public static final String BRANCH_ID_PROP = "branch-id";
   
     static Integer getBranchId(
         Schema branch, NameMapping mapping, Iterable<String> parentFieldNames) {
       Object id = branch.getObjectProp(BRANCH_ID_PROP);
       if (id != null) {
         return toInt(id);
       } else if (mapping != null) {
         MappedField mappedField = findInMapping(mapping, parentFieldNames, branch.getName(), branch.getFullName());
         if (mappedField != null) {
           return mappedField.id();
         }
       }
   
       return null;
     }
   
     private static MappedField findInMapping(NameMapping mapping, Iterable<String> parentFieldNames, String... nameOpts) {
       List<String> names = Lists.newArrayList(parentFieldNames);
       for (String name : nameOpts) {
         names.add(name);
         MappedField field = mapping.find(name);
         if (field != null) {
           return field;
         }
       }
   
       return null;
     }
   ```



##########
core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java:
##########
@@ -60,6 +60,7 @@ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
   @Override
   public void setSchema(Schema newFileSchema) {
     this.fileSchema = newFileSchema;
+    AvroSchemaUtil.convertToDeriveNameMapping(this.fileSchema);
     if (nameMapping == null && !AvroSchemaUtil.hasIds(fileSchema)) {
       nameMapping = MappingUtil.create(expectedSchema);

Review Comment:
   Yes, exactly. See the code I posted above. I think we need to add `branch-id` to the union branches during pruning.



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Map<String, Integer> fieldNameToId =
+          (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);

Review Comment:
   This matching should be done using branch IDs, not a map like this.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {

Review Comment:
   What is specific to Spark about this? Can we use an approach like the struct reader and have a generic one that is extended by Spark, Flink, etc. to make the type concrete?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -24,13 +24,17 @@
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import org.apache.avro.Schema;

Review Comment:
   I'd prefer not passing in the Avro schema. I think that the behavior should be that the `AvroSchemaWithTypeVisitor` visits each union branch and produces a `ValueReader`. Then the visitor implementation should create the index map and pass it into the reader. Not passing the schema in should keep the reader simple.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;

Review Comment:
   If there is an index for null, then it should be handled just like any other value reader, right? It won't be projected, but if the union has the null index, the reader can be called and will do nothing.
   
   I guess the odd thing is that there isn't a `NullValueReader` that can be used as a placeholder? I think maybe adding one would be cleaner than adding special handling for null options.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;
+      for (int i = 0; i < this.schema.getTypes().size(); i++) {
+        Schema alt = this.schema.getTypes().get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          this.nullTypeIndex = i;
+          break;
+        }
+      }
+
+      // Creating an integer array to track the mapping between the index of fields to be projected
+      // and the index of the value for the field stored in the returned row,
+      // if the value for a field equals to -1, it means the value of this field should not be
+      // stored
+      // in the returned row
+      int numberOfTypes =
+          this.nullTypeIndex == -1
+              ? this.schema.getTypes().size()
+              : this.schema.getTypes().size() - 1;
+      this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+      Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+      this.numOfFieldsInReturnedRow = 0;
+      this.isTagFieldProjected = false;
+      for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+        String fieldName = expectedStructField.name();
+        if (fieldName.equals(UNION_TAG_FIELD_NAME)) {

Review Comment:
   Is there a better way to identify the tag field?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;
+      for (int i = 0; i < this.schema.getTypes().size(); i++) {
+        Schema alt = this.schema.getTypes().get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          this.nullTypeIndex = i;
+          break;
+        }
+      }
+
+      // Creating an integer array to track the mapping between the index of fields to be projected
+      // and the index of the value for the field stored in the returned row,
+      // if the value for a field equals to -1, it means the value of this field should not be
+      // stored
+      // in the returned row
+      int numberOfTypes =
+          this.nullTypeIndex == -1
+              ? this.schema.getTypes().size()
+              : this.schema.getTypes().size() - 1;
+      this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+      Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+      this.numOfFieldsInReturnedRow = 0;
+      this.isTagFieldProjected = false;
+      for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+        String fieldName = expectedStructField.name();
+        if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+          this.isTagFieldProjected = true;
+          this.numOfFieldsInReturnedRow++;
+          continue;

Review Comment:
   Minor: prefer `else` to `continue` when the logic is simple like this.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;
+      for (int i = 0; i < this.schema.getTypes().size(); i++) {
+        Schema alt = this.schema.getTypes().get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          this.nullTypeIndex = i;
+          break;
+        }
+      }
+
+      // Creating an integer array to track the mapping between the index of fields to be projected
+      // and the index of the value for the field stored in the returned row,
+      // if the value for a field equals to -1, it means the value of this field should not be
+      // stored
+      // in the returned row
+      int numberOfTypes =
+          this.nullTypeIndex == -1
+              ? this.schema.getTypes().size()
+              : this.schema.getTypes().size() - 1;
+      this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+      Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+      this.numOfFieldsInReturnedRow = 0;
+      this.isTagFieldProjected = false;
+      for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+        String fieldName = expectedStructField.name();
+        if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+          this.isTagFieldProjected = true;
+          this.numOfFieldsInReturnedRow++;
+          continue;
+        }
+        int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));

Review Comment:
   This should not parse field names. It should instead use field IDs from the Iceberg schema and branch IDs from the Avro schema.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;
+      for (int i = 0; i < this.schema.getTypes().size(); i++) {
+        Schema alt = this.schema.getTypes().get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          this.nullTypeIndex = i;
+          break;
+        }
+      }
+
+      // Creating an integer array to track the mapping between the index of fields to be projected
+      // and the index of the value for the field stored in the returned row,
+      // if the value for a field equals to -1, it means the value of this field should not be
+      // stored
+      // in the returned row
+      int numberOfTypes =
+          this.nullTypeIndex == -1
+              ? this.schema.getTypes().size()
+              : this.schema.getTypes().size() - 1;
+      this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+      Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+      this.numOfFieldsInReturnedRow = 0;
+      this.isTagFieldProjected = false;
+      for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+        String fieldName = expectedStructField.name();
+        if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+          this.isTagFieldProjected = true;
+          this.numOfFieldsInReturnedRow++;
+          continue;
+        }
+        int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+        this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+            this.numOfFieldsInReturnedRow++;
+      }
+    }
+
+    @Override
+    public InternalRow read(Decoder decoder, Object reuse) throws IOException {
+      int index = decoder.readIndex();
+      if (index == nullTypeIndex) {
+        // if it is a null data, directly return null as the whole union result
+        // we know for sure it is a null so the casting will always work.
+        return (InternalRow) readers[nullTypeIndex].read(decoder, reuse);

Review Comment:
   I assume that this always returns null, but it is really weird to return the result of a reader directly.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;
+      for (int i = 0; i < this.schema.getTypes().size(); i++) {
+        Schema alt = this.schema.getTypes().get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          this.nullTypeIndex = i;
+          break;
+        }
+      }
+
+      // Creating an integer array to track the mapping between the index of fields to be projected
+      // and the index of the value for the field stored in the returned row,
+      // if the value for a field equals to -1, it means the value of this field should not be
+      // stored
+      // in the returned row
+      int numberOfTypes =
+          this.nullTypeIndex == -1
+              ? this.schema.getTypes().size()
+              : this.schema.getTypes().size() - 1;
+      this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+      Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+      this.numOfFieldsInReturnedRow = 0;
+      this.isTagFieldProjected = false;
+      for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+        String fieldName = expectedStructField.name();
+        if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+          this.isTagFieldProjected = true;
+          this.numOfFieldsInReturnedRow++;
+          continue;
+        }
+        int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+        this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+            this.numOfFieldsInReturnedRow++;
+      }
+    }
+
+    @Override
+    public InternalRow read(Decoder decoder, Object reuse) throws IOException {
+      int index = decoder.readIndex();
+      if (index == nullTypeIndex) {
+        // if it is a null data, directly return null as the whole union result
+        // we know for sure it is a null so the casting will always work.
+        return (InternalRow) readers[nullTypeIndex].read(decoder, reuse);

Review Comment:
   I assume that this always returns null, but it is really weird to return the result of a reader directly.
   
   What if the tag was projected? Why does this not produce `InternalRow(nullIndex, null, null, ... null)`?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;
+      for (int i = 0; i < this.schema.getTypes().size(); i++) {
+        Schema alt = this.schema.getTypes().get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          this.nullTypeIndex = i;
+          break;
+        }
+      }
+
+      // Creating an integer array to track the mapping between the index of fields to be projected
+      // and the index of the value for the field stored in the returned row,
+      // if the value for a field equals to -1, it means the value of this field should not be
+      // stored
+      // in the returned row
+      int numberOfTypes =
+          this.nullTypeIndex == -1
+              ? this.schema.getTypes().size()
+              : this.schema.getTypes().size() - 1;
+      this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+      Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+      this.numOfFieldsInReturnedRow = 0;
+      this.isTagFieldProjected = false;
+      for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+        String fieldName = expectedStructField.name();
+        if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+          this.isTagFieldProjected = true;
+          this.numOfFieldsInReturnedRow++;
+          continue;
+        }
+        int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+        this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+            this.numOfFieldsInReturnedRow++;
+      }
+    }
+
+    @Override
+    public InternalRow read(Decoder decoder, Object reuse) throws IOException {
+      int index = decoder.readIndex();
+      if (index == nullTypeIndex) {
+        // if it is a null data, directly return null as the whole union result
+        // we know for sure it is a null so the casting will always work.
+        return (InternalRow) readers[nullTypeIndex].read(decoder, reuse);
+      }
+
+      // otherwise, we need to return an InternalRow as a struct data
+      InternalRow struct = new GenericInternalRow(numOfFieldsInReturnedRow);

Review Comment:
   Readers need to support an option to reuse the row. You can see how in the struct reader.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;
+      for (int i = 0; i < this.schema.getTypes().size(); i++) {
+        Schema alt = this.schema.getTypes().get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          this.nullTypeIndex = i;
+          break;
+        }
+      }
+
+      // Creating an integer array to track the mapping between the index of fields to be projected
+      // and the index of the value for the field stored in the returned row,
+      // if the value for a field equals to -1, it means the value of this field should not be
+      // stored
+      // in the returned row
+      int numberOfTypes =
+          this.nullTypeIndex == -1
+              ? this.schema.getTypes().size()
+              : this.schema.getTypes().size() - 1;
+      this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+      Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+      this.numOfFieldsInReturnedRow = 0;
+      this.isTagFieldProjected = false;
+      for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+        String fieldName = expectedStructField.name();
+        if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+          this.isTagFieldProjected = true;
+          this.numOfFieldsInReturnedRow++;
+          continue;
+        }
+        int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+        this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+            this.numOfFieldsInReturnedRow++;
+      }
+    }
+
+    @Override
+    public InternalRow read(Decoder decoder, Object reuse) throws IOException {

Review Comment:
   I agree with @wmoustafa, although I think that this is correct to make the mapping array. That way the implementation is straightforward:
   
   ```java
     InternalRow row = reuseOrCreate(reuse); // this is where setNullAt happens
     int index = decoder.readIndex();
     int destIndex = projectionIndexes[index];
     if (destIndex >= 0) {
       Object value = readers[index].read(decoder, get(reuse, destIndex));
       row.update(destIndex, value);
     } else {
       readers[index].read(decoder, null);
     }
   
     return row;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] yiqiangin commented on a diff in pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
yiqiangin commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r990470991


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;
+      for (int i = 0; i < this.schema.getTypes().size(); i++) {
+        Schema alt = this.schema.getTypes().get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          this.nullTypeIndex = i;
+          break;
+        }
+      }
+
+      // Creating an integer array to track the mapping between the index of fields to be projected
+      // and the index of the value for the field stored in the returned row,
+      // if the value for a field equals to -1, it means the value of this field should not be
+      // stored
+      // in the returned row
+      int numberOfTypes =
+          this.nullTypeIndex == -1
+              ? this.schema.getTypes().size()
+              : this.schema.getTypes().size() - 1;
+      this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+      Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+      this.numOfFieldsInReturnedRow = 0;
+      this.isTagFieldProjected = false;
+      for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+        String fieldName = expectedStructField.name();
+        if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+          this.isTagFieldProjected = true;
+          this.numOfFieldsInReturnedRow++;
+          continue;
+        }
+        int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+        this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+            this.numOfFieldsInReturnedRow++;
+      }
+    }
+
+    @Override
+    public InternalRow read(Decoder decoder, Object reuse) throws IOException {

Review Comment:
   Per Ryan's suggestion, Avro Schema is not passed into ComplexUnionReader.  The approach of mapping array is used to track the relationship between a branch type and the position of its value in the returned row.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wmoustafa commented on pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
wmoustafa commented on PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#issuecomment-1254235984

   It seems references to "Avro schema" in the description are ambiguous. Could you disambiguate them? For example when saying `In contrast, the union schema of Avro schema is not pruned in case of column projection`, it is not clear which Avro schema you are referring to. This might apply to other schema type references. `File schema` is an example of an unambiguous reference.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wmoustafa commented on a diff in pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
wmoustafa commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r979466542


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;
+      for (int i = 0; i < this.schema.getTypes().size(); i++) {
+        Schema alt = this.schema.getTypes().get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          this.nullTypeIndex = i;
+          break;
+        }
+      }
+
+      // Creating an integer array to track the mapping between the index of fields to be projected
+      // and the index of the value for the field stored in the returned row,
+      // if the value for a field equals to -1, it means the value of this field should not be
+      // stored
+      // in the returned row
+      int numberOfTypes =
+          this.nullTypeIndex == -1
+              ? this.schema.getTypes().size()
+              : this.schema.getTypes().size() - 1;
+      this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+      Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+      this.numOfFieldsInReturnedRow = 0;
+      this.isTagFieldProjected = false;
+      for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+        String fieldName = expectedStructField.name();
+        if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+          this.isTagFieldProjected = true;
+          this.numOfFieldsInReturnedRow++;
+          continue;
+        }
+        int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+        this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+            this.numOfFieldsInReturnedRow++;
+      }
+    }
+
+    @Override
+    public InternalRow read(Decoder decoder, Object reuse) throws IOException {

Review Comment:
   Should not the logic here be:
   * Iterate on the Avro schema. For each branch, get the field ID from the Avro schema annotation. 
   * The assumption is Avro schema union preserves all the union branches even if some are not projected. So we still need to figure out if a field is projected or not. This can be achieved by looking up the field ID from the step above in the expected Iceberg schema. If the field ID is projected, populate the InternalRow index using the next suitable reader (hopefully reader order is preset properly in `AvroSchemaWithTypeVisitor` to match the expected projection).
   * If the field ID is not projected, skip.
   
   The above logic can be split/refactored between the constructor and the `read` method for efficiency.
   @rdblue Let me know if this matches your understanding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wmoustafa commented on a diff in pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
wmoustafa commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r979284966


##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java:
##########
@@ -78,6 +78,10 @@ public static Type convert(Schema schema) {
     return AvroSchemaVisitor.visit(schema, new SchemaToType(schema));
   }
 
+  public static Type convertToDeriveNameMapping(Schema schema) {

Review Comment:
   We can revisit this comment after addressing some of the more fundamental ones below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wmoustafa commented on a diff in pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
wmoustafa commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r979466542


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;
+      for (int i = 0; i < this.schema.getTypes().size(); i++) {
+        Schema alt = this.schema.getTypes().get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          this.nullTypeIndex = i;
+          break;
+        }
+      }
+
+      // Creating an integer array to track the mapping between the index of fields to be projected
+      // and the index of the value for the field stored in the returned row,
+      // if the value for a field equals to -1, it means the value of this field should not be
+      // stored
+      // in the returned row
+      int numberOfTypes =
+          this.nullTypeIndex == -1
+              ? this.schema.getTypes().size()
+              : this.schema.getTypes().size() - 1;
+      this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+      Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+      this.numOfFieldsInReturnedRow = 0;
+      this.isTagFieldProjected = false;
+      for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+        String fieldName = expectedStructField.name();
+        if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+          this.isTagFieldProjected = true;
+          this.numOfFieldsInReturnedRow++;
+          continue;
+        }
+        int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+        this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+            this.numOfFieldsInReturnedRow++;
+      }
+    }
+
+    @Override
+    public InternalRow read(Decoder decoder, Object reuse) throws IOException {

Review Comment:
   Should not the logic here be:
   * Iterate on the Avro schema. For each branch, get the field ID from the Avro schema annotation. 
   * The assumption is Avro schema union preserves all the union branches even if some are not projected. So we still need to figure out if a field is projected or not. This can happen by checking the field ID from the step above in the expected Iceberg schema. If the field ID is projected, populate the InternalRow index using the next suitable reader (hopefully reader order is preset properly in `AvroSchemaWithTypeVisitor` to match the expected projection).
   * If the field ID is not projected, skip.
   The above logic can be split/refactored between the constructor and the `read` method for efficiency.
   @rdblue Let me know if this matches your understanding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] yiqiangin commented on pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
yiqiangin commented on PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#issuecomment-1255316274

   > It seems references to "Avro schema" in the description are ambiguous. Could you disambiguate them? For example when saying `In contrast, the union schema of Avro schema is not pruned in case of column projection`, it is not clear which Avro schema you are referring to. This might apply to other schema type references. `File schema` is an example of an unambiguous reference.
   
   Good point. The description is revised to remove the ambigulity.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wmoustafa commented on a diff in pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
wmoustafa commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r979465023


##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Map<String, Integer> fieldNameToId =
+          (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          String name =
+              branch.getType().equals(Schema.Type.RECORD)
+                  ? branch.getName()
+                  : branch.getType().getName();
+          if (fieldNameToId.containsKey(name)) {
+            int fieldId = fieldNameToId.get(name);
+            Types.NestedField branchType = type.asStructType().field(fieldId);
+            if (branchType != null) {
+              options.add(visit(branchType.type(), branch, visitor));
+            } else {
+              Type pseudoBranchType = AvroSchemaUtil.convert(branch);
+              options.add(visit(pseudoBranchType, branch, visitor));
+            }
+          } else {
+            options.add(visit((Type) null, branch, visitor));
+          }
+        }
       }
     }
     return visitor.union(type, union, options);

Review Comment:
   I think here you are returning all the readers and in the `UnionReader` you are trying to figure out which ones to use and which to ignore. Can we just pass the required readers here in a way that aligns with the expected schema ahead of time?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wmoustafa commented on a diff in pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
wmoustafa commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r977084204


##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Map<String, Integer> fieldNameToId =
+          (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          String name =
+              branch.getType().equals(Schema.Type.RECORD)
+                  ? branch.getName()
+                  : branch.getType().getName();

Review Comment:
   You might add a comment explaining the logic here.



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java:
##########
@@ -78,6 +78,10 @@ public static Type convert(Schema schema) {
     return AvroSchemaVisitor.visit(schema, new SchemaToType(schema));
   }
 
+  public static Type convertToDeriveNameMapping(Schema schema) {

Review Comment:
   For consistency with the other APIs, rename this to `visit` and provide a flag to indicate whether to derive name mapping?



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Map<String, Integer> fieldNameToId =
+          (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          String name =
+              branch.getType().equals(Schema.Type.RECORD)
+                  ? branch.getName()
+                  : branch.getType().getName();
+          if (fieldNameToId.containsKey(name)) {
+            int fieldId = fieldNameToId.get(name);
+            Types.NestedField branchType = type.asStructType().field(fieldId);
+            if (branchType != null) {
+              options.add(visit(branchType.type(), branch, visitor));
+            } else {
+              Type pseudoBranchType = AvroSchemaUtil.convert(branch);
+              options.add(visit(pseudoBranchType, branch, visitor));
+            }
+          } else {
+            options.add(visit((Type) null, branch, visitor));
+          }

Review Comment:
   You might add a comment of how one could end up here.



##########
core/src/main/java/org/apache/iceberg/avro/SchemaToType.java:
##########
@@ -87,30 +97,63 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
       this.nextId = 0;
     }
 
+    Map<String, Integer> fieldNameToId = Maps.newHashMap();
     for (int i = 0; i < fields.size(); i += 1) {
       Schema.Field field = fields.get(i);
       Type fieldType = fieldTypes.get(i);
       int fieldId = getId(field);
 
-      if (AvroSchemaUtil.isOptionSchema(field.schema())) {
+      if (AvroSchemaUtil.isOptionSchema(field.schema())
+          || AvroSchemaUtil.isOptionalComplexUnion(field.schema())) {
         newFields.add(Types.NestedField.optional(fieldId, field.name(), fieldType, field.doc()));
       } else {
         newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType, field.doc()));
       }
+      fieldNameToId.put(field.name(), fieldId);
+    }
+
+    if (deriveNameMapping && record.getObjectProp(AVRO_FIELD_NAME_TO_ICEBERG_ID) == null) {
+      record.addProp(AVRO_FIELD_NAME_TO_ICEBERG_ID, fieldNameToId);

Review Comment:
   Should we add a map to the entire union type field or a prop to each branch, similar to how records/structs work for example?



##########
core/src/main/java/org/apache/iceberg/avro/SchemaToType.java:
##########
@@ -38,6 +43,11 @@ class SchemaToType extends AvroSchemaVisitor<Type> {
     }
   }
 
+  SchemaToType(Schema root, boolean deriveNameMapping) {

Review Comment:
   I think it might be cleaner to extract the name mapping injection to another class that either extends `AvroSchemaVisitor` or `SchemaToType`.



##########
core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java:
##########
@@ -60,6 +60,7 @@ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
   @Override
   public void setSchema(Schema newFileSchema) {
     this.fileSchema = newFileSchema;
+    AvroSchemaUtil.convertToDeriveNameMapping(this.fileSchema);

Review Comment:
   I think this line should change/go away after addressing some of the other comments.



##########
core/src/main/java/org/apache/iceberg/avro/SchemaToType.java:
##########
@@ -87,30 +97,63 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
       this.nextId = 0;
     }
 
+    Map<String, Integer> fieldNameToId = Maps.newHashMap();
     for (int i = 0; i < fields.size(); i += 1) {
       Schema.Field field = fields.get(i);
       Type fieldType = fieldTypes.get(i);
       int fieldId = getId(field);
 
-      if (AvroSchemaUtil.isOptionSchema(field.schema())) {
+      if (AvroSchemaUtil.isOptionSchema(field.schema())
+          || AvroSchemaUtil.isOptionalComplexUnion(field.schema())) {
         newFields.add(Types.NestedField.optional(fieldId, field.name(), fieldType, field.doc()));
       } else {
         newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType, field.doc()));
       }
+      fieldNameToId.put(field.name(), fieldId);
+    }
+
+    if (deriveNameMapping && record.getObjectProp(AVRO_FIELD_NAME_TO_ICEBERG_ID) == null) {
+      record.addProp(AVRO_FIELD_NAME_TO_ICEBERG_ID, fieldNameToId);
     }
 
     return Types.StructType.of(newFields);
   }
 
   @Override
   public Type union(Schema union, List<Type> options) {
-    Preconditions.checkArgument(
-        AvroSchemaUtil.isOptionSchema(union), "Unsupported type: non-option union: %s", union);
-    // records, arrays, and maps will check nullability later
-    if (options.get(0) == null) {
-      return options.get(1);
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      // Optional simple union
+      // records, arrays, and maps will check nullability later
+      if (options.get(0) == null) {
+        return options.get(1);
+      } else {
+        return options.get(0);
+      }
     } else {
-      return options.get(0);
+      // Complex union
+      Map<String, Integer> fieldNameToId = Maps.newHashMap();
+      List<Types.NestedField> newFields = Lists.newArrayList();
+      newFields.add(Types.NestedField.required(allocateId(), "tag", Types.IntegerType.get()));
+
+      int tagIndex = 0;
+      for (Type type : options) {
+        if (type != null) {
+          int fieldId = allocateId();
+          Schema schema = union.getTypes().get(tagIndex);
+          newFields.add(Types.NestedField.optional(fieldId, "field" + tagIndex++, type));
+          String name =
+              schema.getType().equals(Schema.Type.RECORD)
+                  ? schema.getName()
+                  : schema.getType().getName();
+          fieldNameToId.put(name, fieldId);
+        }
+      }
+
+      if (deriveNameMapping && union.getObjectProp(AVRO_FIELD_NAME_TO_ICEBERG_ID) == null) {
+        union.addProp(AVRO_FIELD_NAME_TO_ICEBERG_ID, fieldNameToId);

Review Comment:
   This class is not expected to change the input schema.



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Map<String, Integer> fieldNameToId =
+          (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          String name =
+              branch.getType().equals(Schema.Type.RECORD)
+                  ? branch.getName()
+                  : branch.getType().getName();
+          if (fieldNameToId.containsKey(name)) {
+            int fieldId = fieldNameToId.get(name);
+            Types.NestedField branchType = type.asStructType().field(fieldId);
+            if (branchType != null) {
+              options.add(visit(branchType.type(), branch, visitor));
+            } else {
+              Type pseudoBranchType = AvroSchemaUtil.convert(branch);
+              options.add(visit(pseudoBranchType, branch, visitor));
+            }

Review Comment:
   Good to add comments to the if/else branches.



##########
core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java:
##########
@@ -60,6 +60,7 @@ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
   @Override
   public void setSchema(Schema newFileSchema) {
     this.fileSchema = newFileSchema;
+    AvroSchemaUtil.convertToDeriveNameMapping(this.fileSchema);
     if (nameMapping == null && !AvroSchemaUtil.hasIds(fileSchema)) {
       nameMapping = MappingUtil.create(expectedSchema);

Review Comment:
   I was under the impression that we would add name/type name information to this map. For example ("int" -> 2, "com.my.namespace.MyRecord" -> 3). @rdblue, what do you think?



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Map<String, Integer> fieldNameToId =
+          (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          String name =
+              branch.getType().equals(Schema.Type.RECORD)
+                  ? branch.getName()
+                  : branch.getType().getName();

Review Comment:
   You might add a comment explaining the logic here.



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Map<String, Integer> fieldNameToId =
+          (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          String name =
+              branch.getType().equals(Schema.Type.RECORD)
+                  ? branch.getName()
+                  : branch.getType().getName();
+          if (fieldNameToId.containsKey(name)) {
+            int fieldId = fieldNameToId.get(name);
+            Types.NestedField branchType = type.asStructType().field(fieldId);

Review Comment:
   The output here does not sound to be a branch type, correct? This is a field. You might consider renaming to something like `expectedSchemaStructField`. You might as well fold the previous line to this line.



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Map<String, Integer> fieldNameToId =
+          (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          String name =
+              branch.getType().equals(Schema.Type.RECORD)
+                  ? branch.getName()
+                  : branch.getType().getName();

Review Comment:
   You might add a comment explaining the logic here.



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();

Review Comment:
   Terminology is a bit confusing (types, branches, options, etc). I would try to qualify them by the source, e.g., renaming `types` to `avroUnionBranches`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wmoustafa commented on a diff in pull request #5704: Support non-optional union types and column projection in complex union for Avro

Posted by GitBox <gi...@apache.org>.
wmoustafa commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r979284252


##########
core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java:
##########
@@ -60,6 +60,7 @@ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
   @Override
   public void setSchema(Schema newFileSchema) {
     this.fileSchema = newFileSchema;
+    AvroSchemaUtil.convertToDeriveNameMapping(this.fileSchema);
     if (nameMapping == null && !AvroSchemaUtil.hasIds(fileSchema)) {
       nameMapping = MappingUtil.create(expectedSchema);

Review Comment:
   I was under the impression that we would add name/type name information to the `nameMapping` maps. For example ("int" -> 2, "com.my.namespace.MyRecord" -> 3). @rdblue, what do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org