You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/02 20:16:12 UTC

[GitHub] [beam] kennknowles commented on a change in pull request #12915: [BEAM-7386] Introduce EventTimeBoundedEquijoin.

kennknowles commented on a change in pull request #12915:
URL: https://github.com/apache/beam/pull/12915#discussion_r534453439



##########
File path: sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of two {@code PCollection<KV>}s.
+   *
+   * <p>Both inputs are tagged into a common union type, flattened into one collection and then
+   * joined by a single {@code ParDo}.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      // Fail fast here instead of with an unexplained NullPointerException later, when these
+      // values are first dereferenced.
+      this.temporalBound = checkNotNull(temporalBound, "temporalBound must not be null");
+      this.rightCollection = checkNotNull(rightCollection, "rightCollection must not be null");
+      this.comparatorFn = checkNotNull(compareFn, "compareFn must not be null");
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two {@code PCollection<KV>}s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join predicate, allows
+     * elements to be expired when they are irrelevant according to the event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join; must not be null.
+     * @param temporalBound Duration used in the join predicate (non-inclusive); must not be null.
+     * @param compareFn Join predicate used for matching elements; must not be null.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     * @return A transform that, applied to the left collection, produces the joined pairs.
+     * @throws NullPointerException if any argument is null.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      KvCoder<K, V1> leftKvCoder = (KvCoder<K, V1>) leftCollection.getCoder();
+      Coder<K> keyCoder = leftKvCoder.getKeyCoder();
+      Coder<V1> leftValueCoder = leftKvCoder.getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) rightCollection.getCoder()).getValueCoder();
+
+      // Both tagged collections use the same union coder. Each union value has exactly one slot
+      // populated and the other set to null, so both slots need a NullableCoder.
+      Coder<KV<K, KV<V1, V2>>> unionCoder =
+          KvCoder.of(
+              keyCoder,
+              KvCoder.of(NullableCoder.of(leftValueCoder), NullableCoder.of(rightValueCoder)));
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =
+          leftCollection
+              .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1, V2>()))
+              .setCoder(unionCoder);
+
+      PCollection<KV<K, KV<V1, V2>>> rightUnion =
+          rightCollection
+              .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K, V1, V2>()))
+              .setCoder(unionCoder);
+
+      return PCollectionList.of(leftUnion)
+          .and(rightUnion)
+          .apply(Flatten.pCollections())
+          .apply(
+              "TemporalInnerJoinFn",
+              ParDo.of(
+                  new TemporalInnerJoinFn<>(
+                      leftValueCoder, rightValueCoder, temporalBound, comparatorFn)));
+    }
+  }
+
+  private static class LeftUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+      return KV.of(element.getKey(), KV.of(element.getValue(), null));
+    }
+  }
+
+  private static class RightUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+      return KV.of(element.getKey(), KV.of(null, element.getValue()));
+    }
+  }
+
+  private static class TemporalInnerJoinFn<K, V1, V2>
+      extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+    @StateId("left")
+    private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+    @StateId("right")
+    private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+    // Null only when uninitialized. After first element is received this will always be non-null.
+    @StateId("lastEviction")
+    private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+    @TimerId("eviction")
+    private final TimerSpec evictionSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final Duration temporalBound;
+    private final Duration evictionFrequency;
+    private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+    // Tracks the state of the eviction timer. Value is true when the timer has been set and
+    // execution is waiting for the event time watermark to fire the timer according to the
+    // evictionFrequency. False after the timer has been fired, so processElement can set the timer
+    // using the previous firing event time.
+    private transient boolean evictionTimerSet;
+
+    /**
+     * Marks the eviction timer as not set, so that {@code processElement} arms it when the next
+     * element arrives.
+     */
+    @Setup
+    public void setup() {
+      this.evictionTimerSet = false;
+    }
+
+    /**
+     * Creates the join function and the state specs it uses.
+     *
+     * <p>The eviction frequency is a quarter of {@code temporalBound}, except that bounds of four
+     * milliseconds or less use a fixed frequency of one millisecond.
+     *
+     * @param leftCoder Coder for the buffered left-side values.
+     * @param rightCoder Coder for the buffered right-side values.
+     * @param temporalBound Duration used in the join predicate (non-inclusive).
+     * @param compareFn Join predicate applied to each candidate pair.
+     */
+    protected TemporalInnerJoinFn(
+        final Coder<V1> leftCoder,
+        final Coder<V2> rightCoder,
+        final Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.compareFn = compareFn;
+
+      this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+      this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+      this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+
+      if (temporalBound.getMillis() > 4) {
+        this.evictionFrequency = temporalBound.dividedBy(4);
+      } else {
+        // A quarter of such a short bound would round down below one millisecond.
+        this.evictionFrequency = Duration.millis(1);
+      }
+    }
+
+    /**
+     * Buffers the incoming element on its own side and emits every pairing with buffered elements
+     * of the opposite side that satisfies the join predicate.
+     *
+     * <p>An element whose left slot is non-null is handled as a left-side element; any other
+     * element is handled as a right-side element. A pairing is emitted only when the two
+     * timestamps are strictly less than {@code temporalBound} apart and {@code compareFn} accepts
+     * the pair.
+     *
+     * <p>Before joining, the eviction timer is armed if it is not already pending.
+     *
+     * <p>NOTE(review): {@code evictionTimerSet} is a field of this DoFn instance rather than
+     * persisted state; confirm that sharing it across every element this instance processes is
+     * intended.
+     *
+     * @param c Context supplying the element and receiving the joined output.
+     * @param leftState Buffered left-side values, ordered by timestamp.
+     * @param rightState Buffered right-side values, ordered by timestamp.
+     * @param lastEvictionState Timestamp written by the last eviction, or null if none happened.
+     * @param timestamp Timestamp of the incoming element.
+     * @param evictionTimer Timer that triggers eviction.
+     */
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+        @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+        @AlwaysFetched @StateId("lastEviction") ValueState<Instant> lastEvictionState,
+        @Timestamp Instant timestamp,
+        @TimerId("eviction") Timer evictionTimer) {
+      Instant previousEviction = lastEvictionState.read();
+      if (previousEviction == null || !evictionTimerSet) {
+        evictionTimerSet = true;
+        if (previousEviction == null) {
+          // No eviction recorded yet, so the event time watermark is unknown: arm the timer
+          // relatively.
+          evictionTimer.offset(evictionFrequency).setRelative();
+        } else {
+          // Re-arm from the persisted event time of the last timer firing.
+          checkNotNull(previousEviction);
+          evictionTimer.set(previousEviction.plus(evictionFrequency));
+        }
+      }
+
+      KV<K, KV<V1, V2>> tagged = c.element();
+      K key = tagged.getKey();
+      KV<V1, V2> union = tagged.getValue();
+      V1 left = union.getKey();
+      V2 right = union.getValue();
+
+      if (left == null) {
+        // Right-side element: buffer it, then probe the buffered left side.
+        rightState.add(TimestampedValue.of(right, timestamp));
+        Iterable<TimestampedValue<V1>> bufferedLefts =
+            leftState.readRange(timestamp.minus(temporalBound), timestamp.plus(temporalBound));
+        for (TimestampedValue<V1> buffered : bufferedLefts) {
+          KV<V1, V2> candidate = KV.of(buffered.getValue(), right);
+          Duration distance = new Duration(buffered.getTimestamp(), timestamp).abs();
+          if (distance.isShorterThan(temporalBound) && compareFn.apply(candidate)) {
+            c.output(KV.of(key, candidate));
+          }
+        }
+        return;
+      }
+
+      // Left-side element: buffer it, then probe the buffered right side.
+      leftState.add(TimestampedValue.of(left, timestamp));
+      Iterable<TimestampedValue<V2>> bufferedRights =
+          rightState.readRange(timestamp.minus(temporalBound), timestamp.plus(temporalBound));
+      for (TimestampedValue<V2> buffered : bufferedRights) {
+        KV<V1, V2> candidate = KV.of(left, buffered.getValue());
+        Duration distance = new Duration(buffered.getTimestamp(), timestamp).abs();
+        if (distance.isShorterThan(temporalBound) && compareFn.apply(candidate)) {
+          c.output(KV.of(key, candidate));
+        }
+      }
+    }
+
+    @OnTimer("eviction")
+    public void onEviction(
+        @StateId("left") OrderedListState<V1> leftState,
+        @StateId("right") OrderedListState<V2> rightState,
+        @StateId("lastEviction") ValueState<Instant> lastEvictionState,
+        @Timestamp Instant ts) {
+      evictionTimerSet = false;
+      lastEvictionState.write(ts);
+      leftState.clearRange(new Instant(0L), ts);
+      rightState.clearRange(new Instant(0L), ts);

Review comment:
       I think this is basically trying to avoid complexities brought in by subclasses. We might lift such restrictions. You may know that personally I generally dislike implementation inheritance and particularly access to private members of superclasses, so it might just reflect my bias.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org