You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/07 09:20:43 UTC

[GitHub] [iceberg] openinx commented on a change in pull request #1173: FlinkTypeVisitor: Use LogicalTypeVisitor and supports MultisetType

openinx commented on a change in pull request #1173:
URL: https://github.com/apache/iceberg/pull/1173#discussion_r450714611



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeVisitor.java
##########
@@ -19,65 +19,63 @@
 
 package org.apache.iceberg.flink;
 
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.table.types.AtomicDataType;
-import org.apache.flink.table.types.CollectionDataType;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.KeyValueDataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
 
-public class FlinkTypeVisitor<T> {
+public abstract class FlinkTypeVisitor<T> implements LogicalTypeVisitor<T> {

Review comment:
       @JingsongLi I'm curious that what's the difference between the  flink style `LogicalTypeVisitor`  and iceberg style visitor... Currently, all of the visitor are iceberg style,  I'm not quite sure that what's the benifits to convert it to flink style visitor ... 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeVisitor.java
##########
@@ -19,65 +19,63 @@
 
 package org.apache.iceberg.flink;
 
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.table.types.AtomicDataType;
-import org.apache.flink.table.types.CollectionDataType;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.KeyValueDataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
 
-public class FlinkTypeVisitor<T> {
+public abstract class FlinkTypeVisitor<T> implements LogicalTypeVisitor<T> {

Review comment:
       BTW, seems this `FlinkTypeVisitor` can be package access (I forget to check the access before).

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
##########
@@ -64,86 +63,136 @@ private int getNextId() {
   }
 
   @Override
-  public Type fields(FieldsDataType fields, List<Type> types) {
-    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(types.size());
-    boolean isRoot = root == fields;
+  public Type visit(CharType charType) {
+    return Types.StringType.get();
+  }
 
-    List<RowType.RowField> rowFields = ((RowType) fields.getLogicalType()).getFields();
-    Preconditions.checkArgument(rowFields.size() == types.size(), "fields list and types list should have same size.");
+  @Override
+  public Type visit(VarCharType varCharType) {
+    return Types.StringType.get();
+  }
 
-    for (int i = 0; i < rowFields.size(); i++) {
-      int id = isRoot ? i : getNextId();
+  @Override
+  public Type visit(BooleanType booleanType) {
+    return Types.BooleanType.get();
+  }
 
-      RowType.RowField field = rowFields.get(i);
-      String name = field.getName();
-      String comment = field.getDescription().orElse(null);
+  @Override
+  public Type visit(BinaryType binaryType) {
+    return Types.FixedType.ofLength(binaryType.getLength());
+  }
 
-      if (field.getType().isNullable()) {
-        newFields.add(Types.NestedField.optional(id, name, types.get(i), comment));
-      } else {
-        newFields.add(Types.NestedField.required(id, name, types.get(i), comment));
-      }
-    }
+  @Override
+  public Type visit(VarBinaryType varBinaryType) {
+    return Types.BinaryType.get();
+  }
 
-    return Types.StructType.of(newFields);
+  @Override
+  public Type visit(DecimalType decimalType) {
+    return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
+  }
+
+  @Override
+  public Type visit(TinyIntType tinyIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(SmallIntType smallIntType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(IntType intType) {
+    return Types.IntegerType.get();
+  }
+
+  @Override
+  public Type visit(BigIntType bigIntType) {
+    return Types.LongType.get();
+  }
+
+  @Override
+  public Type visit(FloatType floatType) {
+    return Types.FloatType.get();
+  }
+
+  @Override
+  public Type visit(DoubleType doubleType) {
+    return Types.DoubleType.get();
+  }
+
+  @Override
+  public Type visit(DateType dateType) {
+    return Types.DateType.get();
+  }
+
+  @Override
+  public Type visit(TimeType timeType) {
+    return Types.TimeType.get();
   }
 
   @Override
-  public Type collection(CollectionDataType collection, Type elementType) {
-    if (collection.getElementDataType().getLogicalType().isNullable()) {
+  public Type visit(TimestampType timestampType) {
+    return Types.TimestampType.withoutZone();
+  }
+
+  @Override
+  public Type visit(LocalZonedTimestampType localZonedTimestampType) {
+    return Types.TimestampType.withZone();
+  }
+
+  @Override
+  public Type visit(ArrayType arrayType) {
+    Type elementType = arrayType.getElementType().accept(this);
+    if (arrayType.getElementType().isNullable()) {
       return Types.ListType.ofOptional(getNextId(), elementType);
     } else {
       return Types.ListType.ofRequired(getNextId(), elementType);
     }
   }
 
   @Override
-  public Type map(KeyValueDataType map, Type keyType, Type valueType) {
+  public Type visit(MultisetType multisetType) {
+    Type elementType = multisetType.getElementType().accept(this);
+    return Types.MapType.ofRequired(getNextId(), getNextId(), elementType, Types.IntegerType.get());

Review comment:
       Sounds good that we've extended support the flink `multiset` data type .




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org