You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/09 21:14:16 UTC

[GitHub] [beam] reuvenlax commented on a change in pull request #14960: [BEAM-12442] Make sure that row renames work properly in nested rows.

reuvenlax commented on a change in pull request #14960:
URL: https://github.com/apache/beam/pull/14960#discussion_r648689186



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java
##########
@@ -64,64 +70,95 @@
     return new Inner<>();
   }
 
-  // Describes a single renameSchema rule.
-  private static class RenamePair implements Serializable {
+  // Describes a single renameSchema rule
+  @AutoValue
+  abstract static class RenamePair implements Serializable {
     // The FieldAccessDescriptor describing the field to renameSchema. Must reference a singleton
     // field.
-    private final FieldAccessDescriptor fieldAccessDescriptor;
+    abstract FieldAccessDescriptor getFieldAccessDescriptor();
     // The new name for the field.
-    private final String newName;
+    abstract String getNewName();
 
-    RenamePair(FieldAccessDescriptor fieldAccessDescriptor, String newName) {
-      this.fieldAccessDescriptor = fieldAccessDescriptor;
-      this.newName = newName;
+    static RenamePair of(FieldAccessDescriptor fieldAccessDescriptor, String newName) {
+      return new AutoValue_RenameFields_RenamePair(fieldAccessDescriptor, newName);
     }
 
     RenamePair resolve(Schema schema) {
-      FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+      FieldAccessDescriptor resolved = getFieldAccessDescriptor().resolve(schema);
       if (!resolved.referencesSingleField()) {
         throw new IllegalArgumentException(resolved + " references multiple fields.");
       }
-      return new RenamePair(resolved, newName);
+      return RenamePair.of(resolved, getNewName());
     }
   }
 
-  private static FieldType renameFieldType(FieldType inputType, Collection<RenamePair> renames) {
+  private static FieldType renameFieldType(
+      FieldType inputType,
+      Collection<RenamePair> renames,
+      Map<UUID, Schema> renamedSchemasMap,
+      Map<UUID, BitSet> nestedFieldRenamedMap) {
+    if (renames.isEmpty()) {
+      return inputType;
+    }
+
     switch (inputType.getTypeName()) {
       case ROW:
-        return FieldType.row(renameSchema(inputType.getRowSchema(), renames));
+        renameSchema(inputType.getRowSchema(), renames, renamedSchemasMap, nestedFieldRenamedMap);
+        return FieldType.row(renamedSchemasMap.get(inputType.getRowSchema().getUUID()));
       case ARRAY:
-        return FieldType.array(renameFieldType(inputType.getCollectionElementType(), renames));
+        return FieldType.array(
+            renameFieldType(
+                inputType.getCollectionElementType(),
+                renames,
+                renamedSchemasMap,
+                nestedFieldRenamedMap));
       case ITERABLE:
-        return FieldType.iterable(renameFieldType(inputType.getCollectionElementType(), renames));
+        return FieldType.iterable(
+            renameFieldType(
+                inputType.getCollectionElementType(),
+                renames,
+                renamedSchemasMap,
+                nestedFieldRenamedMap));
       case MAP:
         return FieldType.map(
-            renameFieldType(inputType.getMapKeyType(), renames),
-            renameFieldType(inputType.getMapValueType(), renames));
+            renameFieldType(
+                inputType.getMapKeyType(), renames, renamedSchemasMap, nestedFieldRenamedMap),
+            renameFieldType(
+                inputType.getMapValueType(), renames, renamedSchemasMap, nestedFieldRenamedMap));
+      case LOGICAL_TYPE:
+        throw new RuntimeException("RenameFields does not support renaming logical types.");
       default:
         return inputType;
     }
   }
 
   // Apply the user-specified renames to the input schema.
-  private static Schema renameSchema(Schema inputSchema, Collection<RenamePair> renames) {
+  @VisibleForTesting

Review comment:
       I'm not sure how. The problem with the public interface is that it uses the DirectRunner, and the characteristic of this bug is that the DirectRunner tends to obscure the bug by serializing/deserializing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org