Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/07 03:41:47 UTC

[GitHub] [iceberg] JingsongLi opened a new pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

JingsongLi opened a new pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173


   This PR follows up on https://github.com/apache/iceberg/pull/1096.
   
   ### Use `LogicalTypeVisitor`
   Flink provides `LogicalTypeVisitor` and `DataTypeVisitor`, which are very useful for visiting types. We should not implement a custom visitor based on `instanceof` checks; that approach is error-prone and not very elegant.
   As for `FieldsDataType`, its design was not good in Flink 1.9 and 1.10, so in Flink 1.11 it was refactored and `getFieldDataTypes` was removed. So I think a `LogicalTypeVisitor` is enough, since we never touch the physical information in the `DataType`s.
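To illustrate the difference from an `instanceof` chain, here is a minimal, hypothetical sketch of the double-dispatch pattern behind `LogicalTypeVisitor`. The types `LType`, `IntT`, `StringT`, `ArrayT`, `TypeVisitor`, and `ToIcebergName` are stand-ins invented for this example, not Flink classes. Because the visitor interface declares one method per type, adding a new type breaks compilation of every visitor that does not handle it, whereas an `instanceof` chain silently falls through to its `else` branch at runtime.

```java
// Hypothetical miniature of the visitor design behind Flink's LogicalTypeVisitor.
// LType, IntT, StringT, ArrayT and TypeVisitor are stand-ins, not Flink classes.
final class VisitorSketch {

  // One visit method per concrete type: adding a type forces every visitor to handle it.
  interface TypeVisitor<R> {
    R visit(IntT type);

    R visit(StringT type);

    R visit(ArrayT type);
  }

  interface LType {
    // Double dispatch: each concrete type calls the visit overload for itself.
    <R> R accept(TypeVisitor<R> visitor);
  }

  static final class IntT implements LType {
    @Override
    public <R> R accept(TypeVisitor<R> visitor) {
      return visitor.visit(this);
    }
  }

  static final class StringT implements LType {
    @Override
    public <R> R accept(TypeVisitor<R> visitor) {
      return visitor.visit(this);
    }
  }

  static final class ArrayT implements LType {
    final LType element;

    ArrayT(LType element) {
      this.element = element;
    }

    @Override
    public <R> R accept(TypeVisitor<R> visitor) {
      return visitor.visit(this);
    }
  }

  // The visitor drives recursion itself by calling accept(this) on child types,
  // similar to how FlinkTypeToType.visit(ArrayType) calls getElementType().accept(this).
  static final class ToIcebergName implements TypeVisitor<String> {
    @Override
    public String visit(IntT type) {
      return "int";
    }

    @Override
    public String visit(StringT type) {
      return "string";
    }

    @Override
    public String visit(ArrayT type) {
      return "list<" + type.element.accept(this) + ">";
    }
  }

  public static void main(String[] args) {
    LType type = new ArrayT(new ArrayT(new StringT()));
    String converted = type.accept(new ToIcebergName());
    System.out.println(converted); // list<list<string>>
  }
}
```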
   
   ### Support MultisetType
   A `CollectionDataType` may also be a `MultisetType`. We can map it to `Map<T, Integer>`, where each key is a distinct element and its value is the element's count.
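A minimal sketch of the `Map<T, Integer>` representation at the data level, assuming each distinct element becomes a key and its value is the number of occurrences. `MultisetAsMap` and `toCounts` are hypothetical names, and rejecting null elements is an assumption made here because they would become null map keys.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: a multiset represented as Map<T, Integer> (element -> count).
final class MultisetAsMap {

  static <T> Map<T, Integer> toCounts(Iterable<T> elements) {
    // LinkedHashMap keeps first-seen order, which makes the output predictable.
    Map<T, Integer> counts = new LinkedHashMap<>();
    for (T element : elements) {
      // Assumption: null elements are rejected because they would become null map keys.
      Objects.requireNonNull(element, "multiset elements must not be null");
      counts.merge(element, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    System.out.println(toCounts(List.of("a", "b", "a"))); // {a=2, b=1}
  }
}
```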


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JingsongLi commented on a change in pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#discussion_r455467964



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
##########
@@ -54,21 +54,22 @@ public void testConvertFlinkSchemaToIcebergSchema() {
         .field("decimal", DataTypes.DECIMAL(2, 2))
         .field("decimal2", DataTypes.DECIMAL(38, 2))
         .field("decimal3", DataTypes.DECIMAL(10, 1))
+        .field("multiset", DataTypes.MULTISET(DataTypes.STRING().notNull()))

Review comment:
       Null values are OK; the problem is null keys.
   As for null key support, the file formats look OK; the only format constraint is that Avro only supports string keys for map types.
   The real question is whether we have any special optimizations for non-null values. We do: see `ParquetValueWriters.option`. If a null key reaches the Parquet writer, I think it would throw a `NullPointerException`, which is not very elegant.
   
   Another option is what I suggested in https://github.com/apache/iceberg/pull/1096/files/8891cd5438306f0b4b226706058beff7c3cd4080#diff-12a375418217cdc6be26c73e02d56065R102 
   We can throw an `UnsupportedOperationException` here to tell users, even though Flink map keys are nullable by default.
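A hypothetical sketch of the schema-time check described above. `NullableKeyCheckSketch`, `convertMap`, `convertMultiset`, and the string type descriptions are illustrative stand-ins, not Iceberg or Flink APIs; the point is only that a nullable map key (or multiset element) is rejected with an `UnsupportedOperationException` during conversion, before any data reaches a writer. Iceberg map keys are always required, which is why only the value side carries an optional/required flag.

```java
// Hypothetical sketch: reject nullable map keys / multiset elements at schema
// conversion time instead of failing later with a NullPointerException in a writer.
final class NullableKeyCheckSketch {

  // Types are described by name plus nullability; the output format is illustrative.
  static String convertMap(String keyType, boolean keyNullable,
                           String valueType, boolean valueNullable) {
    if (keyNullable) {
      throw new UnsupportedOperationException(
          "Unsupported nullable map key of type " + keyType + "; Iceberg map keys must be non-null");
    }
    String value = (valueNullable ? "optional " : "required ") + valueType;
    return "map<" + keyType + ", " + value + ">";
  }

  // A multiset becomes a map from element to a required int count.
  static String convertMultiset(String elementType, boolean elementNullable) {
    return convertMap(elementType, elementNullable, "int", false);
  }

  public static void main(String[] args) {
    System.out.println(convertMultiset("string", false)); // map<string, required int>
    try {
      convertMultiset("string", true);
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```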






[GitHub] [iceberg] openinx commented on a change in pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#discussion_r450714611



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeVisitor.java
##########
@@ -19,65 +19,63 @@
 
 package org.apache.iceberg.flink;
 
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.table.types.AtomicDataType;
-import org.apache.flink.table.types.CollectionDataType;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.KeyValueDataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
 
-public class FlinkTypeVisitor<T> {
+public abstract class FlinkTypeVisitor<T> implements LogicalTypeVisitor<T> {

Review comment:
       @JingsongLi I'm curious what the difference is between the Flink-style `LogicalTypeVisitor` and the Iceberg-style visitor. Currently, all of the visitors are Iceberg-style, and I'm not quite sure what the benefits of converting to a Flink-style visitor are.
   
   Update: OK, I read the background in the PR description (https://github.com/apache/iceberg/pull/1173#issue-445128002); it sounds reasonable.
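For context, a minimal hypothetical sketch of the Iceberg-style traversal that the old `FlinkTypeVisitor` used (the `fields(..., List<Type> types)` and `collection(..., Type elementType)` callbacks in the diff): a static `visit` method walks the tree and hands already-converted child results to the callbacks. In the Flink style, by contrast, the visitor recurses itself by calling `accept(this)` on child types. `Node`, `Visitor`, and `ToIcebergName` are invented stand-ins, not Flink or Iceberg classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of an "Iceberg style" post-order visitor: the framework
// recurses and passes child results into the callbacks.
final class PostOrderVisitorSketch {

  // Minimal stand-in type model: kind is "int", "string", "array", or "row".
  static final class Node {
    final String kind;
    final List<Node> children;

    Node(String kind, List<Node> children) {
      this.kind = kind;
      this.children = children;
    }
  }

  abstract static class Visitor<T> {
    abstract T atomic(Node type);

    abstract T collection(Node array, T elementResult);

    abstract T fields(Node row, List<T> fieldResults);
  }

  // The static visit method, not the visitor, drives recursion (children first).
  static <T> T visit(Node type, Visitor<T> visitor) {
    switch (type.kind) {
      case "array":
        return visitor.collection(type, visit(type.children.get(0), visitor));
      case "row": {
        List<T> results = new ArrayList<>();
        for (Node child : type.children) {
          results.add(visit(child, visitor));
        }
        return visitor.fields(type, results);
      }
      default:
        return visitor.atomic(type);
    }
  }

  static final class ToIcebergName extends Visitor<String> {
    @Override
    String atomic(Node type) {
      return type.kind;
    }

    @Override
    String collection(Node array, String elementResult) {
      return "list<" + elementResult + ">";
    }

    @Override
    String fields(Node row, List<String> fieldResults) {
      return "struct<" + String.join(", ", fieldResults) + ">";
    }
  }

  public static void main(String[] args) {
    Node row = new Node("row", List.of(
        new Node("int", List.of()),
        new Node("array", List.of(new Node("string", List.of())))));
    String converted = visit(row, new ToIcebergName());
    System.out.println(converted); // struct<int, list<string>>
  }
}
```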






[GitHub] [iceberg] openinx commented on a change in pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#discussion_r453401652



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
##########
@@ -64,86 +63,136 @@ private int getNextId() {
   }
 
   @Override
-  public Type fields(FieldsDataType fields, List<Type> types) {
-    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(types.size());
-    boolean isRoot = root == fields;
+  public Type visit(CharType charType) {
+    return Types.StringType.get();
+  }
 
-    List<RowType.RowField> rowFields = ((RowType) fields.getLogicalType()).getFields();
-    Preconditions.checkArgument(rowFields.size() == types.size(), "fields list and types list should have same size.");
+  @Override
+  public Type visit(VarCharType varCharType) {
+    return Types.StringType.get();
+  }
 
-    for (int i = 0; i < rowFields.size(); i++) {
-      int id = isRoot ? i : getNextId();
+  @Override
+  public Type visit(BooleanType booleanType) {
+    return Types.BooleanType.get();
+  }
 
-      RowType.RowField field = rowFields.get(i);
-      String name = field.getName();
-      String comment = field.getDescription().orElse(null);
+  @Override
+  public Type visit(BinaryType binaryType) {
+    return Types.FixedType.ofLength(binaryType.getLength());
+  }
 
-      if (field.getType().isNullable()) {
-        newFields.add(Types.NestedField.optional(id, name, types.get(i), comment));
-      } else {
-        newFields.add(Types.NestedField.required(id, name, types.get(i), comment));
-      }
-    }
+  @Override
+  public Type visit(VarBinaryType varBinaryType) {
+    return Types.BinaryType.get();
+  }
 
-    return Types.StructType.of(newFields);
+  @Override
+  public Type visit(DecimalType decimalType) {
+    return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
+  }
+
+  @Override
+  public Type visit(TinyIntType tinyIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(SmallIntType smallIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(IntType intType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(BigIntType bigIntType) {
+    return Types.LongType.get();
+  }
+
+  @Override
+  public Type visit(FloatType floatType) {
+    return Types.FloatType.get();
+  }
+
+  @Override
+  public Type visit(DoubleType doubleType) {
+    return Types.DoubleType.get();
+  }
+
+  @Override
+  public Type visit(DateType dateType) {
+    return Types.DateType.get();
+  }
+
+  @Override
+  public Type visit(TimeType timeType) {
+    return Types.TimeType.get();
   }
 
   @Override
-  public Type collection(CollectionDataType collection, Type elementType) {
-    if (collection.getElementDataType().getLogicalType().isNullable()) {
+  public Type visit(TimestampType timestampType) {
+    return Types.TimestampType.withoutZone();
+  }
+
+  @Override
+  public Type visit(LocalZonedTimestampType localZonedTimestampType) {
+    return Types.TimestampType.withZone();
+  }
+
+  @Override
+  public Type visit(ArrayType arrayType) {
+    Type elementType = arrayType.getElementType().accept(this);
+    if (arrayType.getElementType().isNullable()) {
       return Types.ListType.ofOptional(getNextId(), elementType);
     } else {
       return Types.ListType.ofRequired(getNextId(), elementType);
     }
   }
 
   @Override
-  public Type map(KeyValueDataType map, Type keyType, Type valueType) {
+  public Type visit(MultisetType multisetType) {
+    Type elementType = multisetType.getElementType().accept(this);
+    return Types.MapType.ofRequired(getNextId(), getNextId(), elementType, Types.IntegerType.get());
+  }
+
+  @Override
+  public Type visit(MapType mapType) {
     // keys in map are not allowed to be null.
-    if (map.getValueDataType().getLogicalType().isNullable()) {
+    Type keyType = mapType.getKeyType().accept(this);
+    Type valueType = mapType.getValueType().accept(this);
+    if (mapType.getValueType().isNullable()) {
       return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType);
     } else {
       return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType);
     }
   }
 
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
-  @Override
-  public Type atomic(AtomicDataType type) {
-    LogicalType inner = type.getLogicalType();
-    if (inner instanceof VarCharType ||
-        inner instanceof CharType) {
-      return Types.StringType.get();
-    } else if (inner instanceof BooleanType) {
-      return Types.BooleanType.get();
-    } else if (inner instanceof IntType ||
-        inner instanceof SmallIntType ||
-        inner instanceof TinyIntType) {
-      return Types.IntegerType.get();
-    } else if (inner instanceof BigIntType) {
-      return Types.LongType.get();
-    } else if (inner instanceof VarBinaryType) {
-      return Types.BinaryType.get();
-    } else if (inner instanceof BinaryType) {
-      BinaryType binaryType = (BinaryType) inner;
-      return Types.FixedType.ofLength(binaryType.getLength());
-    } else if (inner instanceof FloatType) {
-      return Types.FloatType.get();
-    } else if (inner instanceof DoubleType) {
-      return Types.DoubleType.get();
-    } else if (inner instanceof DateType) {
-      return Types.DateType.get();
-    } else if (inner instanceof TimeType) {
-      return Types.TimeType.get();
-    } else if (inner instanceof TimestampType) {
-      return Types.TimestampType.withoutZone();
-    } else if (inner instanceof LocalZonedTimestampType) {
-      return Types.TimestampType.withZone();
-    } else if (inner instanceof DecimalType) {
-      DecimalType decimalType = (DecimalType) inner;
-      return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
-    } else {
-      throw new UnsupportedOperationException("Not a supported type: " + type.toString());
+  @Override
+  public Type visit(RowType rowType) {
+    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
+    boolean isRoot = root == rowType;
+
+    List<Type> types = rowType.getFields().stream()

Review comment:
       I'm OK with the current two-loop approach now; let's keep ID generation consistent with Spark.
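Why the loop order matters: the two-loop version converts every field type first (consuming IDs for nested fields) and only then assigns the current row's field IDs, while a single loop that assigns `id` before calling `accept(this)` hands out IDs in pre-order. A minimal hypothetical sketch modelling rows only; `FieldIdOrderSketch` and `Field` are illustrative names, and we assume IDs `0..n-1` are reserved for the root fields, so nested IDs start at the root field count. As far as we can tell, the children-first order is what the Spark converter produces.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch comparing two field-ID assignment orders for nested rows.
final class FieldIdOrderSketch {

  // A field is a primitive (children == null) or a row with child fields.
  static final class Field {
    final String name;
    final List<Field> children;

    Field(String name, List<Field> children) {
      this.name = name;
      this.children = children;
    }

    boolean isRow() {
      return children != null;
    }
  }

  private final List<Field> root;
  private final Map<String, Integer> ids = new LinkedHashMap<>();
  private int nextId;

  private FieldIdOrderSketch(List<Field> root) {
    this.root = root;
    // Assumption: IDs 0..n-1 are reserved for root fields.
    this.nextId = root.size();
  }

  // Two loops: convert all child types first, then assign this row's field IDs.
  private void twoLoops(List<Field> fields) {
    for (Field f : fields) {
      if (f.isRow()) {
        twoLoops(f.children);
      }
    }
    for (int i = 0; i < fields.size(); i++) {
      // Identity check, mirroring `root == rowType` in the diff.
      ids.put(fields.get(i).name, fields == root ? i : nextId++);
    }
  }

  // Single loop: assign a field's ID, then convert its type.
  private void singleLoop(List<Field> fields) {
    for (int i = 0; i < fields.size(); i++) {
      Field f = fields.get(i);
      ids.put(f.name, fields == root ? i : nextId++);
      if (f.isRow()) {
        singleLoop(f.children);
      }
    }
  }

  static Map<String, Integer> assign(List<Field> root, boolean useTwoLoops) {
    FieldIdOrderSketch sketch = new FieldIdOrderSketch(root);
    if (useTwoLoops) {
      sketch.twoLoops(root);
    } else {
      sketch.singleLoop(root);
    }
    return sketch.ids;
  }

  public static void main(String[] args) {
    // Schema: x ROW<y ROW<z INT>, w INT>
    List<Field> schema = List.of(
        new Field("x", List.of(
            new Field("y", List.of(new Field("z", null))),
            new Field("w", null))));
    System.out.println("two loops:   " + assign(schema, true));  // {z=1, y=2, w=3, x=0}
    System.out.println("single loop: " + assign(schema, false)); // {x=0, y=1, z=2, w=3}
  }
}
```

In the real converter, lists and maps also consume IDs through `getNextId()`; the sketch leaves them out to keep the ordering difference visible.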






[GitHub] [iceberg] rdblue commented on a change in pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#discussion_r454503006



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
##########
@@ -64,86 +63,136 @@ private int getNextId() {
   }
 
   @Override
-  public Type fields(FieldsDataType fields, List<Type> types) {
-    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(types.size());
-    boolean isRoot = root == fields;
+  public Type visit(CharType charType) {
+    return Types.StringType.get();
+  }
 
-    List<RowType.RowField> rowFields = ((RowType) fields.getLogicalType()).getFields();
-    Preconditions.checkArgument(rowFields.size() == types.size(), "fields list and types list should have same size.");
+  @Override
+  public Type visit(VarCharType varCharType) {
+    return Types.StringType.get();
+  }
 
-    for (int i = 0; i < rowFields.size(); i++) {
-      int id = isRoot ? i : getNextId();
+  @Override
+  public Type visit(BooleanType booleanType) {
+    return Types.BooleanType.get();
+  }
 
-      RowType.RowField field = rowFields.get(i);
-      String name = field.getName();
-      String comment = field.getDescription().orElse(null);
+  @Override
+  public Type visit(BinaryType binaryType) {
+    return Types.FixedType.ofLength(binaryType.getLength());
+  }
 
-      if (field.getType().isNullable()) {
-        newFields.add(Types.NestedField.optional(id, name, types.get(i), comment));
-      } else {
-        newFields.add(Types.NestedField.required(id, name, types.get(i), comment));
-      }
-    }
+  @Override
+  public Type visit(VarBinaryType varBinaryType) {
+    return Types.BinaryType.get();
+  }
 
-    return Types.StructType.of(newFields);
+  @Override
+  public Type visit(DecimalType decimalType) {
+    return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
+  }
+
+  @Override
+  public Type visit(TinyIntType tinyIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(SmallIntType smallIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(IntType intType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(BigIntType bigIntType) {
+    return Types.LongType.get();
+  }
+
+  @Override
+  public Type visit(FloatType floatType) {
+    return Types.FloatType.get();
+  }
+
+  @Override
+  public Type visit(DoubleType doubleType) {
+    return Types.DoubleType.get();
+  }
+
+  @Override
+  public Type visit(DateType dateType) {
+    return Types.DateType.get();
+  }
+
+  @Override
+  public Type visit(TimeType timeType) {
+    return Types.TimeType.get();
   }
 
   @Override
-  public Type collection(CollectionDataType collection, Type elementType) {
-    if (collection.getElementDataType().getLogicalType().isNullable()) {
+  public Type visit(TimestampType timestampType) {
+    return Types.TimestampType.withoutZone();
+  }
+
+  @Override
+  public Type visit(LocalZonedTimestampType localZonedTimestampType) {
+    return Types.TimestampType.withZone();
+  }
+
+  @Override
+  public Type visit(ArrayType arrayType) {
+    Type elementType = arrayType.getElementType().accept(this);
+    if (arrayType.getElementType().isNullable()) {
       return Types.ListType.ofOptional(getNextId(), elementType);
     } else {
       return Types.ListType.ofRequired(getNextId(), elementType);
     }
   }
 
   @Override
-  public Type map(KeyValueDataType map, Type keyType, Type valueType) {
+  public Type visit(MultisetType multisetType) {
+    Type elementType = multisetType.getElementType().accept(this);
+    return Types.MapType.ofRequired(getNextId(), getNextId(), elementType, Types.IntegerType.get());
+  }
+
+  @Override
+  public Type visit(MapType mapType) {
     // keys in map are not allowed to be null.
-    if (map.getValueDataType().getLogicalType().isNullable()) {
+    Type keyType = mapType.getKeyType().accept(this);
+    Type valueType = mapType.getValueType().accept(this);
+    if (mapType.getValueType().isNullable()) {
       return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType);
     } else {
       return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType);
     }
   }
 
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
-  @Override
-  public Type atomic(AtomicDataType type) {
-    LogicalType inner = type.getLogicalType();
-    if (inner instanceof VarCharType ||
-        inner instanceof CharType) {
-      return Types.StringType.get();
-    } else if (inner instanceof BooleanType) {
-      return Types.BooleanType.get();
-    } else if (inner instanceof IntType ||
-        inner instanceof SmallIntType ||
-        inner instanceof TinyIntType) {
-      return Types.IntegerType.get();
-    } else if (inner instanceof BigIntType) {
-      return Types.LongType.get();
-    } else if (inner instanceof VarBinaryType) {
-      return Types.BinaryType.get();
-    } else if (inner instanceof BinaryType) {
-      BinaryType binaryType = (BinaryType) inner;
-      return Types.FixedType.ofLength(binaryType.getLength());
-    } else if (inner instanceof FloatType) {
-      return Types.FloatType.get();
-    } else if (inner instanceof DoubleType) {
-      return Types.DoubleType.get();
-    } else if (inner instanceof DateType) {
-      return Types.DateType.get();
-    } else if (inner instanceof TimeType) {
-      return Types.TimeType.get();
-    } else if (inner instanceof TimestampType) {
-      return Types.TimestampType.withoutZone();
-    } else if (inner instanceof LocalZonedTimestampType) {
-      return Types.TimestampType.withZone();
-    } else if (inner instanceof DecimalType) {
-      DecimalType decimalType = (DecimalType) inner;
-      return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
-    } else {
-      throw new UnsupportedOperationException("Not a supported type: " + type.toString());
+  @Override
+  public Type visit(RowType rowType) {
+    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
+    boolean isRoot = root == rowType;
+
+    List<Type> types = rowType.getFields().stream()
+        .map(f -> f.getType().accept(this))
+        .collect(Collectors.toList());
+
+    for (int i = 0; i < rowType.getFieldCount(); i++) {
+      int id = isRoot ? i : getNextId();
+
+      RowType.RowField field = rowType.getFields().get(i);
+      String name = field.getName();
+      String comment = field.getDescription().orElse(null);
+
+      if (field.getType().isNullable()) {
+        newFields.add(Types.NestedField.optional(id, name, types.get(i), comment));

Review comment:
       There is also a factory method that accepts a nullability boolean, `NestedField.of`.






[GitHub] [iceberg] openinx commented on a change in pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#discussion_r452089050



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
##########
@@ -64,86 +63,136 @@ private int getNextId() {
   }
 
   @Override
-  public Type fields(FieldsDataType fields, List<Type> types) {
-    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(types.size());
-    boolean isRoot = root == fields;
+  public Type visit(CharType charType) {
+    return Types.StringType.get();
+  }
 
-    List<RowType.RowField> rowFields = ((RowType) fields.getLogicalType()).getFields();
-    Preconditions.checkArgument(rowFields.size() == types.size(), "fields list and types list should have same size.");
+  @Override
+  public Type visit(VarCharType varCharType) {
+    return Types.StringType.get();
+  }
 
-    for (int i = 0; i < rowFields.size(); i++) {
-      int id = isRoot ? i : getNextId();
+  @Override
+  public Type visit(BooleanType booleanType) {
+    return Types.BooleanType.get();
+  }
 
-      RowType.RowField field = rowFields.get(i);
-      String name = field.getName();
-      String comment = field.getDescription().orElse(null);
+  @Override
+  public Type visit(BinaryType binaryType) {
+    return Types.FixedType.ofLength(binaryType.getLength());
+  }
 
-      if (field.getType().isNullable()) {
-        newFields.add(Types.NestedField.optional(id, name, types.get(i), comment));
-      } else {
-        newFields.add(Types.NestedField.required(id, name, types.get(i), comment));
-      }
-    }
+  @Override
+  public Type visit(VarBinaryType varBinaryType) {
+    return Types.BinaryType.get();
+  }
 
-    return Types.StructType.of(newFields);
+  @Override
+  public Type visit(DecimalType decimalType) {
+    return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
+  }
+
+  @Override
+  public Type visit(TinyIntType tinyIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(SmallIntType smallIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(IntType intType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(BigIntType bigIntType) {
+    return Types.LongType.get();
+  }
+
+  @Override
+  public Type visit(FloatType floatType) {
+    return Types.FloatType.get();
+  }
+
+  @Override
+  public Type visit(DoubleType doubleType) {
+    return Types.DoubleType.get();
+  }
+
+  @Override
+  public Type visit(DateType dateType) {
+    return Types.DateType.get();
+  }
+
+  @Override
+  public Type visit(TimeType timeType) {
+    return Types.TimeType.get();
   }
 
   @Override
-  public Type collection(CollectionDataType collection, Type elementType) {
-    if (collection.getElementDataType().getLogicalType().isNullable()) {
+  public Type visit(TimestampType timestampType) {
+    return Types.TimestampType.withoutZone();
+  }
+
+  @Override
+  public Type visit(LocalZonedTimestampType localZonedTimestampType) {
+    return Types.TimestampType.withZone();
+  }
+
+  @Override
+  public Type visit(ArrayType arrayType) {
+    Type elementType = arrayType.getElementType().accept(this);
+    if (arrayType.getElementType().isNullable()) {
       return Types.ListType.ofOptional(getNextId(), elementType);
     } else {
       return Types.ListType.ofRequired(getNextId(), elementType);
     }
   }
 
   @Override
-  public Type map(KeyValueDataType map, Type keyType, Type valueType) {
+  public Type visit(MultisetType multisetType) {
+    Type elementType = multisetType.getElementType().accept(this);
+    return Types.MapType.ofRequired(getNextId(), getNextId(), elementType, Types.IntegerType.get());
+  }
+
+  @Override
+  public Type visit(MapType mapType) {
     // keys in map are not allowed to be null.
-    if (map.getValueDataType().getLogicalType().isNullable()) {
+    Type keyType = mapType.getKeyType().accept(this);
+    Type valueType = mapType.getValueType().accept(this);
+    if (mapType.getValueType().isNullable()) {
       return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType);
     } else {
       return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType);
     }
   }
 
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
-  @Override
-  public Type atomic(AtomicDataType type) {
-    LogicalType inner = type.getLogicalType();
-    if (inner instanceof VarCharType ||
-        inner instanceof CharType) {
-      return Types.StringType.get();
-    } else if (inner instanceof BooleanType) {
-      return Types.BooleanType.get();
-    } else if (inner instanceof IntType ||
-        inner instanceof SmallIntType ||
-        inner instanceof TinyIntType) {
-      return Types.IntegerType.get();
-    } else if (inner instanceof BigIntType) {
-      return Types.LongType.get();
-    } else if (inner instanceof VarBinaryType) {
-      return Types.BinaryType.get();
-    } else if (inner instanceof BinaryType) {
-      BinaryType binaryType = (BinaryType) inner;
-      return Types.FixedType.ofLength(binaryType.getLength());
-    } else if (inner instanceof FloatType) {
-      return Types.FloatType.get();
-    } else if (inner instanceof DoubleType) {
-      return Types.DoubleType.get();
-    } else if (inner instanceof DateType) {
-      return Types.DateType.get();
-    } else if (inner instanceof TimeType) {
-      return Types.TimeType.get();
-    } else if (inner instanceof TimestampType) {
-      return Types.TimestampType.withoutZone();
-    } else if (inner instanceof LocalZonedTimestampType) {
-      return Types.TimestampType.withZone();
-    } else if (inner instanceof DecimalType) {
-      DecimalType decimalType = (DecimalType) inner;
-      return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
-    } else {
-      throw new UnsupportedOperationException("Not a supported type: " + type.toString());
+  @Override
+  public Type visit(RowType rowType) {
+    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
+    boolean isRoot = root == rowType;
+
+    List<Type> types = rowType.getFields().stream()

Review comment:
       It seems we don't need to loop twice here (the first loop builds the `List<Type>` and the second builds the `List<Types.NestedField>`). This could be simplified as follows: 
   
   ```java
     @Override
     public Type visit(RowType rowType) {
       List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
       boolean isRoot = root == rowType;
   
       for (int i = 0; i < rowType.getFieldCount(); i++) {
         int id = isRoot ? i : getNextId();
   
         RowType.RowField field = rowType.getFields().get(i);
         String name = field.getName();
         String comment = field.getDescription().orElse(null);
         Type type = field.getType().accept(this);
   
         if (field.getType().isNullable()) {
           newFields.add(Types.NestedField.optional(id, name, type, comment));
         } else {
           newFields.add(Types.NestedField.required(id, name, type, comment));
         }
       }
   
       return Types.StructType.of(newFields);
     }
   ```






[GitHub] [iceberg] rdblue commented on a change in pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#discussion_r454504046



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
##########
@@ -54,21 +54,22 @@ public void testConvertFlinkSchemaToIcebergSchema() {
         .field("decimal", DataTypes.DECIMAL(2, 2))
         .field("decimal2", DataTypes.DECIMAL(38, 2))
         .field("decimal3", DataTypes.DECIMAL(10, 1))
+        .field("multiset", DataTypes.MULTISET(DataTypes.STRING().notNull()))

Review comment:
       What happens for a multiset of nullable items?






[GitHub] [iceberg] rdblue commented on a change in pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#discussion_r455333259



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
##########
@@ -54,21 +54,22 @@ public void testConvertFlinkSchemaToIcebergSchema() {
         .field("decimal", DataTypes.DECIMAL(2, 2))
         .field("decimal2", DataTypes.DECIMAL(38, 2))
         .field("decimal3", DataTypes.DECIMAL(10, 1))
+        .field("multiset", DataTypes.MULTISET(DataTypes.STRING().notNull()))

Review comment:
       Okay, for rows that are passed to Iceberg that have null map keys or null values in a multiset, what should happen?






[GitHub] [iceberg] rdblue merged pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173


   




[GitHub] [iceberg] JingsongLi commented on pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#issuecomment-654580954


   CC: @openinx @rdblue 



[GitHub] [iceberg] openinx commented on a change in pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#discussion_r452096503



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
##########
@@ -64,86 +63,136 @@ private int getNextId() {
   }
 
   @Override
-  public Type fields(FieldsDataType fields, List<Type> types) {
-    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(types.size());
-    boolean isRoot = root == fields;
+  public Type visit(CharType charType) {
+    return Types.StringType.get();
+  }
 
-    List<RowType.RowField> rowFields = ((RowType) fields.getLogicalType()).getFields();
-    Preconditions.checkArgument(rowFields.size() == types.size(), "fields list and types list should have same size.");
+  @Override
+  public Type visit(VarCharType varCharType) {
+    return Types.StringType.get();
+  }
 
-    for (int i = 0; i < rowFields.size(); i++) {
-      int id = isRoot ? i : getNextId();
+  @Override
+  public Type visit(BooleanType booleanType) {
+    return Types.BooleanType.get();
+  }
 
-      RowType.RowField field = rowFields.get(i);
-      String name = field.getName();
-      String comment = field.getDescription().orElse(null);
+  @Override
+  public Type visit(BinaryType binaryType) {
+    return Types.FixedType.ofLength(binaryType.getLength());
+  }
 
-      if (field.getType().isNullable()) {
-        newFields.add(Types.NestedField.optional(id, name, types.get(i), comment));
-      } else {
-        newFields.add(Types.NestedField.required(id, name, types.get(i), comment));
-      }
-    }
+  @Override
+  public Type visit(VarBinaryType varBinaryType) {
+    return Types.BinaryType.get();
+  }
 
-    return Types.StructType.of(newFields);
+  @Override
+  public Type visit(DecimalType decimalType) {
+    return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
+  }
+
+  @Override
+  public Type visit(TinyIntType tinyIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(SmallIntType smallIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(IntType intType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(BigIntType bigIntType) {
+    return Types.LongType.get();
+  }
+
+  @Override
+  public Type visit(FloatType floatType) {
+    return Types.FloatType.get();
+  }
+
+  @Override
+  public Type visit(DoubleType doubleType) {
+    return Types.DoubleType.get();
+  }
+
+  @Override
+  public Type visit(DateType dateType) {
+    return Types.DateType.get();
+  }
+
+  @Override
+  public Type visit(TimeType timeType) {
+    return Types.TimeType.get();
   }
 
   @Override
-  public Type collection(CollectionDataType collection, Type elementType) {
-    if (collection.getElementDataType().getLogicalType().isNullable()) {
+  public Type visit(TimestampType timestampType) {
+    return Types.TimestampType.withoutZone();
+  }
+
+  @Override
+  public Type visit(LocalZonedTimestampType localZonedTimestampType) {
+    return Types.TimestampType.withZone();
+  }
+
+  @Override
+  public Type visit(ArrayType arrayType) {
+    Type elementType = arrayType.getElementType().accept(this);
+    if (arrayType.getElementType().isNullable()) {
       return Types.ListType.ofOptional(getNextId(), elementType);
     } else {
       return Types.ListType.ofRequired(getNextId(), elementType);
     }
   }
 
   @Override
-  public Type map(KeyValueDataType map, Type keyType, Type valueType) {
+  public Type visit(MultisetType multisetType) {
+    Type elementType = multisetType.getElementType().accept(this);
+    return Types.MapType.ofRequired(getNextId(), getNextId(), elementType, Types.IntegerType.get());
+  }
+
+  @Override
+  public Type visit(MapType mapType) {
     // keys in map are not allowed to be null.
-    if (map.getValueDataType().getLogicalType().isNullable()) {
+    Type keyType = mapType.getKeyType().accept(this);
+    Type valueType = mapType.getValueType().accept(this);
+    if (mapType.getValueType().isNullable()) {
       return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType);
     } else {
       return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType);
     }
   }
 
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
-  @Override
-  public Type atomic(AtomicDataType type) {
-    LogicalType inner = type.getLogicalType();
-    if (inner instanceof VarCharType ||
-        inner instanceof CharType) {
-      return Types.StringType.get();
-    } else if (inner instanceof BooleanType) {
-      return Types.BooleanType.get();
-    } else if (inner instanceof IntType ||
-        inner instanceof SmallIntType ||
-        inner instanceof TinyIntType) {
-      return Types.IntegerType.get();
-    } else if (inner instanceof BigIntType) {
-      return Types.LongType.get();
-    } else if (inner instanceof VarBinaryType) {
-      return Types.BinaryType.get();
-    } else if (inner instanceof BinaryType) {
-      BinaryType binaryType = (BinaryType) inner;
-      return Types.FixedType.ofLength(binaryType.getLength());
-    } else if (inner instanceof FloatType) {
-      return Types.FloatType.get();
-    } else if (inner instanceof DoubleType) {
-      return Types.DoubleType.get();
-    } else if (inner instanceof DateType) {
-      return Types.DateType.get();
-    } else if (inner instanceof TimeType) {
-      return Types.TimeType.get();
-    } else if (inner instanceof TimestampType) {
-      return Types.TimestampType.withoutZone();
-    } else if (inner instanceof LocalZonedTimestampType) {
-      return Types.TimestampType.withZone();
-    } else if (inner instanceof DecimalType) {
-      DecimalType decimalType = (DecimalType) inner;
-      return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
-    } else {
-      throw new UnsupportedOperationException("Not a supported type: " + type.toString());
+  @Override
+  public Type visit(RowType rowType) {
+    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
+    boolean isRoot = root == rowType;
+
+    List<Type> types = rowType.getFields().stream()

Review comment:
       One thing: if we change where field IDs are generated for nested types, we may need to adjust the unit test accordingly.
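To make the concern concrete, here is a stdlib-only sketch of how the order of ID assignment changes the IDs a nested schema receives. The `Field` model and the `childrenFirst`/`parentFirst` methods are hypothetical stand-ins, not Flink or Iceberg API. The sketch assumes, as the `isRoot ? i : getNextId()` line suggests, that root fields take their positions as IDs and that the counter for nested IDs starts at the root field count. `childrenFirst` mirrors the shape of the new `visit(RowType)`, which visits all child types before assigning the row's own field IDs; `parentFirst` is one possible alternative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, stdlib-only model of nested Flink ROW fields; not the Flink or Iceberg API.
final class IdOrderSketch {
  static final class Field {
    final String name;
    final List<Field> children; // null for a primitive field, non-null for a nested row

    Field(String name, List<Field> children) {
      this.name = name;
      this.children = children;
    }
  }

  static Field prim(String name) {
    return new Field(name, null);
  }

  static Field row(String name, Field... children) {
    return new Field(name, Arrays.asList(children));
  }

  private int nextId;
  final List<String> assigned = new ArrayList<>(); // "name=id", in assignment order

  // Assumption: nested ids start at the root field count, so root ids 0..n-1 never collide.
  IdOrderSketch(int rootFieldCount) {
    this.nextId = rootFieldCount;
  }

  // Shape of the new visit(RowType): first visit every child type (assigning nested ids),
  // then loop again to assign ids to this row's own fields.
  void childrenFirst(List<Field> fields, boolean isRoot) {
    for (Field f : fields) {
      if (f.children != null) {
        childrenFirst(f.children, false);
      }
    }
    assignOwnIds(fields, isRoot);
  }

  // Possible alternative: assign this row's field ids first, then descend into nested rows.
  void parentFirst(List<Field> fields, boolean isRoot) {
    assignOwnIds(fields, isRoot);
    for (Field f : fields) {
      if (f.children != null) {
        parentFirst(f.children, false);
      }
    }
  }

  private void assignOwnIds(List<Field> fields, boolean isRoot) {
    for (int i = 0; i < fields.size(); i++) {
      int id = isRoot ? i : nextId++;
      assigned.add(fields.get(i).name + "=" + id);
    }
  }
}
```

For a root schema `a, b ROW<x ROW<y>, z>`, `childrenFirst` yields `y=2, x=3, z=4, a=0, b=1`, while `parentFirst` yields `a=0, b=1, x=2, z=3, y=4`. The same schema gets different nested IDs, which is why test expectations tied to specific IDs would change.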





[GitHub] [iceberg] rdblue commented on pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#issuecomment-658296370


   Overall, looks good to me. I'll merge this. Thanks @JingsongLi, I think the logical type visitor looks clean. And thanks to @openinx for reviewing!



[GitHub] [iceberg] JingsongLi commented on a change in pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#discussion_r454744290



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
##########
@@ -54,21 +54,22 @@ public void testConvertFlinkSchemaToIcebergSchema() {
         .field("decimal", DataTypes.DECIMAL(2, 2))
         .field("decimal2", DataTypes.DECIMAL(38, 2))
         .field("decimal3", DataTypes.DECIMAL(10, 1))
+        .field("multiset", DataTypes.MULTISET(DataTypes.STRING().notNull()))

Review comment:
       The multiset element is handled just like a nullable Map key. Since keys are nullable by default in Flink, we support the conversion:
   - when converting a Flink type to an Iceberg type, the key's nullability is ignored.
   - when converting an Iceberg type to a Flink type, the key becomes non-nullable.
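A minimal sketch of these conversion rules, using hypothetical stand-in classes rather than the real Flink `MapType`/`MultisetType` or Iceberg `Types.MapType`. It relies on the fact that Iceberg map keys are always required, and models `MULTISET<T>` as `map<T, int>` with a required count, matching the `Types.MapType.ofRequired(..., elementType, Types.IntegerType.get())` call in the diff.

```java
// Hypothetical stand-ins for Flink MAP/MULTISET and the Iceberg map type;
// type names are plain strings. This is not the Flink or Iceberg API.
final class MapNullabilitySketch {
  static final class FlinkMap {
    final String keyType;
    final boolean keyNullable;
    final String valueType;
    final boolean valueNullable;

    FlinkMap(String keyType, boolean keyNullable, String valueType, boolean valueNullable) {
      this.keyType = keyType;
      this.keyNullable = keyNullable;
      this.valueType = valueType;
      this.valueNullable = valueNullable;
    }
  }

  static final class IcebergMap {
    final String keyType; // Iceberg map keys are always required, so no nullability flag
    final String valueType;
    final boolean valueOptional;

    IcebergMap(String keyType, String valueType, boolean valueOptional) {
      this.keyType = keyType;
      this.valueType = valueType;
      this.valueOptional = valueOptional;
    }
  }

  // Flink -> Iceberg: the key's nullability is ignored; the value's is carried over.
  static IcebergMap toIceberg(FlinkMap map) {
    return new IcebergMap(map.keyType, map.valueType, map.valueNullable);
  }

  // Flink MULTISET<T> -> Iceberg map<T, int>: the element acts as the key, so its
  // nullability is ignored too (the parameter is unused); the count is a required int.
  static IcebergMap multisetToIceberg(String elementType, boolean elementNullable) {
    return new IcebergMap(elementType, "int", false);
  }

  // Iceberg -> Flink: the key becomes NOT NULL; the value's nullability is carried over.
  static FlinkMap toFlink(IcebergMap map) {
    return new FlinkMap(map.keyType, false, map.valueType, map.valueOptional);
  }
}
```

A round trip of a Flink map with a nullable key therefore comes back with a NOT NULL key, while the value's nullability survives unchanged.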





[GitHub] [iceberg] JingsongLi commented on a change in pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#discussion_r453393688



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
##########
@@ -64,86 +63,136 @@ private int getNextId() {
   }
 
   @Override
-  public Type fields(FieldsDataType fields, List<Type> types) {
-    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(types.size());
-    boolean isRoot = root == fields;
+  public Type visit(CharType charType) {
+    return Types.StringType.get();
+  }
 
-    List<RowType.RowField> rowFields = ((RowType) fields.getLogicalType()).getFields();
-    Preconditions.checkArgument(rowFields.size() == types.size(), "fields list and types list should have same size.");
+  @Override
+  public Type visit(VarCharType varCharType) {
+    return Types.StringType.get();
+  }
 
-    for (int i = 0; i < rowFields.size(); i++) {
-      int id = isRoot ? i : getNextId();
+  @Override
+  public Type visit(BooleanType booleanType) {
+    return Types.BooleanType.get();
+  }
 
-      RowType.RowField field = rowFields.get(i);
-      String name = field.getName();
-      String comment = field.getDescription().orElse(null);
+  @Override
+  public Type visit(BinaryType binaryType) {
+    return Types.FixedType.ofLength(binaryType.getLength());
+  }
 
-      if (field.getType().isNullable()) {
-        newFields.add(Types.NestedField.optional(id, name, types.get(i), comment));
-      } else {
-        newFields.add(Types.NestedField.required(id, name, types.get(i), comment));
-      }
-    }
+  @Override
+  public Type visit(VarBinaryType varBinaryType) {
+    return Types.BinaryType.get();
+  }
 
-    return Types.StructType.of(newFields);
+  @Override
+  public Type visit(DecimalType decimalType) {
+    return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
+  }
+
+  @Override
+  public Type visit(TinyIntType tinyIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(SmallIntType smallIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(IntType intType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(BigIntType bigIntType) {
+    return Types.LongType.get();
+  }
+
+  @Override
+  public Type visit(FloatType floatType) {
+    return Types.FloatType.get();
+  }
+
+  @Override
+  public Type visit(DoubleType doubleType) {
+    return Types.DoubleType.get();
+  }
+
+  @Override
+  public Type visit(DateType dateType) {
+    return Types.DateType.get();
+  }
+
+  @Override
+  public Type visit(TimeType timeType) {
+    return Types.TimeType.get();
   }
 
   @Override
-  public Type collection(CollectionDataType collection, Type elementType) {
-    if (collection.getElementDataType().getLogicalType().isNullable()) {
+  public Type visit(TimestampType timestampType) {
+    return Types.TimestampType.withoutZone();
+  }
+
+  @Override
+  public Type visit(LocalZonedTimestampType localZonedTimestampType) {
+    return Types.TimestampType.withZone();
+  }
+
+  @Override
+  public Type visit(ArrayType arrayType) {
+    Type elementType = arrayType.getElementType().accept(this);
+    if (arrayType.getElementType().isNullable()) {
       return Types.ListType.ofOptional(getNextId(), elementType);
     } else {
       return Types.ListType.ofRequired(getNextId(), elementType);
     }
   }
 
   @Override
-  public Type map(KeyValueDataType map, Type keyType, Type valueType) {
+  public Type visit(MultisetType multisetType) {
+    Type elementType = multisetType.getElementType().accept(this);
+    return Types.MapType.ofRequired(getNextId(), getNextId(), elementType, Types.IntegerType.get());
+  }
+
+  @Override
+  public Type visit(MapType mapType) {
     // keys in map are not allowed to be null.
-    if (map.getValueDataType().getLogicalType().isNullable()) {
+    Type keyType = mapType.getKeyType().accept(this);
+    Type valueType = mapType.getValueType().accept(this);
+    if (mapType.getValueType().isNullable()) {
       return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType);
     } else {
       return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType);
     }
   }
 
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
-  @Override
-  public Type atomic(AtomicDataType type) {
-    LogicalType inner = type.getLogicalType();
-    if (inner instanceof VarCharType ||
-        inner instanceof CharType) {
-      return Types.StringType.get();
-    } else if (inner instanceof BooleanType) {
-      return Types.BooleanType.get();
-    } else if (inner instanceof IntType ||
-        inner instanceof SmallIntType ||
-        inner instanceof TinyIntType) {
-      return Types.IntegerType.get();
-    } else if (inner instanceof BigIntType) {
-      return Types.LongType.get();
-    } else if (inner instanceof VarBinaryType) {
-      return Types.BinaryType.get();
-    } else if (inner instanceof BinaryType) {
-      BinaryType binaryType = (BinaryType) inner;
-      return Types.FixedType.ofLength(binaryType.getLength());
-    } else if (inner instanceof FloatType) {
-      return Types.FloatType.get();
-    } else if (inner instanceof DoubleType) {
-      return Types.DoubleType.get();
-    } else if (inner instanceof DateType) {
-      return Types.DateType.get();
-    } else if (inner instanceof TimeType) {
-      return Types.TimeType.get();
-    } else if (inner instanceof TimestampType) {
-      return Types.TimestampType.withoutZone();
-    } else if (inner instanceof LocalZonedTimestampType) {
-      return Types.TimestampType.withZone();
-    } else if (inner instanceof DecimalType) {
-      DecimalType decimalType = (DecimalType) inner;
-      return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
-    } else {
-      throw new UnsupportedOperationException("Not a supported type: " + type.toString());
+  @Override
+  public Type visit(RowType rowType) {
+    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
+    boolean isRoot = root == rowType;
+
+    List<Type> types = rowType.getFields().stream()

Review comment:
       I'd prefer to keep the two loops. If we need to change how IDs are generated for nested types, I think it is better to change Spark as well.





[GitHub] [iceberg] openinx commented on a change in pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#discussion_r450714611



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeVisitor.java
##########
@@ -19,65 +19,63 @@
 
 package org.apache.iceberg.flink;
 
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.table.types.AtomicDataType;
-import org.apache.flink.table.types.CollectionDataType;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.KeyValueDataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
 
-public class FlinkTypeVisitor<T> {
+public abstract class FlinkTypeVisitor<T> implements LogicalTypeVisitor<T> {

Review comment:
       @JingsongLi I'm curious about the difference between the Flink-style `LogicalTypeVisitor` and the Iceberg-style visitor. Currently, all of our visitors are Iceberg style, and I'm not quite sure what the benefits of converting this one to a Flink-style visitor are.
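Roughly, the difference is who drives the traversal. The sketch below contrasts the two styles on a hypothetical two-type hierarchy (not Flink's `LogicalType`). In the Flink style (like `LogicalTypeVisitor`), each type calls back into the `visit` overload for its concrete class via `accept`, and the visitor itself recurses into children, as `arrayType.getElementType().accept(this)` does in the diff. In the Iceberg style (like the old `FlinkTypeVisitor`), a static `visit` helper dispatches with `instanceof`, visits children first, and passes their results to callbacks such as `collection(..., elementResult)`.

```java
// Hypothetical two-type hierarchy used only to contrast visitor styles;
// these are not Flink's LogicalType classes.
final class VisitorStylesSketch {
  interface LType {
    // Flink style: double dispatch into the visitor overload for the concrete class.
    <R> R accept(LVisitor<R> visitor);
  }

  static final class IntT implements LType {
    @Override
    public <R> R accept(LVisitor<R> visitor) {
      return visitor.visit(this);
    }
  }

  static final class ArrayT implements LType {
    final LType element;

    ArrayT(LType element) {
      this.element = element;
    }

    @Override
    public <R> R accept(LVisitor<R> visitor) {
      return visitor.visit(this);
    }
  }

  // Flink style (like LogicalTypeVisitor): one overload per concrete type;
  // the visitor decides when to recurse into children.
  interface LVisitor<R> {
    R visit(IntT type);

    R visit(ArrayT type);
  }

  // Iceberg style (like the old FlinkTypeVisitor): a static helper owns the traversal,
  // dispatches with instanceof, visits children first, and passes their results on.
  abstract static class PostOrderVisitor<R> {
    abstract R atomic(IntT type);

    abstract R collection(ArrayT type, R elementResult);

    static <T> T visit(LType type, PostOrderVisitor<T> visitor) {
      if (type instanceof ArrayT) {
        ArrayT array = (ArrayT) type;
        return visitor.collection(array, visit(array.element, visitor));
      } else if (type instanceof IntT) {
        return visitor.atomic((IntT) type);
      }
      throw new UnsupportedOperationException("Not a supported type: " + type);
    }
  }

  static String flinkStyle(LType type) {
    return type.accept(new LVisitor<String>() {
      @Override
      public String visit(IntT t) {
        return "int";
      }

      @Override
      public String visit(ArrayT t) {
        return "list<" + t.element.accept(this) + ">";
      }
    });
  }

  static String icebergStyle(LType type) {
    return PostOrderVisitor.visit(type, new PostOrderVisitor<String>() {
      @Override
      String atomic(IntT t) {
        return "int";
      }

      @Override
      String collection(ArrayT t, String elementResult) {
        return "list<" + elementResult + ">";
      }
    });
  }
}
```

Both styles render a nested array as `list<list<int>>`. With double dispatch the compiler selects the overload, so no `instanceof` chain is needed; the trade-off is that each visitor must recurse into children itself.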

Review comment:
       BTW, it seems this `FlinkTypeVisitor` could be package-private (I forgot to check its access modifier before).

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
##########
@@ -64,86 +63,136 @@ private int getNextId() {
   }
 
   @Override
-  public Type fields(FieldsDataType fields, List<Type> types) {
-    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(types.size());
-    boolean isRoot = root == fields;
+  public Type visit(CharType charType) {
+    return Types.StringType.get();
+  }
 
-    List<RowType.RowField> rowFields = ((RowType) fields.getLogicalType()).getFields();
-    Preconditions.checkArgument(rowFields.size() == types.size(), "fields list and types list should have same size.");
+  @Override
+  public Type visit(VarCharType varCharType) {
+    return Types.StringType.get();
+  }
 
-    for (int i = 0; i < rowFields.size(); i++) {
-      int id = isRoot ? i : getNextId();
+  @Override
+  public Type visit(BooleanType booleanType) {
+    return Types.BooleanType.get();
+  }
 
-      RowType.RowField field = rowFields.get(i);
-      String name = field.getName();
-      String comment = field.getDescription().orElse(null);
+  @Override
+  public Type visit(BinaryType binaryType) {
+    return Types.FixedType.ofLength(binaryType.getLength());
+  }
 
-      if (field.getType().isNullable()) {
-        newFields.add(Types.NestedField.optional(id, name, types.get(i), comment));
-      } else {
-        newFields.add(Types.NestedField.required(id, name, types.get(i), comment));
-      }
-    }
+  @Override
+  public Type visit(VarBinaryType varBinaryType) {
+    return Types.BinaryType.get();
+  }
 
-    return Types.StructType.of(newFields);
+  @Override
+  public Type visit(DecimalType decimalType) {
+    return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
+  }
+
+  @Override
+  public Type visit(TinyIntType tinyIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(SmallIntType smallIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(IntType intType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(BigIntType bigIntType) {
+    return Types.LongType.get();
+  }
+
+  @Override
+  public Type visit(FloatType floatType) {
+    return Types.FloatType.get();
+  }
+
+  @Override
+  public Type visit(DoubleType doubleType) {
+    return Types.DoubleType.get();
+  }
+
+  @Override
+  public Type visit(DateType dateType) {
+    return Types.DateType.get();
+  }
+
+  @Override
+  public Type visit(TimeType timeType) {
+    return Types.TimeType.get();
   }
 
   @Override
-  public Type collection(CollectionDataType collection, Type elementType) {
-    if (collection.getElementDataType().getLogicalType().isNullable()) {
+  public Type visit(TimestampType timestampType) {
+    return Types.TimestampType.withoutZone();
+  }
+
+  @Override
+  public Type visit(LocalZonedTimestampType localZonedTimestampType) {
+    return Types.TimestampType.withZone();
+  }
+
+  @Override
+  public Type visit(ArrayType arrayType) {
+    Type elementType = arrayType.getElementType().accept(this);
+    if (arrayType.getElementType().isNullable()) {
       return Types.ListType.ofOptional(getNextId(), elementType);
     } else {
       return Types.ListType.ofRequired(getNextId(), elementType);
     }
   }
 
   @Override
-  public Type map(KeyValueDataType map, Type keyType, Type valueType) {
+  public Type visit(MultisetType multisetType) {
+    Type elementType = multisetType.getElementType().accept(this);
+    return Types.MapType.ofRequired(getNextId(), getNextId(), elementType, Types.IntegerType.get());

Review comment:
       Good to see that we've extended support to the Flink `multiset` data type.
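At the data level, the `Map<T, Integer>` mapping from the PR description means a multiset is stored as a map from each distinct element to its occurrence count. A minimal stdlib sketch: `toCounts` is a hypothetical helper, not Flink or Iceberg API, and rejecting `null` elements is an assumption made here because Iceberg map keys are required (the thread does not settle the runtime behavior for null elements).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MultisetAsMapSketch {
  // Hypothetical helper: stores a multiset as a map from each distinct element to
  // its number of occurrences, i.e. the Map<T, Integer> shape described in the PR.
  static <T> Map<T, Integer> toCounts(List<T> elements) {
    Map<T, Integer> counts = new LinkedHashMap<>(); // keeps first-seen order
    for (T element : elements) {
      if (element == null) {
        // Assumption: reject nulls, since Iceberg map keys are required.
        throw new IllegalArgumentException("Null multiset element cannot become a map key");
      }
      counts.merge(element, 1, Integer::sum); // add one occurrence
    }
    return counts;
  }
}
```

For example, the multiset `["a", "b", "a"]` becomes `{a=2, b=1}`.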



