You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/29 15:50:11 UTC

[GitHub] [flink] aljoscha opened a new pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

aljoscha opened a new pull request #12412:
URL: https://github.com/apache/flink/pull/12412


   ## What is the purpose of the change
   
   From the Jira issue:
   Currently, we have an interface `WatermarkStrategy`, which is a `TimestampAssignerSupplier` and `WatermarkGeneratorSupplier`. The very first design (which is also currently implemented) also added `WatermarkStrategies` as a convenience builder for a `WatermarkStrategy`. However, I don't think users will ever implement a `WatermarkStrategy` but always wrap it in a builder. I also think that `WatermarkStrategy` itself is already that builder and we currently have two levels of builders, which also makes them harder to use in the `DataStream API` because of type checking issues.
   
   I'm proposing to remove `WatermarkStrategies` and to instead put the static methods directly into `WatermarkStrategy` and also to remove the `build()` method. Instead of a `build()` method, API methods on `WatermarkStrategy` just keep "piling" features on top of a base `WatermarkStrategy` via wrapping.
   
   Example to show what I mean for the API (current):
   ```
   DataStream<MyType> input = ...;
   input.assignTimestampsAndWatermarks(
       WatermarkStrategies.<MyType>.forMonotonousTimestamps().build());
   ```
   
   with this change:
   ```
   DataStream<MyType> input = ...;
   input.assignTimestampsAndWatermarks(
       WatermarkStrategy.forMonotonousTimestamps());
   ```
   
   ## Brief change log
   
   - remove `WatermarkStrategies` and instead move all code to `WatermarkStrategy`
   
   
   ## Verifying this change
   
   - covered by existing tests
   - new tests are added to verify how `WatermarkStrategy` can be used on `DataStream.assignTimestampsAndWatermarks()`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: *yes*
   
   ## Documentation
   
   I didn't yet change the documentation. I would update that once we agree on the change.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] StephanEwen commented on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-638112951


   Looks great! +1 for this change from my side.
   
   We need to think, though, how we have this with the 1.11 release.
   If we merge this only to `master` then we break the API.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] aljoscha commented on a change in pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #12412:
URL: https://github.com/apache/flink/pull/12412#discussion_r433930559



##########
File path: flink-core/src/test/java/org/apache/flink/api/common/eventtime/AscendingTimestampsWatermarksTest.java
##########
@@ -30,7 +30,10 @@
 	@Test
 	public void testWatermarkBeforeRecords() {
 		final TestingWatermarkOutput output = new TestingWatermarkOutput();
-		final AscendingTimestampsWatermarks<Object> watermarks = new AscendingTimestampsWatermarks<>();
+		final WatermarkGenerator<Object> watermarks =
+				WatermarkStrategy
+						.forMonotonousTimestamps()
+						.createWatermarkGenerator(() -> null);

Review comment:
       Yes, that's a very good point!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-636047460


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 55b61138a63ce9fde288910e00e533798c8cbaec (Fri May 29 15:51:43 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-636063842


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465",
       "triggerID" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0461a736a590e75496c19176292fb96dcd148196",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2567",
       "triggerID" : "0461a736a590e75496c19176292fb96dcd148196",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 55b61138a63ce9fde288910e00e533798c8cbaec Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465) 
   * 0461a736a590e75496c19176292fb96dcd148196 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2567) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] aljoscha commented on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-638275915


   Thanks for the reviews! I'm merging


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] aljoscha commented on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-638170374


   I would merge this to `release-1.11` as well, https://issues.apache.org/jira/browse/FLINK-18011 is marked as a blocker for Flink 1.11 and no-one has objected to this. I interpret this as a mandate to merge it there.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-636063842


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465",
       "triggerID" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0461a736a590e75496c19176292fb96dcd148196",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2567",
       "triggerID" : "0461a736a590e75496c19176292fb96dcd148196",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c7efdd8b719ab6267cb70435f5ce43fbecbe105",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0c7efdd8b719ab6267cb70435f5ce43fbecbe105",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d31906be96d953c66880832c12bb6a0fa161c69b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2578",
       "triggerID" : "d31906be96d953c66880832c12bb6a0fa161c69b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0461a736a590e75496c19176292fb96dcd148196 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2567) 
   * 0c7efdd8b719ab6267cb70435f5ce43fbecbe105 UNKNOWN
   * d31906be96d953c66880832c12bb6a0fa161c69b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2578) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] aljoscha commented on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-637589309


   @StephanEwen  Thanks for the review!
   
   about 1) I had a version where `WatermarkStrategy` is an `abstract class`, where the strategy implementations were final static private classes. This doesn't work for an `interface`, so I will move them out.
   
   about 2) I will write more class-level documentation and also structure it as you proposed.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] kl0u commented on a change in pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #12412:
URL: https://github.com/apache/flink/pull/12412#discussion_r434086063



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
##########
@@ -49,4 +53,158 @@
 	 */
 	@Override
 	WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
+
+	/**
+	 * Creates a watermark strategy for situations with monotonously ascending timestamps.
+	 *
+	 * <p>The watermarks are generated periodically and tightly follow the latest
+	 * timestamp in the data. The delay introduced by this strategy is mainly the periodic interval
+	 * in which the watermarks are generated.
+	 *
+	 * @see AscendingTimestampsWatermarks
+	 */
+	static <T> WatermarkStrategy<T> forMonotonousTimestamps() {

Review comment:
       True, I commented while only looking at the diff, not the whole class :(




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-636063842


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465",
       "triggerID" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0461a736a590e75496c19176292fb96dcd148196",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2567",
       "triggerID" : "0461a736a590e75496c19176292fb96dcd148196",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c7efdd8b719ab6267cb70435f5ce43fbecbe105",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0c7efdd8b719ab6267cb70435f5ce43fbecbe105",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 55b61138a63ce9fde288910e00e533798c8cbaec Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465) 
   * 0461a736a590e75496c19176292fb96dcd148196 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2567) 
   * 0c7efdd8b719ab6267cb70435f5ce43fbecbe105 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] kl0u commented on a change in pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #12412:
URL: https://github.com/apache/flink/pull/12412#discussion_r433847667



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
##########
@@ -49,4 +53,158 @@
 	 */
 	@Override
 	WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
+
+	/**
+	 * Creates a watermark strategy for situations with monotonously ascending timestamps.
+	 *
+	 * <p>The watermarks are generated periodically and tightly follow the latest
+	 * timestamp in the data. The delay introduced by this strategy is mainly the periodic interval
+	 * in which the watermarks are generated.
+	 *
+	 * @see AscendingTimestampsWatermarks
+	 */
+	static <T> WatermarkStrategy<T> forMonotonousTimestamps() {

Review comment:
       Shouldn't this be `public` and not `package-private`? (The same for the other methods)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-636063842


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 55b61138a63ce9fde288910e00e533798c8cbaec UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] StephanEwen commented on a change in pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on a change in pull request #12412:
URL: https://github.com/apache/flink/pull/12412#discussion_r433877916



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
##########
@@ -49,4 +53,158 @@
 	 */
 	@Override
 	WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
+
+	/**
+	 * Creates a watermark strategy for situations with monotonously ascending timestamps.
+	 *
+	 * <p>The watermarks are generated periodically and tightly follow the latest
+	 * timestamp in the data. The delay introduced by this strategy is mainly the periodic interval
+	 * in which the watermarks are generated.
+	 *
+	 * @see AscendingTimestampsWatermarks
+	 */
+	static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
+		return (ctx) -> new AscendingTimestampsWatermarks<>();
+	}
+
+	/**
+	 * Creates a watermark strategy for situations where records are out of order, but you
+	 * can place an upper bound on how far the events are out of order. An out-of-order bound B
+	 * means that once the an event with timestamp T was encountered, no events older than {@code T
+	 * - B} will follow any more.
+	 *
+	 * <p>The watermarks are generated periodically. The delay introduced by this watermark
+	 * strategy
+	 * is the periodic interval length, plus the out of orderness bound.
+	 *
+	 * @see BoundedOutOfOrdernessWatermarks
+	 */
+	static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
+		return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
+	}
+
+	/**
+	 * Creates a watermark strategy based on an existing {@link
+	 * WatermarkGeneratorSupplier}.
+	 */
+	static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
+		return generatorSupplier::createWatermarkGenerator;
+	}
+
+	/**
+	 * Creates a watermark strategy that generates no watermarks at all.
+	 * This may be useful in scenarios that do pure processing-time based stream processing.
+	 */
+	static <T> WatermarkStrategy<T> noWatermarks() {
+		return (ctx) -> new NoWatermarksGenerator<>();
+	}
+
+	/**
+	 * Creates Adds the given {@link TimestampAssigner} (via a {@link TimestampAssignerSupplier}) to
+	 * this {@link WatermarkStrategy}.
+	 *
+	 * <p>You can use this when a {@link TimestampAssigner} needs additional context, for example
+	 * access to the metrics system.
+	 *
+	 * <pre>
+	 * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
+	 *   .forMonotonousTimestamps()
+	 *   .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx));
+	 * }</pre>
+	 */
+	default WatermarkStrategy<T> withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner) {
+		checkNotNull(timestampAssigner, "timestampAssigner");
+		return new WithTimestampAssigner<>(this, timestampAssigner);
+	}
+
+	/**
+	 * Creates a new {@link WatermarkStrategy} with the {@link TimestampAssigner} overridden by the
+	 * provided assigner.
+	 *
+	 * <p>You can use this in case you want to specify a {@link TimestampAssigner} via a lambda
+	 * function.
+	 *
+	 * <pre>
+	 * {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategy
+	 *   .forMonotonousTimestamps()
+	 *   .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
+	 * }</pre>
+	 */
+	default WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
+		checkNotNull(timestampAssigner, "timestampAssigner");
+		return new WithTimestampAssigner<>(this, TimestampAssignerSupplier.of(timestampAssigner));
+	}
+
+	/**
+	 * Add an idle timeout to the watermark strategy. If no records flow in a partition of a stream
+	 * for that amount of time, then that partition is considered "idle" and will not hold back the
+	 * progress of watermarks in downstream operators.
+	 *
+	 * <p>Idleness can be important if some partitions have little data and might not have events
+	 * during
+	 * some periods. Without idleness, these streams can stall the overall event time progress of
+	 * the application.
+	 */
+	default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
+		checkNotNull(idleTimeout, "idleTimeout");
+		checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()),
+				"idleTimeout must be greater than zero");
+		return new WithIdlenessStrategy<>(this, idleTimeout);
+	}
+
+	/**
+	 * A {@link WatermarkStrategy} that adds idleness detection on top of the wrapped strategy.
+	 */
+	class WithIdlenessStrategy<T> implements WatermarkStrategy<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final WatermarkStrategy<T> baseStrategy;
+		private final Duration idlenessTimeout;
+
+		private WithIdlenessStrategy(WatermarkStrategy<T> baseStrategy, Duration idlenessTimeout) {
+			this.baseStrategy = baseStrategy;
+			this.idlenessTimeout = idlenessTimeout;
+		}
+
+		@Override
+		public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
+			return baseStrategy.createTimestampAssigner(context);
+		}
+
+		@Override
+		public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
+			return new WatermarksWithIdleness<>(baseStrategy.createWatermarkGenerator(context),
+					idlenessTimeout);
+		}

Review comment:
       Not sure. Is `equals` and `hashode` well-defined here? These are no classes describing data or state, but classes mainly representing logic.
   
   If testing is the sole purpose, then the right way to go, in my opinion, is writing a `Matcher` or an utility method that tests for the equality. Pushing a not-very-well-defined `equals()` implementation into production code just to simplify tests is not the right way.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] aljoscha commented on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-636046671


   By now I'm wondering wether we should make `WatermarkStrategy` `@PublicEvolving`.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-636063842


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465",
       "triggerID" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0461a736a590e75496c19176292fb96dcd148196",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2567",
       "triggerID" : "0461a736a590e75496c19176292fb96dcd148196",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c7efdd8b719ab6267cb70435f5ce43fbecbe105",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0c7efdd8b719ab6267cb70435f5ce43fbecbe105",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d31906be96d953c66880832c12bb6a0fa161c69b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2578",
       "triggerID" : "d31906be96d953c66880832c12bb6a0fa161c69b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0c7efdd8b719ab6267cb70435f5ce43fbecbe105 UNKNOWN
   * d31906be96d953c66880832c12bb6a0fa161c69b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2578) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-636063842


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465",
       "triggerID" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0461a736a590e75496c19176292fb96dcd148196",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0461a736a590e75496c19176292fb96dcd148196",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 55b61138a63ce9fde288910e00e533798c8cbaec Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465) 
   * 0461a736a590e75496c19176292fb96dcd148196 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr commented on a change in pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #12412:
URL: https://github.com/apache/flink/pull/12412#discussion_r433839951



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
##########
@@ -49,4 +53,158 @@
 	 */
 	@Override
 	WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
+
+	/**
+	 * Creates a watermark strategy for situations with monotonously ascending timestamps.
+	 *
+	 * <p>The watermarks are generated periodically and tightly follow the latest
+	 * timestamp in the data. The delay introduced by this strategy is mainly the periodic interval
+	 * in which the watermarks are generated.
+	 *
+	 * @see AscendingTimestampsWatermarks
+	 */
+	static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
+		return (ctx) -> new AscendingTimestampsWatermarks<>();
+	}
+
+	/**
+	 * Creates a watermark strategy for situations where records are out of order, but you
+	 * can place an upper bound on how far the events are out of order. An out-of-order bound B
+	 * means that once the an event with timestamp T was encountered, no events older than {@code T
+	 * - B} will follow any more.
+	 *
+	 * <p>The watermarks are generated periodically. The delay introduced by this watermark
+	 * strategy
+	 * is the periodic interval length, plus the out of orderness bound.
+	 *
+	 * @see BoundedOutOfOrdernessWatermarks
+	 */
+	static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
+		return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
+	}
+
+	/**
+	 * Creates a watermark strategy based on an existing {@link
+	 * WatermarkGeneratorSupplier}.
+	 */
+	static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
+		return generatorSupplier::createWatermarkGenerator;
+	}
+
+	/**
+	 * Creates a watermark strategy that generates no watermarks at all.
+	 * This may be useful in scenarios that do pure processing-time based stream processing.
+	 */
+	static <T> WatermarkStrategy<T> noWatermarks() {
+		return (ctx) -> new NoWatermarksGenerator<>();
+	}
+
+	/**
+	 * Creates Adds the given {@link TimestampAssigner} (via a {@link TimestampAssignerSupplier}) to
+	 * this {@link WatermarkStrategy}.
+	 *
+	 * <p>You can use this when a {@link TimestampAssigner} needs additional context, for example
+	 * access to the metrics system.
+	 *
+	 * <pre>
+	 * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
+	 *   .forMonotonousTimestamps()
+	 *   .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx));
+	 * }</pre>
+	 */
+	default WatermarkStrategy<T> withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner) {
+		checkNotNull(timestampAssigner, "timestampAssigner");
+		return new WithTimestampAssigner<>(this, timestampAssigner);
+	}
+
+	/**
+	 * Creates a new {@link WatermarkStrategy} with the {@link TimestampAssigner} overridden by the
+	 * provided assigner.
+	 *
+	 * <p>You can use this in case you want to specify a {@link TimestampAssigner} via a lambda
+	 * function.
+	 *
+	 * <pre>
+	 * {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategy
+	 *   .forMonotonousTimestamps()
+	 *   .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
+	 * }</pre>
+	 */
+	default WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
+		checkNotNull(timestampAssigner, "timestampAssigner");
+		return new WithTimestampAssigner<>(this, TimestampAssignerSupplier.of(timestampAssigner));
+	}
+
+	/**
+	 * Add an idle timeout to the watermark strategy. If no records flow in a partition of a stream
+	 * for that amount of time, then that partition is considered "idle" and will not hold back the
+	 * progress of watermarks in downstream operators.
+	 *
+	 * <p>Idleness can be important if some partitions have little data and might not have events
+	 * during
+	 * some periods. Without idleness, these streams can stall the overall event time progress of
+	 * the application.
+	 */
+	default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
+		checkNotNull(idleTimeout, "idleTimeout");
+		checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()),
+				"idleTimeout must be greater than zero");
+		return new WithIdlenessStrategy<>(this, idleTimeout);
+	}
+
+	/**
+	 * A {@link WatermarkStrategy} that adds idleness detection on top of the wrapped strategy.
+	 */
+	class WithIdlenessStrategy<T> implements WatermarkStrategy<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final WatermarkStrategy<T> baseStrategy;
+		private final Duration idlenessTimeout;
+
+		private WithIdlenessStrategy(WatermarkStrategy<T> baseStrategy, Duration idlenessTimeout) {
+			this.baseStrategy = baseStrategy;
+			this.idlenessTimeout = idlenessTimeout;
+		}
+
+		@Override
+		public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
+			return baseStrategy.createTimestampAssigner(context);
+		}
+
+		@Override
+		public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
+			return new WatermarksWithIdleness<>(baseStrategy.createWatermarkGenerator(context),
+					idlenessTimeout);
+		}

Review comment:
       nit: we should add hashCode/equals here, otherwise it is difficult to compare strategies for e.g. for testing.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-636063842


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465",
       "triggerID" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0461a736a590e75496c19176292fb96dcd148196",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2567",
       "triggerID" : "0461a736a590e75496c19176292fb96dcd148196",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c7efdd8b719ab6267cb70435f5ce43fbecbe105",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0c7efdd8b719ab6267cb70435f5ce43fbecbe105",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d31906be96d953c66880832c12bb6a0fa161c69b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2578",
       "triggerID" : "d31906be96d953c66880832c12bb6a0fa161c69b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 55b61138a63ce9fde288910e00e533798c8cbaec Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465) 
   * 0461a736a590e75496c19176292fb96dcd148196 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2567) 
   * 0c7efdd8b719ab6267cb70435f5ce43fbecbe105 UNKNOWN
   * d31906be96d953c66880832c12bb6a0fa161c69b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2578) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-636063842


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465",
       "triggerID" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 55b61138a63ce9fde288910e00e533798c8cbaec Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] aljoscha commented on a change in pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #12412:
URL: https://github.com/apache/flink/pull/12412#discussion_r433930267



##########
File path: flink-core/src/test/java/org/apache/flink/api/common/eventtime/AscendingTimestampsWatermarksTest.java
##########
@@ -30,7 +30,10 @@
 	@Test
 	public void testWatermarkBeforeRecords() {
 		final TestingWatermarkOutput output = new TestingWatermarkOutput();
-		final AscendingTimestampsWatermarks<Object> watermarks = new AscendingTimestampsWatermarks<>();

Review comment:
       You're right, I did change that to see if the builders work because there were no tests for that before, I did add tests in `WatermarkStrategyTest` already so I will undo these changes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] aljoscha commented on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-637610435


   I think I addressed all the comments, please take another look.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-636063842


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465",
       "triggerID" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 55b61138a63ce9fde288910e00e533798c8cbaec Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12412:
URL: https://github.com/apache/flink/pull/12412#issuecomment-636063842


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465",
       "triggerID" : "55b61138a63ce9fde288910e00e533798c8cbaec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0461a736a590e75496c19176292fb96dcd148196",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2567",
       "triggerID" : "0461a736a590e75496c19176292fb96dcd148196",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0c7efdd8b719ab6267cb70435f5ce43fbecbe105",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0c7efdd8b719ab6267cb70435f5ce43fbecbe105",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d31906be96d953c66880832c12bb6a0fa161c69b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d31906be96d953c66880832c12bb6a0fa161c69b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 55b61138a63ce9fde288910e00e533798c8cbaec Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2465) 
   * 0461a736a590e75496c19176292fb96dcd148196 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2567) 
   * 0c7efdd8b719ab6267cb70435f5ce43fbecbe105 UNKNOWN
   * d31906be96d953c66880832c12bb6a0fa161c69b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] aljoscha closed pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
aljoscha closed pull request #12412:
URL: https://github.com/apache/flink/pull/12412


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr commented on a change in pull request #12412: [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #12412:
URL: https://github.com/apache/flink/pull/12412#discussion_r433853487



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
##########
@@ -49,4 +53,158 @@
 	 */
 	@Override
 	WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
+
+	/**
+	 * Creates a watermark strategy for situations with monotonously ascending timestamps.
+	 *
+	 * <p>The watermarks are generated periodically and tightly follow the latest
+	 * timestamp in the data. The delay introduced by this strategy is mainly the periodic interval
+	 * in which the watermarks are generated.
+	 *
+	 * @see AscendingTimestampsWatermarks
+	 */
+	static <T> WatermarkStrategy<T> forMonotonousTimestamps() {

Review comment:
       it's an interface




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org