You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/17 16:50:16 UTC

[GitHub] [beam] kennknowles commented on a change in pull request #11041: [BEAM-4076] Use beam join api in sql

kennknowles commented on a change in pull request #11041: [BEAM-4076] Use beam join api in sql
URL: https://github.com/apache/beam/pull/11041#discussion_r410346652
 
 

 ##########
 File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
 ##########
 @@ -45,111 +41,35 @@
 /** Collections of {@code PTransform} and {@code DoFn} used to perform JOIN operation. */
 public class BeamJoinTransforms {
 
-  /** A {@code SimpleFunction} to extract join fields from the specified row. */
-  public static class ExtractJoinFields extends SimpleFunction<Row, KV<Row, Row>> {
-    private final List<SerializableRexNode> joinColumns;
-    private final Schema schema;
-    private int leftRowColumnCount;
-
-    public ExtractJoinFields(
-        boolean isLeft,
-        List<Pair<RexNode, RexNode>> joinColumns,
-        Schema schema,
-        int leftRowColumnCount) {
-      this.joinColumns =
-          joinColumns.stream()
-              .map(pair -> SerializableRexNode.builder(isLeft ? pair.left : pair.right).build())
-              .collect(toList());
-      this.schema = schema;
-      this.leftRowColumnCount = leftRowColumnCount;
-    }
-
-    @Override
-    public KV<Row, Row> apply(Row input) {
-      Row row =
-          joinColumns.stream()
-              .map(v -> getValue(v, input, leftRowColumnCount))
-              .collect(toRow(schema));
-      return KV.of(row, input);
-    }
-
-    @SuppressWarnings("unused")
-    private Schema.Field toField(Schema schema, Integer fieldIndex) {
-      Schema.Field original = schema.getField(fieldIndex);
-      return original.withName("c" + fieldIndex);
-    }
-
-    private Object getValue(
-        SerializableRexNode serializableRexNode, Row input, int leftRowColumnCount) {
-      if (serializableRexNode instanceof SerializableRexInputRef) {
-        return input.getValue(
-            ((SerializableRexInputRef) serializableRexNode).getIndex() - leftRowColumnCount);
-      } else { // It can only be SerializableFieldAccess.
-        List<Integer> indexes = ((SerializableRexFieldAccess) serializableRexNode).getIndexes();
-        // retrieve row based on the first column reference.
-        Row rowField = input.getValue(indexes.get(0) - leftRowColumnCount);
-        for (int i = 1; i < indexes.size() - 1; i++) {
-          rowField = rowField.getRow(indexes.get(i));
-        }
-        return rowField.getValue(indexes.get(indexes.size() - 1));
-      }
-    }
+  public static FieldAccessDescriptor getJoinColumns(
+      boolean isLeft,
+      List<Pair<RexNode, RexNode>> joinColumns,
+      int leftRowColumnCount,
+      Schema schema) {
+    List<SerializableRexNode> joinColumnsBuilt =
+        joinColumns.stream()
+            .map(pair -> SerializableRexNode.builder(isLeft ? pair.left : pair.right).build())
+            .collect(toList());
+    return FieldAccessDescriptor.union(
+        joinColumnsBuilt.stream()
+            .map(v -> getJoinColumn(v, leftRowColumnCount).resolve(schema))
+            .collect(Collectors.toList()));
   }
 
-  /** A {@code DoFn} which implement the sideInput-JOIN. */
-  public static class SideInputJoinDoFn extends DoFn<KV<Row, Row>, Row> {
-    private final PCollectionView<Map<Row, Iterable<Row>>> sideInputView;
-    private final JoinRelType joinType;
-    private final Row rightNullRow;
-    private final boolean swap;
-    private final Schema schema;
-
-    public SideInputJoinDoFn(
-        JoinRelType joinType,
-        Row rightNullRow,
-        PCollectionView<Map<Row, Iterable<Row>>> sideInputView,
-        boolean swap,
-        Schema schema) {
-      this.joinType = joinType;
-      this.rightNullRow = rightNullRow;
-      this.sideInputView = sideInputView;
-      this.swap = swap;
-      this.schema = schema;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      Row key = context.element().getKey();
-      Row leftRow = context.element().getValue();
-      Map<Row, Iterable<Row>> key2Rows = context.sideInput(sideInputView);
-      Iterable<Row> rightRowsIterable = key2Rows.get(key);
-
-      if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) {
-        for (Row aRightRowsIterable : rightRowsIterable) {
-          context.output(combineTwoRowsIntoOne(leftRow, aRightRowsIterable, swap, schema));
-        }
-      } else {
-        if (joinType == JoinRelType.LEFT) {
-          context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap, schema));
-        }
+  private static FieldAccessDescriptor getJoinColumn(
 
 Review comment:
  This change actually entrenches a bad assumption further than the previous code did: that joins are only on columns. We want to move in the other direction and allow join conditions to be more general RexNodes, many of which still work for CoGBK and side-input lookup joins. This is tracked in BEAM-6112.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services