Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/07 21:12:49 UTC

[GitHub] [beam] reuvenlax opened a new pull request #14960: [BEAM-12442] Make sure that row renames work properly in nested rows.

reuvenlax opened a new pull request #14960:
URL: https://github.com/apache/beam/pull/14960


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mouyang commented on pull request #14960: [BEAM-12442] Make sure that row renames work properly in nested rows.

Posted by GitBox <gi...@apache.org>.
mouyang commented on pull request #14960:
URL: https://github.com/apache/beam/pull/14960#issuecomment-858235832


   > R: @mouyang
   
   @reuvenlax LGTM.  I have two general knowledge questions.
   
   1. I didn't get a chance to try the reshuffle you suggested on the user mailing list. Will this PR mean that I no longer need a reshuffle? It seems so, but I'm not 100% sure.
   2. Traversing the Schema and Row is preferred over reshuffling, correct?





[GitHub] [beam] reuvenlax commented on a change in pull request #14960: [BEAM-12442] Make sure that row renames work properly in nested rows.

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #14960:
URL: https://github.com/apache/beam/pull/14960#discussion_r648689186



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java
##########
@@ -64,64 +70,95 @@
     return new Inner<>();
   }
 
-  // Describes a single renameSchema rule.
-  private static class RenamePair implements Serializable {
+  // Describes a single renameSchema rule
+  @AutoValue
+  abstract static class RenamePair implements Serializable {
     // The FieldAccessDescriptor describing the field to renameSchema. Must reference a singleton
     // field.
-    private final FieldAccessDescriptor fieldAccessDescriptor;
+    abstract FieldAccessDescriptor getFieldAccessDescriptor();
     // The new name for the field.
-    private final String newName;
+    abstract String getNewName();
 
-    RenamePair(FieldAccessDescriptor fieldAccessDescriptor, String newName) {
-      this.fieldAccessDescriptor = fieldAccessDescriptor;
-      this.newName = newName;
+    static RenamePair of(FieldAccessDescriptor fieldAccessDescriptor, String newName) {
+      return new AutoValue_RenameFields_RenamePair(fieldAccessDescriptor, newName);
     }
 
     RenamePair resolve(Schema schema) {
-      FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+      FieldAccessDescriptor resolved = getFieldAccessDescriptor().resolve(schema);
       if (!resolved.referencesSingleField()) {
         throw new IllegalArgumentException(resolved + " references multiple fields.");
       }
-      return new RenamePair(resolved, newName);
+      return RenamePair.of(resolved, getNewName());
     }
   }
 
-  private static FieldType renameFieldType(FieldType inputType, Collection<RenamePair> renames) {
+  private static FieldType renameFieldType(
+      FieldType inputType,
+      Collection<RenamePair> renames,
+      Map<UUID, Schema> renamedSchemasMap,
+      Map<UUID, BitSet> nestedFieldRenamedMap) {
+    if (renames.isEmpty()) {
+      return inputType;
+    }
+
     switch (inputType.getTypeName()) {
       case ROW:
-        return FieldType.row(renameSchema(inputType.getRowSchema(), renames));
+        renameSchema(inputType.getRowSchema(), renames, renamedSchemasMap, nestedFieldRenamedMap);
+        return FieldType.row(renamedSchemasMap.get(inputType.getRowSchema().getUUID()));
       case ARRAY:
-        return FieldType.array(renameFieldType(inputType.getCollectionElementType(), renames));
+        return FieldType.array(
+            renameFieldType(
+                inputType.getCollectionElementType(),
+                renames,
+                renamedSchemasMap,
+                nestedFieldRenamedMap));
       case ITERABLE:
-        return FieldType.iterable(renameFieldType(inputType.getCollectionElementType(), renames));
+        return FieldType.iterable(
+            renameFieldType(
+                inputType.getCollectionElementType(),
+                renames,
+                renamedSchemasMap,
+                nestedFieldRenamedMap));
       case MAP:
         return FieldType.map(
-            renameFieldType(inputType.getMapKeyType(), renames),
-            renameFieldType(inputType.getMapValueType(), renames));
+            renameFieldType(
+                inputType.getMapKeyType(), renames, renamedSchemasMap, nestedFieldRenamedMap),
+            renameFieldType(
+                inputType.getMapValueType(), renames, renamedSchemasMap, nestedFieldRenamedMap));
+      case LOGICAL_TYPE:
+        throw new RuntimeException("RenameFields does not support renaming logical types.");
       default:
         return inputType;
     }
   }
 
   // Apply the user-specified renames to the input schema.
-  private static Schema renameSchema(Schema inputSchema, Collection<RenamePair> renames) {
+  @VisibleForTesting

Review comment:
       I'm not sure how. Tests that go through the public interface run on the DirectRunner, and the DirectRunner tends to hide this bug because it serializes and deserializes elements.
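
A minimal plain-Java sketch of why a serialize/deserialize round trip can hide this kind of bug. It does not use Beam: `MiniSchema`, `MiniRow`, `shallowRename`, `encode`, `decode`, and `roundTrip` are hypothetical stand-ins invented for illustration, and the field names are borrowed from the test in this PR. The assumption is that a shallow rename reuses nested row objects that still reference their old schema, while decoding rebuilds every row from the declared output schema.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class StaleSchemaDemo {
  // Hypothetical stand-in for a schema: field names plus, per field, a nested schema (null = leaf).
  static final class MiniSchema {
    final List<String> names;
    final List<MiniSchema> nested;

    MiniSchema(List<String> names, List<MiniSchema> nested) {
      this.names = names;
      this.nested = nested;
    }
  }

  // Hypothetical stand-in for a row: values plus the schema object they were built against.
  static final class MiniRow {
    final MiniSchema schema;
    final List<Object> values;

    MiniRow(MiniSchema schema, List<Object> values) {
      this.schema = schema;
      this.values = values;
    }
  }

  // Builds a schema whose fields are all leaves.
  static MiniSchema leaf(String... names) {
    return new MiniSchema(
        Arrays.asList(names), Collections.<MiniSchema>nCopies(names.length, null));
  }

  // Shallow rename: swap the top-level schema and reuse the values untouched,
  // so nested rows keep pointing at their old schema.
  static MiniRow shallowRename(MiniRow row, MiniSchema renamed) {
    return new MiniRow(renamed, row.values);
  }

  // "Serialize": flatten to leaf values only; no schema travels with the data.
  static void encode(MiniRow row, List<Object> wire) {
    for (Object v : row.values) {
      if (v instanceof MiniRow) {
        encode((MiniRow) v, wire);
      } else {
        wire.add(v);
      }
    }
  }

  // "Deserialize": rebuild every row, nested ones included, from the declared schema.
  static MiniRow decode(MiniSchema declared, Iterator<Object> wire) {
    List<Object> values = new ArrayList<>();
    for (MiniSchema child : declared.nested) {
      values.add(child == null ? wire.next() : decode(child, wire));
    }
    return new MiniRow(declared, values);
  }

  static MiniRow roundTrip(MiniRow row, MiniSchema declared) {
    List<Object> wire = new ArrayList<>();
    encode(row, wire);
    return decode(declared, wire.iterator());
  }

  // Field names seen by the nested row stored at position `index`.
  static List<String> nestedNames(MiniRow row, int index) {
    return ((MiniRow) row.values.get(index)).schema.names;
  }

  public static void main(String[] args) {
    MiniSchema nestedIn = leaf("field1", "field2");
    MiniSchema in =
        new MiniSchema(
            Arrays.asList("field1", "nested"), Arrays.<MiniSchema>asList(null, nestedIn));
    MiniSchema nestedOut = leaf("bottom1", "bottom2");
    MiniSchema out =
        new MiniSchema(
            Arrays.asList("top1", "top_nested"), Arrays.<MiniSchema>asList(null, nestedOut));

    MiniRow inner = new MiniRow(nestedIn, Arrays.<Object>asList("b", 1));
    MiniRow row = new MiniRow(in, Arrays.<Object>asList("a", inner));
    MiniRow shallow = shallowRename(row, out);

    System.out.println(nestedNames(shallow, 1)); // stale names from the input schema
    System.out.println(nestedNames(roundTrip(shallow, out), 1)); // names from the declared schema
  }
}
```

In this model the stale nested schema is only observable before the round trip, which is consistent with the point above that a runner which encodes and decodes elements would make the rename look correct in a pipeline-level test.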







[GitHub] [beam] reuvenlax commented on pull request #14960: [BEAM-12442] Make sure that row renames work properly in nested rows.

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #14960:
URL: https://github.com/apache/beam/pull/14960#issuecomment-858995402


   @mouyang correct on both counts
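
For readers following the thread, here is a rough plain-Java sketch of what "traversing the Schema and Row" means as opposed to relying on a reshuffle. It is not the code in this PR and does not use Beam: `Field`, `Row`, `renameSchema`, `renameRow`, and the dotted-path `renames` map are hypothetical simplifications (the real change works with `FieldAccessDescriptor` and per-schema maps keyed by UUID). The field names mirror the test added in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NestedRenameSketch {
  // Hypothetical field: a name plus a nested schema (null for a leaf field).
  static final class Field {
    final String name;
    final List<Field> nested;

    Field(String name, List<Field> nested) {
      this.name = name;
      this.nested = nested;
    }
  }

  // Hypothetical row: values plus the schema (list of fields) they belong to.
  static final class Row {
    final List<Field> schema;
    final List<Object> values;

    Row(List<Field> schema, List<Object> values) {
      this.schema = schema;
      this.values = values;
    }
  }

  // Walk the schema. `renames` maps a dotted path in the INPUT schema to a new field name.
  static List<Field> renameSchema(List<Field> schema, Map<String, String> renames, String prefix) {
    List<Field> out = new ArrayList<>();
    for (Field f : schema) {
      String path = prefix + f.name;
      List<Field> nested = f.nested == null ? null : renameSchema(f.nested, renames, path + ".");
      out.add(new Field(renames.getOrDefault(path, f.name), nested));
    }
    return out;
  }

  // Walk the row in step with the renamed schema so nested rows get the new schema too.
  static Row renameRow(Row row, List<Field> renamed) {
    List<Object> values = new ArrayList<>();
    for (int i = 0; i < row.values.size(); i++) {
      Object v = row.values.get(i);
      List<Field> child = renamed.get(i).nested;
      values.add(v instanceof Row && child != null ? renameRow((Row) v, child) : v);
    }
    return new Row(renamed, values);
  }

  // Field names of a schema, in order.
  static List<String> names(List<Field> schema) {
    List<String> out = new ArrayList<>();
    for (Field f : schema) {
      out.add(f.name);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Field> nested = Arrays.asList(new Field("field1", null), new Field("field2", null));
    List<Field> input = Arrays.asList(new Field("field1", null), new Field("nested", nested));

    Map<String, String> renames = new HashMap<>();
    renames.put("field1", "top1");
    renames.put("nested", "top_nested");
    renames.put("nested.field1", "bottom1");
    renames.put("nested.field2", "bottom2");

    Row inner = new Row(nested, Arrays.<Object>asList("b", 1));
    Row row = new Row(input, Arrays.<Object>asList("a", inner));
    Row out = renameRow(row, renameSchema(input, renames, ""));

    System.out.println(names(out.schema));
    System.out.println(names(((Row) out.values.get(1)).schema));
  }
}
```

The design point in this sketch is that the row is rebuilt level by level against the renamed schema, so the result is consistent in memory without depending on an encode/decode step elsewhere in the pipeline.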





[GitHub] [beam] TheNeuralBit commented on a change in pull request #14960: [BEAM-12442] Make sure that row renames work properly in nested rows.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #14960:
URL: https://github.com/apache/beam/pull/14960#discussion_r648506759



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java
##########
@@ -64,64 +70,95 @@
     return new Inner<>();
   }
 
-  // Describes a single renameSchema rule.
-  private static class RenamePair implements Serializable {
+  // Describes a single renameSchema rule
+  @AutoValue
+  abstract static class RenamePair implements Serializable {
     // The FieldAccessDescriptor describing the field to renameSchema. Must reference a singleton
     // field.
-    private final FieldAccessDescriptor fieldAccessDescriptor;
+    abstract FieldAccessDescriptor getFieldAccessDescriptor();
     // The new name for the field.
-    private final String newName;
+    abstract String getNewName();
 
-    RenamePair(FieldAccessDescriptor fieldAccessDescriptor, String newName) {
-      this.fieldAccessDescriptor = fieldAccessDescriptor;
-      this.newName = newName;
+    static RenamePair of(FieldAccessDescriptor fieldAccessDescriptor, String newName) {
+      return new AutoValue_RenameFields_RenamePair(fieldAccessDescriptor, newName);
     }
 
     RenamePair resolve(Schema schema) {
-      FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+      FieldAccessDescriptor resolved = getFieldAccessDescriptor().resolve(schema);
       if (!resolved.referencesSingleField()) {
         throw new IllegalArgumentException(resolved + " references multiple fields.");
       }
-      return new RenamePair(resolved, newName);
+      return RenamePair.of(resolved, getNewName());
     }
   }
 
-  private static FieldType renameFieldType(FieldType inputType, Collection<RenamePair> renames) {
+  private static FieldType renameFieldType(
+      FieldType inputType,
+      Collection<RenamePair> renames,
+      Map<UUID, Schema> renamedSchemasMap,
+      Map<UUID, BitSet> nestedFieldRenamedMap) {
+    if (renames.isEmpty()) {
+      return inputType;
+    }
+
     switch (inputType.getTypeName()) {
       case ROW:
-        return FieldType.row(renameSchema(inputType.getRowSchema(), renames));
+        renameSchema(inputType.getRowSchema(), renames, renamedSchemasMap, nestedFieldRenamedMap);
+        return FieldType.row(renamedSchemasMap.get(inputType.getRowSchema().getUUID()));
       case ARRAY:
-        return FieldType.array(renameFieldType(inputType.getCollectionElementType(), renames));
+        return FieldType.array(
+            renameFieldType(
+                inputType.getCollectionElementType(),
+                renames,
+                renamedSchemasMap,
+                nestedFieldRenamedMap));
       case ITERABLE:
-        return FieldType.iterable(renameFieldType(inputType.getCollectionElementType(), renames));
+        return FieldType.iterable(
+            renameFieldType(
+                inputType.getCollectionElementType(),
+                renames,
+                renamedSchemasMap,
+                nestedFieldRenamedMap));
       case MAP:
         return FieldType.map(
-            renameFieldType(inputType.getMapKeyType(), renames),
-            renameFieldType(inputType.getMapValueType(), renames));
+            renameFieldType(
+                inputType.getMapKeyType(), renames, renamedSchemasMap, nestedFieldRenamedMap),
+            renameFieldType(
+                inputType.getMapValueType(), renames, renamedSchemasMap, nestedFieldRenamedMap));
+      case LOGICAL_TYPE:
+        throw new RuntimeException("RenameFields does not support renaming logical types.");
       default:
         return inputType;
     }
   }
 
   // Apply the user-specified renames to the input schema.
-  private static Schema renameSchema(Schema inputSchema, Collection<RenamePair> renames) {
+  @VisibleForTesting

Review comment:
       Why not just test through the public interface, rather than exposing `renameSchema` and `renameRows`?

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
##########
@@ -258,4 +265,65 @@ public void renameNestedInMapFields() {
     PAssert.that(renamed).containsInAnyOrder(expectedRows);
     pipeline.run();
   }
+
+  @Test
+  public void testRenameRow() {
+    Schema nestedSchema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+    Schema schema =
+        Schema.builder().addStringField("field1").addRowField("nested", nestedSchema).build();
+
+    Schema expectedNestedSchema =
+        Schema.builder().addStringField("bottom1").addInt32Field("bottom2").build();
+    Schema expectedSchema =
+        Schema.builder()
+            .addStringField("top1")
+            .addRowField("top_nested", expectedNestedSchema)

Review comment:
       optional super nitty nit: I think this would be easier to grok if the nested schemas were defined inline.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java
##########
@@ -164,29 +198,112 @@ private Inner(List<RenamePair> renames) {
       List<RenamePair> newList =
           ImmutableList.<RenamePair>builder()
               .addAll(renames)
-              .add(new RenamePair(field, newName))
+              .add(RenamePair.of(field, newName))
               .build();
 
       return new Inner<>(newList);
     }
 
     @Override
     public PCollection<Row> expand(PCollection<T> input) {
-      Schema inputSchema = input.getSchema();
+      final Map<UUID, Schema> renamedSchemasMap = Maps.newHashMap();
+      final Map<UUID, BitSet> nestedFieldRenamedMap = Maps.newHashMap();
 
-      List<RenamePair> pairs =
-          renames.stream().map(r -> r.resolve(inputSchema)).collect(Collectors.toList());
-      final Schema outputSchema = renameSchema(inputSchema, pairs);
+      List<RenamePair> resolvedRenames =
+          renames.stream().map(r -> r.resolve(input.getSchema())).collect(Collectors.toList());
+      renameSchema(input.getSchema(), resolvedRenames, renamedSchemasMap, nestedFieldRenamedMap);
+      final Schema outputSchema = renamedSchemasMap.get(input.getSchema().getUUID());
+      final BitSet nestedRenames = nestedFieldRenamedMap.get(input.getSchema().getUUID());
       return input
           .apply(
               ParDo.of(
                   new DoFn<T, Row>() {
                     @ProcessElement
                     public void processElement(@Element Row row, OutputReceiver<Row> o) {
-                      o.output(Row.withSchema(outputSchema).attachValues(row.getValues()));
+                      o.output(
+                          renameRow(
+                              row,
+                              outputSchema,
+                              nestedRenames,
+                              renamedSchemasMap,
+                              nestedFieldRenamedMap));
                     }
                   }))
           .setRowSchema(outputSchema);
     }
   }
+
+  // TODO(reuvenlax): For better performance, we should reuse functionality in

Review comment:
       Consider filing a Jira for this.







[GitHub] [beam] reuvenlax commented on pull request #14960: [BEAM-12442] Make sure that row renames work properly in nested rows.

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #14960:
URL: https://github.com/apache/beam/pull/14960#issuecomment-856278435


   R: @mouyang 





[GitHub] [beam] TheNeuralBit commented on a change in pull request #14960: [BEAM-12442] Make sure that row renames work properly in nested rows.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #14960:
URL: https://github.com/apache/beam/pull/14960#discussion_r648773975



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java
##########
@@ -64,64 +70,95 @@
     return new Inner<>();
   }
 
-  // Describes a single renameSchema rule.
-  private static class RenamePair implements Serializable {
+  // Describes a single renameSchema rule
+  @AutoValue
+  abstract static class RenamePair implements Serializable {
     // The FieldAccessDescriptor describing the field to renameSchema. Must reference a singleton
     // field.
-    private final FieldAccessDescriptor fieldAccessDescriptor;
+    abstract FieldAccessDescriptor getFieldAccessDescriptor();
     // The new name for the field.
-    private final String newName;
+    abstract String getNewName();
 
-    RenamePair(FieldAccessDescriptor fieldAccessDescriptor, String newName) {
-      this.fieldAccessDescriptor = fieldAccessDescriptor;
-      this.newName = newName;
+    static RenamePair of(FieldAccessDescriptor fieldAccessDescriptor, String newName) {
+      return new AutoValue_RenameFields_RenamePair(fieldAccessDescriptor, newName);
     }
 
     RenamePair resolve(Schema schema) {
-      FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+      FieldAccessDescriptor resolved = getFieldAccessDescriptor().resolve(schema);
       if (!resolved.referencesSingleField()) {
         throw new IllegalArgumentException(resolved + " references multiple fields.");
       }
-      return new RenamePair(resolved, newName);
+      return RenamePair.of(resolved, getNewName());
     }
   }
 
-  private static FieldType renameFieldType(FieldType inputType, Collection<RenamePair> renames) {
+  private static FieldType renameFieldType(
+      FieldType inputType,
+      Collection<RenamePair> renames,
+      Map<UUID, Schema> renamedSchemasMap,
+      Map<UUID, BitSet> nestedFieldRenamedMap) {
+    if (renames.isEmpty()) {
+      return inputType;
+    }
+
     switch (inputType.getTypeName()) {
       case ROW:
-        return FieldType.row(renameSchema(inputType.getRowSchema(), renames));
+        renameSchema(inputType.getRowSchema(), renames, renamedSchemasMap, nestedFieldRenamedMap);
+        return FieldType.row(renamedSchemasMap.get(inputType.getRowSchema().getUUID()));
       case ARRAY:
-        return FieldType.array(renameFieldType(inputType.getCollectionElementType(), renames));
+        return FieldType.array(
+            renameFieldType(
+                inputType.getCollectionElementType(),
+                renames,
+                renamedSchemasMap,
+                nestedFieldRenamedMap));
       case ITERABLE:
-        return FieldType.iterable(renameFieldType(inputType.getCollectionElementType(), renames));
+        return FieldType.iterable(
+            renameFieldType(
+                inputType.getCollectionElementType(),
+                renames,
+                renamedSchemasMap,
+                nestedFieldRenamedMap));
       case MAP:
         return FieldType.map(
-            renameFieldType(inputType.getMapKeyType(), renames),
-            renameFieldType(inputType.getMapValueType(), renames));
+            renameFieldType(
+                inputType.getMapKeyType(), renames, renamedSchemasMap, nestedFieldRenamedMap),
+            renameFieldType(
+                inputType.getMapValueType(), renames, renamedSchemasMap, nestedFieldRenamedMap));
+      case LOGICAL_TYPE:
+        throw new RuntimeException("RenameFields does not support renaming logical types.");
       default:
         return inputType;
     }
   }
 
   // Apply the user-specified renames to the input schema.
-  private static Schema renameSchema(Schema inputSchema, Collection<RenamePair> renames) {
+  @VisibleForTesting

Review comment:
       Ah yeah. Shoot.







[GitHub] [beam] reuvenlax commented on pull request #14960: [BEAM-12442] Make sure that row renames work properly in nested rows.

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #14960:
URL: https://github.com/apache/beam/pull/14960#issuecomment-857369959


   Run Java PreCommit





[GitHub] [beam] reuvenlax merged pull request #14960: [BEAM-12442] Make sure that row renames work properly in nested rows.

Posted by GitBox <gi...@apache.org>.
reuvenlax merged pull request #14960:
URL: https://github.com/apache/beam/pull/14960


   

