You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/02 14:01:35 UTC

[GitHub] [iceberg] shardulm94 commented on a diff in pull request #4242: Support non-optional union types for Avro

shardulm94 commented on code in PR #4242:
URL: https://github.com/apache/iceberg/pull/4242#discussion_r862829516


##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -79,11 +79,31 @@ private static <T> T visitRecord(Types.StructType struct, Schema record, AvroSch
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s", type, union);
+
+
+      int index = 1;
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          Preconditions.checkState(type.asStructType().fields().size() > index,
+              "Column projection on struct converted from Avro complex union type: %s is not supported", union);

Review Comment:
   Can we put this check to any of the column projection related class like `PruneColumns` or `BuildAvroProjection`? This shouldn't be in the visitor. 



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -79,11 +79,31 @@ private static <T> T visitRecord(Types.StructType struct, Schema record, AvroSch
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      Preconditions.checkArgument(type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s", type, union);
+
+
+      int index = 1;

Review Comment:
   Can you add a comment to indicate that we start index from 1, because 0 is the `tag` field which doesn't exist in the original Avro schema.



##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java:
##########
@@ -52,8 +52,21 @@ public static <T> T visit(Schema schema, AvroSchemaVisitor<T> visitor) {
       case UNION:
         List<Schema> types = schema.getTypes();
         List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-        for (Schema type : types) {
-          options.add(visit(type, visitor));
+        if (AvroSchemaUtil.isOptionSchema(schema)) {
+          for (Schema type : types) {
+            options.add(visit(type, visitor));
+          }
+        } else {
+          // complex union case
+          int idx = 0;

Review Comment:
   Can you add a comment to indicate that this counts non-null indexes, or change the variable name to `nonNullIdx`?



##########
core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java:
##########
@@ -154,13 +154,13 @@ public Schema.Field field(Schema.Field field, Supplier<Schema> fieldResult) {
 
   @Override
   public Schema union(Schema union, Iterable<Schema> options) {
-    Preconditions.checkState(AvroSchemaUtil.isOptionSchema(union),
-        "Invalid schema: non-option unions are not supported: %s", union);
-    Schema nonNullOriginal = AvroSchemaUtil.fromOption(union);
-    Schema nonNullResult = AvroSchemaUtil.fromOptions(Lists.newArrayList(options));
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      Schema nonNullOriginal = AvroSchemaUtil.fromOption(union);
+      Schema nonNullResult = AvroSchemaUtil.fromOptions(Lists.newArrayList(options));
 
-    if (!Objects.equals(nonNullOriginal, nonNullResult)) {
-      return AvroSchemaUtil.toOption(nonNullResult);
+      if (!Objects.equals(nonNullOriginal, nonNullResult)) {
+        return AvroSchemaUtil.toOption(nonNullResult);
+      }
     }
 
     return union;

Review Comment:
   I think it is possible that the one or more of the union branch schemas has changed as a result of this visitor. E.g. a struct within the union can now have an extra field, or the data type of one of its fields promoted from `int` to `float`. To handle such a case, we should recreate the union schema from the visitor results rather than just returning the original union. Can you also add test cases for these scenarios?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -81,6 +83,10 @@ static ValueReader<InternalRow> struct(List<ValueReader<?>> readers, Types.Struc
     return new StructReader(readers, struct, idToConstant);
   }
 
+  static ValueReader<InternalRow> union(Schema schema, List<ValueReader<?>> readers) {
+    return new UnionReader(schema, readers);

Review Comment:
   Can we rename this method to `complexUnion` and the class to `ComplexUnionReader`?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +291,58 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class UnionReader implements ValueReader<InternalRow> {
+    private final Schema schema;
+    private final ValueReader[] readers;
+
+    private UnionReader(Schema schema, List<ValueReader<?>> readers) {
+      this.schema = schema;
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+    }
+
+    @Override
+    public InternalRow read(Decoder decoder, Object reuse) throws IOException {
+      // first we need to filter out NULL alternative if it exists in the union schema
+      int nullIndex = -1;
+      List<Schema> alts = schema.getTypes();
+      for (int i = 0; i < alts.size(); i++) {
+        Schema alt = alts.get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          nullIndex = i;
+          break;
+        }
+      }
+
+      int index = decoder.readIndex();
+      if (index == nullIndex) {
+        // if it is a null data, directly return null as the whole union result
+        return null;
+      }
+
+      // otherwise, we need to return an InternalRow as a struct data
+      InternalRow struct = new GenericInternalRow(nullIndex >= 0 ? alts.size() : alts.size() + 1);
+      for (int i = 0; i < struct.numFields(); i += 1) {
+        struct.setNullAt(i);
+      }
+
+      Object value = readers[index].read(decoder, reuse);
+
+      if (nullIndex < 0) {
+        struct.update(index + 1, value);
+        struct.setInt(0, index);
+      } else if (index < nullIndex) {
+        struct.update(index + 1, value);
+        struct.setInt(0, index);
+      } else {
+        struct.update(index, value);
+        struct.setInt(0, index - 1);
+      }

Review Comment:
   If we use `nullIndex = MAX_INT` to signify no nulls, we can simplify this code to:
   ```
   int outputFieldIndex = nullIndex < index ? index - 1 : index;
   struct.setInt(0, outputFieldIndex);
   struct.update(outputFieldIndex + 1, value); // add 1 to offset `tag` field
   ```



##########
core/src/main/java/org/apache/iceberg/avro/PruneColumns.java:
##########
@@ -323,4 +325,19 @@ private static Schema.Field copyField(Schema.Field field, Schema newSchema, Inte
   private static boolean isOptionSchemaWithNonNullFirstOption(Schema schema) {
     return AvroSchemaUtil.isOptionSchema(schema) && schema.getTypes().get(0).getType() != Schema.Type.NULL;
   }
+
+  // for primitive types, the visitResult will be null, we want to reuse the primitive types from the original
+  // schema, while for nested types, we want to use the visitResult because they have content from the previous
+  // recursive calls.
+  private static Schema copyUnion(Schema record, List<Schema> visitResults) {
+    List<Schema> alts = Lists.newArrayListWithExpectedSize(visitResults.size());

Review Comment:
   Instead of `alts`, can we use `branches`? This is consistent with other classes like `AvroSchemaVisitor` and `AvroSchemaWithTypeVisitor` and also the [Avro spec](https://avro.apache.org/docs/1.7.7/spec.html#:~:text=union%20data%20is%20first%20ordered%20by%20the%20branch%20within%20the%20union).



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +291,58 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class UnionReader implements ValueReader<InternalRow> {
+    private final Schema schema;
+    private final ValueReader[] readers;
+
+    private UnionReader(Schema schema, List<ValueReader<?>> readers) {
+      this.schema = schema;
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+    }
+
+    @Override
+    public InternalRow read(Decoder decoder, Object reuse) throws IOException {
+      // first we need to filter out NULL alternative if it exists in the union schema
+      int nullIndex = -1;
+      List<Schema> alts = schema.getTypes();
+      for (int i = 0; i < alts.size(); i++) {
+        Schema alt = alts.get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          nullIndex = i;
+          break;
+        }
+      }

Review Comment:
   Given that this code is not dependent on the arguments to `read`, we should be able to calculate this in the constructor once.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +291,58 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class UnionReader implements ValueReader<InternalRow> {
+    private final Schema schema;
+    private final ValueReader[] readers;
+
+    private UnionReader(Schema schema, List<ValueReader<?>> readers) {
+      this.schema = schema;
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+    }
+
+    @Override
+    public InternalRow read(Decoder decoder, Object reuse) throws IOException {
+      // first we need to filter out NULL alternative if it exists in the union schema
+      int nullIndex = -1;
+      List<Schema> alts = schema.getTypes();

Review Comment:
   Instead of `alts`, can we use `branches`? This is consistent with other classes like `AvroSchemaVisitor` and `AvroSchemaWithTypeVisitor` and also the [Avro spec](https://avro.apache.org/docs/1.7.7/spec.html#:~:text=union%20data%20is%20first%20ordered%20by%20the%20branch%20within%20the%20union).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org