You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/10/29 07:25:16 UTC

[GitHub] [iceberg] JingsongLi commented on a change in pull request #1391: Parquet: Add type visitor with partner type

JingsongLi commented on a change in pull request #1391:
URL: https://github.com/apache/iceberg/pull/1391#discussion_r514020451



##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -79,13 +79,17 @@ private FlinkParquetReaders() {
     }
 
     @Override
-    public ParquetValueReader<RowData> message(Types.StructType expected, MessageType message,
+    public ParquetValueReader<RowData> message(org.apache.iceberg.types.Type expected, MessageType message,
                                                List<ParquetValueReader<?>> fieldReaders) {
-      return struct(expected, message.asGroupType(), fieldReaders);
+      if (expected == null) {

Review comment:
       NIT: `return struct(expected == null ? null : expected.asStructType(), message.asGroupType(), fieldReaders);`

##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeWithPartnerVisitor.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.Deque;
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+public abstract class ParquetTypeWithPartnerVisitor<P, T> {
+  private final Deque<String> fieldNames = Lists.newLinkedList();
+
+  public static <P, T> T visit(P partnerType, Type type, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    if (type instanceof MessageType) {

Review comment:
       `Preconditions.checkNotNull(partnerType, "Invalid partnerType: null");`

##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeWithPartnerVisitor.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.Deque;
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+public abstract class ParquetTypeWithPartnerVisitor<P, T> {
+  private final Deque<String> fieldNames = Lists.newLinkedList();
+
+  public static <P, T> T visit(P partnerType, Type type, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    if (type instanceof MessageType) {
+      return visitor.message(partnerType, (MessageType) type, visitFields(partnerType, type.asGroupType(), visitor));
+    } else if (type.isPrimitive()) {
+      return visitor.primitive(partnerType, type.asPrimitiveType());
+    } else {
+      // if not a primitive, the typeId must be a group
+      GroupType group = type.asGroupType();
+      OriginalType annotation = group.getOriginalType();
+      if (annotation != null) {
+        switch (annotation) {
+          case LIST:
+            return visitList(partnerType, group, visitor);
+          case MAP:
+            return visitMap(partnerType, group, visitor);
+          default:
+        }
+      }
+      return visitor.struct(partnerType, group, visitFields(partnerType, group, visitor));
+    }
+  }
+
+  private static <P, T> T visitList(P list, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
+        "Invalid list: top-level group is repeated: %s", group);
+    Preconditions.checkArgument(group.getFieldCount() == 1,
+        "Invalid list: does not contain single repeated field: %s", group);
+
+    GroupType repeatedElement = group.getFields().get(0).asGroupType();
+    Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
+        "Invalid list: inner group is not repeated");
+    Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
+        "Invalid list: repeated group is not a single field: %s", group);
+
+    visitor.beforeRepeatedElement(repeatedElement);
+    try {
+      T elementResult = null;
+      if (repeatedElement.getFieldCount() > 0) {
+        Type elementField = repeatedElement.getType(0);
+        try {
+          visitor.beforeElementField(elementField);
+          elementResult = visit(visitor.arrayElementType(list), elementField, visitor);
+        } finally {
+          visitor.afterElementField(elementField);
+        }
+      }
+      return visitor.list(list, group, elementResult);
+    } finally {
+      visitor.afterRepeatedElement(repeatedElement);
+    }
+  }
+
+  private static <P, T> T visitMap(P map, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
+        "Invalid map: top-level group is repeated: %s", group);
+    Preconditions.checkArgument(group.getFieldCount() == 1,
+        "Invalid map: does not contain single repeated field: %s", group);
+
+    GroupType repeatedKeyValue = group.getType(0).asGroupType();
+    Preconditions.checkArgument(repeatedKeyValue.isRepetition(Type.Repetition.REPEATED),
+        "Invalid map: inner group is not repeated");
+    Preconditions.checkArgument(repeatedKeyValue.getFieldCount() <= 2,
+        "Invalid map: repeated group does not have 2 fields");
+
+    visitor.beforeRepeatedKeyValue(repeatedKeyValue);
+    try {
+      T keyResult = null;
+      T valueResult = null;
+      switch (repeatedKeyValue.getFieldCount()) {
+        case 2:
+          // if there are 2 fields, both key and value are projected
+          Type keyType = repeatedKeyValue.getType(0);
+          visitor.beforeKeyField(keyType);
+          try {
+            keyResult = visit(visitor.mapKeyType(map), keyType, visitor);
+          } finally {
+            visitor.afterKeyField(keyType);
+          }
+          Type valueType = repeatedKeyValue.getType(1);
+          visitor.beforeValueField(valueType);
+          try {
+            valueResult = visit(visitor.mapValueType(map), repeatedKeyValue.getType(1), visitor);
+          } finally {
+            visitor.afterValueField(valueType);
+          }
+          break;
+        case 1:
+          // if there is just one, use the name to determine what it is
+          Type keyOrValue = repeatedKeyValue.getType(0);
+          if (keyOrValue.getName().equalsIgnoreCase("key")) {
+            visitor.beforeKeyField(keyOrValue);
+            try {
+              keyResult = visit(visitor.mapKeyType(map), keyOrValue, visitor);
+            } finally {
+              visitor.afterKeyField(keyOrValue);
+            }
+            // value result remains null
+          } else {
+            visitor.beforeValueField(keyOrValue);
+            try {
+              valueResult = visit(visitor.mapValueType(map), keyOrValue, visitor);
+            } finally {
+              visitor.afterValueField(keyOrValue);
+            }
+            // key result remains null
+          }
+          break;
+        default:
+          // both results will remain null
+      }
+
+      return visitor.map(map, group, keyResult, valueResult);
+
+    } finally {
+      visitor.afterRepeatedKeyValue(repeatedKeyValue);
+    }
+  }
+
+  protected static <P, T> List<T> visitFields(P struct, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
+    for (int i = 0; i < group.getFieldCount(); i += 1) {
+      Type field = group.getFields().get(i);
+      visitor.beforeField(field);
+      Integer fieldId = field.getId() == null ? null : field.getId().intValue();
+      Pair<String, P> filedNameAndType = visitor.fieldNameAndType(struct, i, fieldId);
+      if (filedNameAndType != null) {
+        results.add(visit(filedNameAndType.second(), field, visitor));
+      } else {
+        results.add(visit(null, field, visitor));
+      }
+      visitor.afterField(field);
+    }
+
+    return results;
+  }
+
+  protected abstract P arrayElementType(P arrayType);
+  protected abstract P mapKeyType(P mapType);
+  protected abstract P mapValueType(P mapType);
+  protected abstract Pair<String, P> fieldNameAndType(P structType, int pos, Integer fieldId);

Review comment:
       Why we need `fieldNameAndType`? Looks like just type is OK?

##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeWithPartnerVisitor.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.Deque;
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+public abstract class ParquetTypeWithPartnerVisitor<P, T> {
+  private final Deque<String> fieldNames = Lists.newLinkedList();
+
+  public static <P, T> T visit(P partnerType, Type type, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    if (type instanceof MessageType) {
+      return visitor.message(partnerType, (MessageType) type, visitFields(partnerType, type.asGroupType(), visitor));
+    } else if (type.isPrimitive()) {
+      return visitor.primitive(partnerType, type.asPrimitiveType());
+    } else {
+      // if not a primitive, the typeId must be a group
+      GroupType group = type.asGroupType();
+      OriginalType annotation = group.getOriginalType();
+      if (annotation != null) {
+        switch (annotation) {
+          case LIST:
+            return visitList(partnerType, group, visitor);
+          case MAP:
+            return visitMap(partnerType, group, visitor);
+          default:
+        }
+      }
+      return visitor.struct(partnerType, group, visitFields(partnerType, group, visitor));
+    }
+  }
+
+  private static <P, T> T visitList(P list, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
+        "Invalid list: top-level group is repeated: %s", group);
+    Preconditions.checkArgument(group.getFieldCount() == 1,
+        "Invalid list: does not contain single repeated field: %s", group);
+
+    GroupType repeatedElement = group.getFields().get(0).asGroupType();
+    Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
+        "Invalid list: inner group is not repeated");
+    Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
+        "Invalid list: repeated group is not a single field: %s", group);
+
+    visitor.beforeRepeatedElement(repeatedElement);
+    try {
+      T elementResult = null;
+      if (repeatedElement.getFieldCount() > 0) {
+        Type elementField = repeatedElement.getType(0);
+        try {
+          visitor.beforeElementField(elementField);
+          elementResult = visit(visitor.arrayElementType(list), elementField, visitor);
+        } finally {
+          visitor.afterElementField(elementField);
+        }
+      }
+      return visitor.list(list, group, elementResult);
+    } finally {
+      visitor.afterRepeatedElement(repeatedElement);
+    }
+  }
+
+  private static <P, T> T visitMap(P map, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
+        "Invalid map: top-level group is repeated: %s", group);
+    Preconditions.checkArgument(group.getFieldCount() == 1,
+        "Invalid map: does not contain single repeated field: %s", group);
+
+    GroupType repeatedKeyValue = group.getType(0).asGroupType();
+    Preconditions.checkArgument(repeatedKeyValue.isRepetition(Type.Repetition.REPEATED),
+        "Invalid map: inner group is not repeated");
+    Preconditions.checkArgument(repeatedKeyValue.getFieldCount() <= 2,
+        "Invalid map: repeated group does not have 2 fields");
+
+    visitor.beforeRepeatedKeyValue(repeatedKeyValue);
+    try {
+      T keyResult = null;
+      T valueResult = null;
+      switch (repeatedKeyValue.getFieldCount()) {
+        case 2:
+          // if there are 2 fields, both key and value are projected
+          Type keyType = repeatedKeyValue.getType(0);
+          visitor.beforeKeyField(keyType);
+          try {
+            keyResult = visit(visitor.mapKeyType(map), keyType, visitor);
+          } finally {
+            visitor.afterKeyField(keyType);
+          }
+          Type valueType = repeatedKeyValue.getType(1);
+          visitor.beforeValueField(valueType);
+          try {
+            valueResult = visit(visitor.mapValueType(map), repeatedKeyValue.getType(1), visitor);
+          } finally {
+            visitor.afterValueField(valueType);
+          }
+          break;
+        case 1:
+          // if there is just one, use the name to determine what it is
+          Type keyOrValue = repeatedKeyValue.getType(0);
+          if (keyOrValue.getName().equalsIgnoreCase("key")) {
+            visitor.beforeKeyField(keyOrValue);
+            try {
+              keyResult = visit(visitor.mapKeyType(map), keyOrValue, visitor);
+            } finally {
+              visitor.afterKeyField(keyOrValue);
+            }
+            // value result remains null
+          } else {
+            visitor.beforeValueField(keyOrValue);
+            try {
+              valueResult = visit(visitor.mapValueType(map), keyOrValue, visitor);
+            } finally {
+              visitor.afterValueField(keyOrValue);
+            }
+            // key result remains null
+          }
+          break;
+        default:
+          // both results will remain null
+      }
+
+      return visitor.map(map, group, keyResult, valueResult);
+
+    } finally {
+      visitor.afterRepeatedKeyValue(repeatedKeyValue);
+    }
+  }
+
+  protected static <P, T> List<T> visitFields(P struct, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
+    for (int i = 0; i < group.getFieldCount(); i += 1) {
+      Type field = group.getFields().get(i);
+      visitor.beforeField(field);
+      Integer fieldId = field.getId() == null ? null : field.getId().intValue();
+      Pair<String, P> filedNameAndType = visitor.fieldNameAndType(struct, i, fieldId);
+      if (filedNameAndType != null) {
+        results.add(visit(filedNameAndType.second(), field, visitor));
+      } else {
+        results.add(visit(null, field, visitor));
+      }
+      visitor.afterField(field);
+    }
+
+    return results;
+  }
+
+  protected abstract P arrayElementType(P arrayType);
+  protected abstract P mapKeyType(P mapType);
+  protected abstract P mapValueType(P mapType);
+  protected abstract Pair<String, P> fieldNameAndType(P structType, int pos, Integer fieldId);

Review comment:
       `fieldId` -> `parquetFieldId`

##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
##########
@@ -68,15 +68,15 @@ private FlinkParquetWriters() {
     }
 
     @Override
-    public ParquetValueWriter<?> message(RowType sStruct, MessageType message, List<ParquetValueWriter<?>> fields) {
+    public ParquetValueWriter<?> message(LogicalType sStruct, MessageType message, List<ParquetValueWriter<?>> fields) {
       return struct(sStruct, message.asGroupType(), fields);
     }
 
     @Override
-    public ParquetValueWriter<?> struct(RowType sStruct, GroupType struct,
+    public ParquetValueWriter<?> struct(LogicalType fStruct, GroupType struct,
                                         List<ParquetValueWriter<?>> fieldWriters) {
       List<Type> fields = struct.getFields();
-      List<RowField> flinkFields = sStruct.getFields();
+      List<RowField> flinkFields = ((RowType) fStruct).getFields();

Review comment:
       Can we change to use `LogicalType.getChildren`?

##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeWithPartnerVisitor.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.Deque;
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+public abstract class ParquetTypeWithPartnerVisitor<P, T> {
+  private final Deque<String> fieldNames = Lists.newLinkedList();
+
+  public static <P, T> T visit(P partnerType, Type type, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    if (type instanceof MessageType) {
+      return visitor.message(partnerType, (MessageType) type, visitFields(partnerType, type.asGroupType(), visitor));
+    } else if (type.isPrimitive()) {
+      return visitor.primitive(partnerType, type.asPrimitiveType());
+    } else {
+      // if not a primitive, the typeId must be a group
+      GroupType group = type.asGroupType();
+      OriginalType annotation = group.getOriginalType();
+      if (annotation != null) {
+        switch (annotation) {
+          case LIST:
+            return visitList(partnerType, group, visitor);
+          case MAP:
+            return visitMap(partnerType, group, visitor);
+          default:
+        }
+      }
+      return visitor.struct(partnerType, group, visitFields(partnerType, group, visitor));
+    }
+  }
+
+  private static <P, T> T visitList(P list, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
+        "Invalid list: top-level group is repeated: %s", group);
+    Preconditions.checkArgument(group.getFieldCount() == 1,
+        "Invalid list: does not contain single repeated field: %s", group);
+
+    GroupType repeatedElement = group.getFields().get(0).asGroupType();
+    Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
+        "Invalid list: inner group is not repeated");
+    Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
+        "Invalid list: repeated group is not a single field: %s", group);
+
+    visitor.beforeRepeatedElement(repeatedElement);
+    try {
+      T elementResult = null;
+      if (repeatedElement.getFieldCount() > 0) {
+        Type elementField = repeatedElement.getType(0);
+        try {
+          visitor.beforeElementField(elementField);
+          elementResult = visit(visitor.arrayElementType(list), elementField, visitor);
+        } finally {
+          visitor.afterElementField(elementField);
+        }
+      }
+      return visitor.list(list, group, elementResult);
+    } finally {
+      visitor.afterRepeatedElement(repeatedElement);
+    }
+  }
+
+  private static <P, T> T visitMap(P map, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
+        "Invalid map: top-level group is repeated: %s", group);
+    Preconditions.checkArgument(group.getFieldCount() == 1,
+        "Invalid map: does not contain single repeated field: %s", group);
+
+    GroupType repeatedKeyValue = group.getType(0).asGroupType();
+    Preconditions.checkArgument(repeatedKeyValue.isRepetition(Type.Repetition.REPEATED),
+        "Invalid map: inner group is not repeated");
+    Preconditions.checkArgument(repeatedKeyValue.getFieldCount() <= 2,
+        "Invalid map: repeated group does not have 2 fields");
+
+    visitor.beforeRepeatedKeyValue(repeatedKeyValue);
+    try {
+      T keyResult = null;
+      T valueResult = null;
+      switch (repeatedKeyValue.getFieldCount()) {
+        case 2:
+          // if there are 2 fields, both key and value are projected
+          Type keyType = repeatedKeyValue.getType(0);
+          visitor.beforeKeyField(keyType);
+          try {
+            keyResult = visit(visitor.mapKeyType(map), keyType, visitor);
+          } finally {
+            visitor.afterKeyField(keyType);
+          }
+          Type valueType = repeatedKeyValue.getType(1);
+          visitor.beforeValueField(valueType);
+          try {

Review comment:
       Maybe we can have a method `runWithStack(Runnable, Type)`?

##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeWithPartnerVisitor.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.Deque;
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+public abstract class ParquetTypeWithPartnerVisitor<P, T> {
+  private final Deque<String> fieldNames = Lists.newLinkedList();
+
+  public static <P, T> T visit(P partnerType, Type type, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    if (type instanceof MessageType) {
+      return visitor.message(partnerType, (MessageType) type, visitFields(partnerType, type.asGroupType(), visitor));
+    } else if (type.isPrimitive()) {
+      return visitor.primitive(partnerType, type.asPrimitiveType());
+    } else {
+      // if not a primitive, the typeId must be a group
+      GroupType group = type.asGroupType();
+      OriginalType annotation = group.getOriginalType();
+      if (annotation != null) {
+        switch (annotation) {
+          case LIST:
+            return visitList(partnerType, group, visitor);
+          case MAP:
+            return visitMap(partnerType, group, visitor);
+          default:
+        }
+      }
+      return visitor.struct(partnerType, group, visitFields(partnerType, group, visitor));
+    }
+  }
+
+  private static <P, T> T visitList(P list, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
+        "Invalid list: top-level group is repeated: %s", group);
+    Preconditions.checkArgument(group.getFieldCount() == 1,
+        "Invalid list: does not contain single repeated field: %s", group);
+
+    GroupType repeatedElement = group.getFields().get(0).asGroupType();
+    Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
+        "Invalid list: inner group is not repeated");
+    Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
+        "Invalid list: repeated group is not a single field: %s", group);
+
+    visitor.beforeRepeatedElement(repeatedElement);
+    try {
+      T elementResult = null;
+      if (repeatedElement.getFieldCount() > 0) {
+        Type elementField = repeatedElement.getType(0);
+        try {
+          visitor.beforeElementField(elementField);
+          elementResult = visit(visitor.arrayElementType(list), elementField, visitor);
+        } finally {
+          visitor.afterElementField(elementField);
+        }
+      }
+      return visitor.list(list, group, elementResult);
+    } finally {
+      visitor.afterRepeatedElement(repeatedElement);
+    }
+  }
+
+  private static <P, T> T visitMap(P map, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
+        "Invalid map: top-level group is repeated: %s", group);
+    Preconditions.checkArgument(group.getFieldCount() == 1,
+        "Invalid map: does not contain single repeated field: %s", group);
+
+    GroupType repeatedKeyValue = group.getType(0).asGroupType();
+    Preconditions.checkArgument(repeatedKeyValue.isRepetition(Type.Repetition.REPEATED),
+        "Invalid map: inner group is not repeated");
+    Preconditions.checkArgument(repeatedKeyValue.getFieldCount() <= 2,
+        "Invalid map: repeated group does not have 2 fields");
+
+    visitor.beforeRepeatedKeyValue(repeatedKeyValue);
+    try {
+      T keyResult = null;
+      T valueResult = null;
+      switch (repeatedKeyValue.getFieldCount()) {
+        case 2:
+          // if there are 2 fields, both key and value are projected
+          Type keyType = repeatedKeyValue.getType(0);
+          visitor.beforeKeyField(keyType);
+          try {
+            keyResult = visit(visitor.mapKeyType(map), keyType, visitor);
+          } finally {
+            visitor.afterKeyField(keyType);
+          }
+          Type valueType = repeatedKeyValue.getType(1);
+          visitor.beforeValueField(valueType);
+          try {
+            valueResult = visit(visitor.mapValueType(map), repeatedKeyValue.getType(1), visitor);
+          } finally {
+            visitor.afterValueField(valueType);
+          }
+          break;
+        case 1:
+          // if there is just one, use the name to determine what it is
+          Type keyOrValue = repeatedKeyValue.getType(0);
+          if (keyOrValue.getName().equalsIgnoreCase("key")) {
+            visitor.beforeKeyField(keyOrValue);
+            try {
+              keyResult = visit(visitor.mapKeyType(map), keyOrValue, visitor);
+            } finally {
+              visitor.afterKeyField(keyOrValue);
+            }
+            // value result remains null
+          } else {
+            visitor.beforeValueField(keyOrValue);
+            try {
+              valueResult = visit(visitor.mapValueType(map), keyOrValue, visitor);
+            } finally {
+              visitor.afterValueField(keyOrValue);
+            }
+            // key result remains null
+          }
+          break;
+        default:
+          // both results will remain null
+      }
+
+      return visitor.map(map, group, keyResult, valueResult);
+
+    } finally {
+      visitor.afterRepeatedKeyValue(repeatedKeyValue);
+    }
+  }
+
+  protected static <P, T> List<T> visitFields(P struct, GroupType group, ParquetTypeWithPartnerVisitor<P, T> visitor) {
+    List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
+    for (int i = 0; i < group.getFieldCount(); i += 1) {
+      Type field = group.getFields().get(i);
+      visitor.beforeField(field);
+      Integer fieldId = field.getId() == null ? null : field.getId().intValue();
+      Pair<String, P> filedNameAndType = visitor.fieldNameAndType(struct, i, fieldId);
+      if (filedNameAndType != null) {
+        results.add(visit(filedNameAndType.second(), field, visitor));
+      } else {
+        results.add(visit(null, field, visitor));
+      }
+      visitor.afterField(field);
+    }
+
+    return results;
+  }
+
+  protected abstract P arrayElementType(P arrayType);
+  protected abstract P mapKeyType(P mapType);
+  protected abstract P mapValueType(P mapType);
+  protected abstract Pair<String, P> fieldNameAndType(P structType, int pos, Integer fieldId);
+
+  public T message(P struct, MessageType message, List<T> fields) {
+    return null;
+  }
+
+  public T struct(P pStruct, GroupType struct, List<T> fields) {
+    return null;
+  }
+
+  public T list(P pArray, GroupType array, T element) {
+    return null;
+  }
+
+  public T map(P pMap, GroupType map, T key, T value) {
+    return null;
+  }
+
+  public T primitive(P pType, PrimitiveType primitive) {
+    return null;
+  }
+
+
+  public void beforeField(Type type) {

Review comment:
       I don't quite understand the meaning of the following methods. Will they be overrided?

##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java
##########
@@ -19,187 +19,57 @@
 
 package org.apache.iceberg.parquet;
 
-import java.util.LinkedList;
-import java.util.List;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
+import org.apache.iceberg.util.Pair;
 
 /**
  * Visitor for traversing a Parquet type with a companion Iceberg type.
  *
  * @param <T> the Java class returned by the visitor
  */
-public class TypeWithSchemaVisitor<T> {
-  @SuppressWarnings({"checkstyle:VisibilityModifier", "checkstyle:IllegalType"})
-  protected LinkedList<String> fieldNames = Lists.newLinkedList();
+public class TypeWithSchemaVisitor<T> extends ParquetTypeWithPartnerVisitor<Type, T> {
 
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
-  public static <T> T visit(org.apache.iceberg.types.Type iType, Type type, TypeWithSchemaVisitor<T> visitor) {
-    if (type instanceof MessageType) {
-      Types.StructType struct = iType != null ? iType.asStructType() : null;
-      return visitor.message(struct, (MessageType) type,
-          visitFields(struct, type.asGroupType(), visitor));
-
-    } else if (type.isPrimitive()) {
-      org.apache.iceberg.types.Type.PrimitiveType iPrimitive = iType != null ?
-          iType.asPrimitiveType() : null;
-      return visitor.primitive(iPrimitive, type.asPrimitiveType());
-
-    } else {
-      // if not a primitive, the typeId must be a group
-      GroupType group = type.asGroupType();
-      OriginalType annotation = group.getOriginalType();
-      if (annotation != null) {
-        switch (annotation) {
-          case LIST:
-            Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
-                "Invalid list: top-level group is repeated: %s", group);
-            Preconditions.checkArgument(group.getFieldCount() == 1,
-                "Invalid list: does not contain single repeated field: %s", group);
-
-            GroupType repeatedElement = group.getFields().get(0).asGroupType();
-            Preconditions.checkArgument(repeatedElement.isRepetition(Type.Repetition.REPEATED),
-                "Invalid list: inner group is not repeated");
-            Preconditions.checkArgument(repeatedElement.getFieldCount() <= 1,
-                "Invalid list: repeated group is not a single field: %s", group);
-
-            Types.ListType list = null;
-            Types.NestedField element = null;
-            if (iType != null) {
-              list = iType.asListType();
-              element = list.fields().get(0);
-            }
-
-            visitor.fieldNames.push(repeatedElement.getName());
-            try {
-              T elementResult = null;
-              if (repeatedElement.getFieldCount() > 0) {
-                elementResult = visitField(element, repeatedElement.getType(0), visitor);
-              }
-
-              return visitor.list(list, group, elementResult);
-            } finally {
-              visitor.fieldNames.pop();
-            }
-
-          case MAP:
-            Preconditions.checkArgument(!group.isRepetition(Type.Repetition.REPEATED),
-                "Invalid map: top-level group is repeated: %s", group);
-            Preconditions.checkArgument(group.getFieldCount() == 1,
-                "Invalid map: does not contain single repeated field: %s", group);
-
-            GroupType repeatedKeyValue = group.getType(0).asGroupType();
-            Preconditions.checkArgument(repeatedKeyValue.isRepetition(Type.Repetition.REPEATED),
-                "Invalid map: inner group is not repeated");
-            Preconditions.checkArgument(repeatedKeyValue.getFieldCount() <= 2,
-                "Invalid map: repeated group does not have 2 fields");
-
-            Types.MapType map = null;
-            Types.NestedField keyField = null;
-            Types.NestedField valueField = null;
-            if (iType != null) {
-              map = iType.asMapType();
-              keyField = map.fields().get(0);
-              valueField = map.fields().get(1);
-            }
-
-            visitor.fieldNames.push(repeatedKeyValue.getName());
-            try {
-              T keyResult = null;
-              T valueResult = null;
-              switch (repeatedKeyValue.getFieldCount()) {
-                case 2:
-                  // if there are 2 fields, both key and value are projected
-                  keyResult = visitField(keyField, repeatedKeyValue.getType(0), visitor);
-                  valueResult = visitField(valueField, repeatedKeyValue.getType(1), visitor);
-                  break;
-                case 1:
-                  // if there is just one, use the name to determine what it is
-                  Type keyOrValue = repeatedKeyValue.getType(0);
-                  if (keyOrValue.getName().equalsIgnoreCase("key")) {
-                    keyResult = visitField(keyField, keyOrValue, visitor);
-                    // value result remains null
-                  } else {
-                    valueResult = visitField(valueField, keyOrValue, visitor);
-                    // key result remains null
-                  }
-                  break;
-                default:
-                  // both results will remain null
-              }
-
-              return visitor.map(map, group, keyResult, valueResult);
-
-            } finally {
-              visitor.fieldNames.pop();
-            }
-
-          default:
-        }
-      }
-
-      Types.StructType struct = iType != null ? iType.asStructType() : null;
-      return visitor.struct(struct, group, visitFields(struct, group, visitor));
+  @Override
+  protected Type arrayElementType(Type arrayType) {
+    if (arrayType == null) {
+      return null;
     }
-  }
 
-  private static <T> T visitField(Types.NestedField iField, Type field, TypeWithSchemaVisitor<T> visitor) {
-    visitor.fieldNames.push(field.getName());
-    try {
-      return visit(iField != null ? iField.type() : null, field, visitor);
-    } finally {
-      visitor.fieldNames.pop();
-    }
+    return arrayType.asListType().elementType();
   }
 
-  private static <T> List<T> visitFields(Types.StructType struct, GroupType group, TypeWithSchemaVisitor<T> visitor) {
-    List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
-    for (Type field : group.getFields()) {
-      int id = -1;
-      if (field.getId() != null) {
-        id = field.getId().intValue();
-      }
-      Types.NestedField iField = (struct != null && id >= 0) ? struct.field(id) : null;
-      results.add(visitField(iField, field, visitor));
+  @Override
+  protected Type mapKeyType(Type mapType) {
+    if (mapType == null) {
+      return null;
     }
 
-    return results;
-  }
-
-  public T message(Types.StructType iStruct, MessageType message, List<T> fields) {
-    return null;
-  }
-
-  public T struct(Types.StructType iStruct, GroupType struct, List<T> fields) {
-    return null;
+    return mapType.asMapType().keyType();
   }
 
-  public T list(Types.ListType iList, GroupType array, T element) {
-    return null;
-  }
+  @Override
+  protected Type mapValueType(Type mapType) {
+    if (mapType == null) {
+      return null;
+    }
 
-  public T map(Types.MapType iMap, GroupType map, T key, T value) {
-    return null;
+    return mapType.asMapType().valueType();
   }
 
-  public T primitive(org.apache.iceberg.types.Type.PrimitiveType iPrimitive,
-                     PrimitiveType primitive) {
-    return null;
-  }
+  @Override
+  protected Pair<String, Type> fieldNameAndType(Type structType, int pos, Integer fieldId) {
+    if (structType == null || fieldId == null) {
+      return null;
+    }
 
-  protected String[] currentPath() {
-    return Lists.newArrayList(fieldNames.descendingIterator()).toArray(new String[0]);
-  }
+    Types.StructType struct = structType.asStructType();
+    if (struct.field(fieldId) == null) {
+      return null;
+    }
 
-  protected String[] path(String name) {
-    List<String> list = Lists.newArrayList(fieldNames.descendingIterator());
-    list.add(name);
-    return list.toArray(new String[0]);
+    Type type = struct.field(fieldId).type();
+    String name = struct.field(fieldId).name();
+    return Pair.of(name, type);

Review comment:
       NIT:
   ```
       Types.NestedField field = struct.field(fieldId);
       return field == null ? null : Pair.of(field.name(), field.type());
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org