Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/03/05 19:47:00 UTC
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386641#comment-16386641 ]
ASF GitHub Bot commented on FLINK-8480:
---------------------------------------
Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5482#discussion_r172302583
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+ /**
+ * Specifies the time boundaries over which the join operation works, so that
+ * <pre>leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound</pre>
+ * By default, both the lower and the upper bound are inclusive. This can be configured
+ * with {@link TimeBounded#lowerBoundExclusive(boolean)} and
+ * {@link TimeBounded#upperBoundExclusive(boolean)}.
+ *
+ * @param lowerBound The lower bound. Needs to be less than or equal to the upperBound.
+ * @param upperBound The upper bound. Needs to be greater than or equal to the lowerBound.
+ */
+ public TimeBounded<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
+
+ TimeCharacteristic timeCharacteristic =
+ input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+ if (timeCharacteristic != TimeCharacteristic.EventTime) {
+ throw new RuntimeException("Time-bounded stream joins are only supported in event time");
+ }
+
+ checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
+ checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");
+ return new TimeBounded<>(
+ input1,
+ input2,
+ lowerBound.toMilliseconds(),
+ upperBound.toMilliseconds(),
+ true,
+ true,
+ keySelector1,
+ keySelector2
+ );
+ }
+ }
+ }
+
+ /**
+ * Joined streams that have keys for both sides as well as the time boundaries over which
+ * elements should be joined defined.
+ *
+ * @param <IN1> Input type of elements from the first stream
+ * @param <IN2> Input type of elements from the second stream
+ * @param <KEY> The type of the key
+ */
+ public static class TimeBounded<IN1, IN2, KEY> {
+
+ private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin";
--- End diff --
Hmm... this might not be very relevant, but I'd prefer a single config class that holds all function names, rather than having them scattered all over the code base.
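A minimal sketch of the centralized-names idea from the comment above, assuming a plain constants holder is what is meant. The class name `OperatorNames` and the constant `TIME_BOUNDED_JOIN` are hypothetical and not part of Flink; only the string value `"TimeBoundedJoin"` comes from the diff.

```java
/**
 * Hypothetical single place for operator/function names, sketching the
 * reviewer's suggestion. Not an existing Flink class.
 */
final class OperatorNames {

    /** Same value as the private TIMEBOUNDED_JOIN_FUNC_NAME constant in the diff. */
    static final String TIME_BOUNDED_JOIN = "TimeBoundedJoin";

    private OperatorNames() {
        // Constants holder only; never instantiated.
    }
}
```

With such a holder, `TimeBounded` would reference `OperatorNames.TIME_BOUNDED_JOIN` instead of declaring its own private constant, so all names can be found and kept unique in one file.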
> Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
> --------------------------------------------------------------------------------
>
> Key: FLINK-8480
> URL: https://issues.apache.org/jira/browse/FLINK-8480
> Project: Flink
> Issue Type: Sub-task
> Reporter: Florian Schmidt
> Priority: Major
>
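The join condition documented in the `between(...)` Javadoc of the diff, `leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound`, can be illustrated with a small standalone predicate. This is a hedged sketch, not the operator's actual implementation: the class `TimeBoundCondition` and its `matches` method are hypothetical, bounds and timestamps are assumed to be plain milliseconds (as produced by `Time.toMilliseconds()` in the diff), and the two flags only mirror the `lowerBoundExclusive`/`upperBoundExclusive` options the Javadoc mentions.

```java
/**
 * Hypothetical sketch of the time-bounded join condition:
 * left.ts + lowerBound <= right.ts <= left.ts + upperBound,
 * where either comparison can be made strict (exclusive).
 */
final class TimeBoundCondition {
    private final long lowerBoundMs;
    private final long upperBoundMs;
    private final boolean lowerBoundExclusive;
    private final boolean upperBoundExclusive;

    TimeBoundCondition(long lowerBoundMs, long upperBoundMs,
                       boolean lowerBoundExclusive, boolean upperBoundExclusive) {
        // Mirrors the Javadoc requirement that lowerBound <= upperBound.
        if (lowerBoundMs > upperBoundMs) {
            throw new IllegalArgumentException("lowerBound must be <= upperBound");
        }
        this.lowerBoundMs = lowerBoundMs;
        this.upperBoundMs = upperBoundMs;
        this.lowerBoundExclusive = lowerBoundExclusive;
        this.upperBoundExclusive = upperBoundExclusive;
    }

    /** Returns true if the right element's timestamp falls within the left element's window. */
    boolean matches(long leftTimestamp, long rightTimestamp) {
        long lower = leftTimestamp + lowerBoundMs;
        long upper = leftTimestamp + upperBoundMs;
        boolean aboveLower = lowerBoundExclusive ? rightTimestamp > lower : rightTimestamp >= lower;
        boolean belowUpper = upperBoundExclusive ? rightTimestamp < upper : rightTimestamp <= upper;
        return aboveLower && belowUpper;
    }
}
```

For example, with bounds of -2000 ms and +1000 ms, a right element at 8000 ms matches a left element at 10000 ms when the lower bound is inclusive (the default described in the Javadoc), but not once the lower bound is made exclusive.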
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)