Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/22 23:23:01 UTC

[GitHub] [flink] TengHu opened a new pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

TengHu opened a new pull request #12297:
URL: https://github.com/apache/flink/pull/12297


   > ## What is the purpose of the change
   > Flink triggers all panes belonging to one window at the same time. In other words, all panes are aligned and their triggers fire simultaneously, which causes spikes in workload. This pull request adds WindowStagger, which generates a staggering offset for window assignment at runtime so that the workload is distributed over time. Each window assignment is therefore based on the window size, the window offset, and the staggering offset (generated at runtime).
   > 
   > This change only modifies TumblingProcessingTimeWindows; changes to the other window assigners will follow in separate PRs.
   > 
   > ## Brief change log
   > * _Add WindowStagger for generating staggering offsets_
   > * _Enable TumblingProcessingTimeWindows to generate staggering offsets if the user enables it_
   > 
   > ## Verifying this change
   > This change is already covered by existing tests, such as _TumblingProcessingTimeWindowsTest_.
   > 
   > This change added tests and can be verified as follows:
   > 
   > * _Added unit tests for WindowStagger_
   > * _Validated the change by running a stateful streaming program that uses sliding and tumbling windows on our clusters (3500 task managers in total). Some dashboards are shown below._
   > 
   > ![](https://camo.githubusercontent.com/298c26fefe598c1b4605a83f84451adfde4fae50/68747470733a2f2f6973737565732e6170616368652e6f72672f6a6972612f7365637572652f6174746163686d656e742f31323937313835362f737461676765725f77696e646f775f64656c61792e706e67)
   > ![](https://camo.githubusercontent.com/3c4769bd8b7352c8860eaaa4bc6bbffc64104975/68747470733a2f2f6973737565732e6170616368652e6f72672f6a6972612f7365637572652f6174746163686d656e742f31323937313835372f737461676765725f77696e646f775f7468726f7567687075742e706e67)
   > ![](https://camo.githubusercontent.com/5fd01124c4c21d9388eab78c498c928ff6a651db/68747470733a2f2f6973737565732e6170616368652e6f72672f6a6972612f7365637572652f6174746163686d656e742f31323937313835392f737461676765725f77696e646f772e706e67)
   > 
   > _some system metrics_
   > 
   > ![buffers_in_queue](https://user-images.githubusercontent.com/10646097/60139232-00f29d80-9762-11e9-84f4-3bfbde28c028.png)
   > ![buffer_usage](https://user-images.githubusercontent.com/10646097/60139234-03ed8e00-9762-11e9-99d1-de845d02a8c6.png)
   > ![output_record_rate](https://user-images.githubusercontent.com/10646097/60139237-064fe800-9762-11e9-959d-2db96c7f7bf6.png)
   > 
   > ## Does this pull request potentially affect one of the following parts:
   > * Dependencies (does it add or upgrade a dependency): no
   > * The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes, TumblingProcessingTimeWindows (potentially all WindowAssigners)
   > * The serializers: no
   > * The runtime per-record code paths (performance sensitive): don't know, probably not
   > * Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
   > * The S3 file system connector: no
   > 
   > ## Documentation
   > * Does this pull request introduce a new feature? yes
   > * If yes, how is the feature documented? JavaDocs
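
The purpose section above says that each window assignment is based on the window size, the window offset, and a staggering offset. The following is a minimal standalone sketch of that arithmetic, not the PR's code. The class and method names are illustrative, and `windowStart` is a re-implementation written on the assumption that `TimeWindow.getWindowStartWithOffset` floors a timestamp onto the window grid shifted by the offset.

```java
// Standalone sketch; it does not depend on Flink. All names here are hypothetical.
final class StaggeredStartSketch {

    // Assumed behaviour of TimeWindow.getWindowStartWithOffset:
    // floor the timestamp onto a grid of width `size` shifted by `offset`.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Combines the user-supplied (global) offset with the runtime stagger offset,
    // in the same way as the quoted diff: (globalOffset + staggerOffset) % size.
    static long staggeredWindowStart(long now, long globalOffset, long staggerOffset, long size) {
        return windowStart(now, (globalOffset + staggerOffset) % size, size);
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second tumbling window, in milliseconds
        long now = 25_000L;  // current processing time
        for (long stagger : new long[] {0L, 3_000L, 7_000L}) {
            long start = staggeredWindowStart(now, 0L, stagger, size);
            System.out.println("stagger=" + stagger + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

With a 10-second window and a processing time of 25 000 ms, stagger offsets of 0, 3 000 and 7 000 ms put the same element into windows starting at 20 000, 23 000 and 17 000 ms, so operator instances with different stagger offsets fire at different times.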
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-632945310


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2064",
       "triggerID" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065",
       "triggerID" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0a5d093998b2dcdd13a8213fede0ee213ad8ae31",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3284",
       "triggerID" : "0a5d093998b2dcdd13a8213fede0ee213ad8ae31",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065) 
   * 0a5d093998b2dcdd13a8213fede0ee213ad8ae31 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3284) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-632945310


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2064",
       "triggerID" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 52b0395a9fc4d3fa99def97fe79b42edef32bcf6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2064) 
   * a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b UNKNOWN
   





[GitHub] [flink] TengHu commented on a change in pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
TengHu commented on a change in pull request #12297:
URL: https://github.com/apache/flink/pull/12297#discussion_r439077869



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -46,28 +46,44 @@
 
 	private final long size;
 
-	private final long offset;
+	private final long globalOffset;
 
-	private TumblingProcessingTimeWindows(long size, long offset) {
+	private Long staggerOffset = null;
+
+	private final WindowStagger windowStagger;
+
+	private TumblingProcessingTimeWindows(long size, long offset, WindowStagger windowStagger) {
 		if (Math.abs(offset) >= size) {
 			throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
 		}
 
 		this.size = size;
-		this.offset = offset;
+		this.globalOffset = offset;
+		this.windowStagger = windowStagger;
 	}
 
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		final long now = context.getCurrentProcessingTime();
-		long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
+		if (staggerOffset == null) {
+			staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
+		}
+		long start = TimeWindow.getWindowStartWithOffset(now, (globalOffset + staggerOffset) % size, size);
 		return Collections.singletonList(new TimeWindow(start, start + size));
 	}
 
 	public long getSize() {
 		return size;
 	}
 
+	public long getGlobalOffset() {

Review comment:
       Removed

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowStagger.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A {@code WindowStagger} staggers offset in runtime for each window assignment.
+ */
+public enum WindowStagger {
+	/**
+	 * Default mode,  all panes fire at the same time across all partitions.
+	 */
+	ALIGNED {
+		@Override
+		public long getStaggerOffset(
+			final long currentProcessingTime,
+			final long size) {
+			return 0L;
+		}
+	},
+
+	/**
+	 * Stagger offset is sampled from uniform distribution U(0, WindowSize) when first event ingested in the partitioned operator.
+	 */
+	RANDOM {
+		@Override
+		public long getStaggerOffset(
+			final long currentProcessingTime,
+			final long size) {
+			return (long) (ThreadLocalRandom.current().nextDouble() * size);
+		}
+	},
+
+	/**
+	 * Stagger offset is the ingestion delay in processing time, which is the difference between first event ingestion time and its corresponding processing window start time
+	 * in the partitioned operator. In other words, each partitioned window starts when its first pane created.

Review comment:
       Reworded

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -107,7 +123,22 @@ public static TumblingProcessingTimeWindows of(Time size) {
 	 * @return The time policy.
 	 */
 	public static TumblingProcessingTimeWindows of(Time size, Time offset) {
-		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
+		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
+	}
+
+	/**
+	 * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the element timestamp, offset and a staggering offset sampled
+	 * from uniform distribution(0, window size) for each pane.

Review comment:
       Fixed the comment.







[GitHub] [flink] TengHu commented on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
TengHu commented on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-632947574


   @aljoscha 
   Here is the new PR. I will make the change for other window assigners after this one is approved.
   
   Thank you,
   Niel





[GitHub] [flink] flinkbot edited a comment on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-632945310


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2064",
       "triggerID" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065",
       "triggerID" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0a5d093998b2dcdd13a8213fede0ee213ad8ae31",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3284",
       "triggerID" : "0a5d093998b2dcdd13a8213fede0ee213ad8ae31",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0a113a9888b426fc59223e71f9b407f2f26793b9",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3346",
       "triggerID" : "0a113a9888b426fc59223e71f9b407f2f26793b9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0a5d093998b2dcdd13a8213fede0ee213ad8ae31 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3284) 
   * 0a113a9888b426fc59223e71f9b407f2f26793b9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3346) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-632945310


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2064",
       "triggerID" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065",
       "triggerID" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 52b0395a9fc4d3fa99def97fe79b42edef32bcf6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2064) 
   * a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-632945310


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2064",
       "triggerID" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065",
       "triggerID" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 52b0395a9fc4d3fa99def97fe79b42edef32bcf6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2064) 
   * a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065) 
   





[GitHub] [flink] aljoscha commented on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-643170071


   @flinkbot run azure





[GitHub] [flink] TengHu commented on a change in pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
TengHu commented on a change in pull request #12297:
URL: https://github.com/apache/flink/pull/12297#discussion_r432696612



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -46,28 +46,44 @@
 
 	private final long size;
 
-	private final long offset;
+	private final long globalOffset;
 
-	private TumblingProcessingTimeWindows(long size, long offset) {
+	private Long staggerOffset = null;
+
+	private final WindowStagger windowStagger;
+
+	private TumblingProcessingTimeWindows(long size, long offset, WindowStagger windowStagger) {
 		if (Math.abs(offset) >= size) {
 			throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
 		}
 
 		this.size = size;
-		this.offset = offset;
+		this.globalOffset = offset;
+		this.windowStagger = windowStagger;
 	}
 
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		final long now = context.getCurrentProcessingTime();
-		long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
+		if (staggerOffset == null) {
+			staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);

Review comment:
       Yes, your interpretation is correct: the intention was to align the panes with respect to the first event ever ingested.
   
   Events arrive at different times on different partitions. The differences are usually caused by the design of the partition keys, network delay, and so on, which leads to an implicit staggering (normally distributed in our case).
   
   This is useful because, compared to ALIGNED and RANDOM, this mode staggers the panes but still preserves some useful alignment (for example, in our geospatial applications, windows on events partitioned by city still trigger at the same time if the cities are in the same time zone).
   
   Therefore, we think this would be a useful option for staggering.
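
To make the comparison between the three modes concrete, here is a minimal sketch of how each one could derive its offset. It is written as plain static methods rather than the PR's enum. The bodies of `aligned` and `random` follow the quoted diff. The third method is an assumption: the quoted diff cuts off before that mode's name and body, so `firstEventDelay` is a hypothetical name and its body is only one reading of the Javadoc ("the difference between first event ingestion time and its corresponding processing window start time").

```java
import java.util.concurrent.ThreadLocalRandom;

// Standalone sketch; it does not depend on Flink. Class and method names are hypothetical.
final class StaggerModesSketch {

    // ALIGNED: no stagger, every operator instance fires at the same time.
    static long aligned(long currentProcessingTime, long size) {
        return 0L;
    }

    // RANDOM: offset sampled uniformly from [0, size), as in the quoted diff.
    static long random(long currentProcessingTime, long size) {
        return (long) (ThreadLocalRandom.current().nextDouble() * size);
    }

    // Assumed reading of the third mode: how far the first event's processing time
    // lies past the start of the aligned window it would otherwise fall into.
    // Assumes a non-negative processing time.
    static long firstEventDelay(long currentProcessingTime, long size) {
        long alignedWindowStart = currentProcessingTime - (currentProcessingTime % size);
        return currentProcessingTime - alignedWindowStart;
    }
}
```

Under this reading, two partitions whose first events arrive at nearly the same processing time get nearly the same offset, which matches the time-zone example in the comment, whereas `random` deliberately breaks any such alignment.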







[GitHub] [flink] aljoscha commented on a change in pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #12297:
URL: https://github.com/apache/flink/pull/12297#discussion_r438698136



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -107,7 +123,22 @@ public static TumblingProcessingTimeWindows of(Time size) {
 	 * @return The time policy.
 	 */
 	public static TumblingProcessingTimeWindows of(Time size, Time offset) {
-		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
+		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
+	}
+
+	/**
+	 * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the element timestamp, offset and a staggering offset sampled
+	 * from uniform distribution(0, window size) for each pane.

Review comment:
       Is this correct? The staggering offset depends on the passed in `WindowStagger`, right?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -46,28 +46,44 @@
 
 	private final long size;
 
-	private final long offset;
+	private final long globalOffset;
 
-	private TumblingProcessingTimeWindows(long size, long offset) {
+	private Long staggerOffset = null;
+
+	private final WindowStagger windowStagger;
+
+	private TumblingProcessingTimeWindows(long size, long offset, WindowStagger windowStagger) {
 		if (Math.abs(offset) >= size) {
 			throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
 		}
 
 		this.size = size;
-		this.offset = offset;
+		this.globalOffset = offset;
+		this.windowStagger = windowStagger;
 	}
 
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		final long now = context.getCurrentProcessingTime();
-		long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
+		if (staggerOffset == null) {
+			staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
+		}
+		long start = TimeWindow.getWindowStartWithOffset(now, (globalOffset + staggerOffset) % size, size);
 		return Collections.singletonList(new TimeWindow(start, start + size));
 	}
 
 	public long getSize() {
 		return size;
 	}
 
+	public long getGlobalOffset() {

Review comment:
       I don't think we need these additional public getters.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowStagger.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A {@code WindowStagger} staggers offset in runtime for each window assignment.
+ */
+public enum WindowStagger {
+	/**
+	 * Default mode,  all panes fire at the same time across all partitions.
+	 */
+	ALIGNED {
+		@Override
+		public long getStaggerOffset(
+			final long currentProcessingTime,
+			final long size) {
+			return 0L;
+		}
+	},
+
+	/**
+	 * Stagger offset is sampled from uniform distribution U(0, WindowSize) when first event ingested in the partitioned operator.
+	 */
+	RANDOM {
+		@Override
+		public long getStaggerOffset(
+			final long currentProcessingTime,
+			final long size) {
+			return (long) (ThreadLocalRandom.current().nextDouble() * size);
+		}
+	},
+
+	/**
+	 * Stagger offset is the ingestion delay in processing time, which is the difference between first event ingestion time and its corresponding processing window start time
+	 * in the partitioned operator. In other words, each partitioned window starts when its first pane created.

Review comment:
       I think this is incorrect and should be more like the description of `RANDOM`. We determine the stagger offset when "the first event is ingested in the partitioned operator". It's not determined for each pane.
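
The point of this comment, that the offset is fixed once when the first event reaches the operator instance and is not recomputed per pane, can be illustrated with a small standalone sketch. It mirrors the lazy `staggerOffset == null` check from the quoted diff, but the class name, the `LongBinaryOperator` parameter, and `assignWindowStart` are illustrative stand-ins, not Flink API.

```java
import java.util.function.LongBinaryOperator;

// Standalone sketch; it does not depend on Flink. All names are hypothetical.
final class LazyStaggerAssignerSketch {

    private final long size;
    private final long globalOffset;
    // (currentProcessingTime, size) -> stagger offset; stands in for WindowStagger.
    private final LongBinaryOperator stagger;
    // Stays null until the first element is seen by this instance.
    private Long staggerOffset = null;

    LazyStaggerAssignerSketch(long size, long globalOffset, LongBinaryOperator stagger) {
        if (Math.abs(globalOffset) >= size) {
            throw new IllegalArgumentException("parameters must satisfy abs(offset) < size");
        }
        this.size = size;
        this.globalOffset = globalOffset;
        this.stagger = stagger;
    }

    // Returns the start of the window that an element processed at `now` falls into.
    long assignWindowStart(long now) {
        if (staggerOffset == null) {
            // Computed exactly once, on the first element.
            staggerOffset = stagger.applyAsLong(now, size);
        }
        long offset = (globalOffset + staggerOffset) % size;
        return now - (now - offset + size) % size;
    }
}
```

For example, with a 10-second window and a stagger function of `(t, s) -> t % s`, a first element at 25 300 ms fixes the offset at 5 300 ms. Later elements at 31 000 ms and 36 000 ms then land in windows starting at 25 300 ms and 35 300 ms, so every subsequent pane of that instance reuses the same offset.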

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -107,7 +123,22 @@ public static TumblingProcessingTimeWindows of(Time size) {
 	 * @return The time policy.
 	 */
 	public static TumblingProcessingTimeWindows of(Time size, Time offset) {
-		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
+		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
+	}
+
+	/**
+	 * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the element timestamp, offset and a staggering offset sampled
+	 * from uniform distribution(0, window size) for each pane.
+	 *
+	 * @param size The size of the generated windows.
+	 * @param offset The offset which window start would be shifted by.
+	 * @param windowStagger The utility that produces staggering offset in runtime.
+	 *
+	 * @return The time policy.
+	 */
+	public static TumblingProcessingTimeWindows of(Time size, Time offset, WindowStagger windowStagger) throws Exception {

Review comment:
       We don't need the `throws` here.







[GitHub] [flink] flinkbot edited a comment on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-632945310


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2064",
       "triggerID" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065",
       "triggerID" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-632945310


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2064",
       "triggerID" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065",
       "triggerID" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0a5d093998b2dcdd13a8213fede0ee213ad8ae31",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3284",
       "triggerID" : "0a5d093998b2dcdd13a8213fede0ee213ad8ae31",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0a113a9888b426fc59223e71f9b407f2f26793b9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0a113a9888b426fc59223e71f9b407f2f26793b9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0a5d093998b2dcdd13a8213fede0ee213ad8ae31 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3284) 
   * 0a113a9888b426fc59223e71f9b407f2f26793b9 UNKNOWN
   





[GitHub] [flink] TengHu commented on a change in pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
TengHu commented on a change in pull request #12297:
URL: https://github.com/apache/flink/pull/12297#discussion_r432696612



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -46,28 +46,44 @@
 
 	private final long size;
 
-	private final long offset;
+	private final long globalOffset;
 
-	private TumblingProcessingTimeWindows(long size, long offset) {
+	private Long staggerOffset = null;
+
+	private final WindowStagger windowStagger;
+
+	private TumblingProcessingTimeWindows(long size, long offset, WindowStagger windowStagger) {
 		if (Math.abs(offset) >= size) {
 			throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
 		}
 
 		this.size = size;
-		this.offset = offset;
+		this.globalOffset = offset;
+		this.windowStagger = windowStagger;
 	}
 
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		final long now = context.getCurrentProcessingTime();
-		long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
+		if (staggerOffset == null) {
+			staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);

Review comment:
       Yes, your interpretation is correct: the intention was to align the panes with respect to the first event ever ingested.
   
   Events arrive at different times across partitions. The differences are usually caused by the design of the partition keys, network delay, etc., which leads to an implicit staggering (normally distributed in our case).
   
   This is useful because, compared to ALIGNED and RANDOM, this option staggers the panes but still preserves some useful alignments (for example, in our geospatial applications, windows on events partitioned by city still trigger at the same time if the cities are in the same time zone).
   
   Therefore, we think this would be a useful option for staggering.
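
To make the three options concrete, here is a minimal Java sketch of how the staggering offsets could be computed. The class `StaggerDemo`, the enum `Stagger` and its `offset` method are hypothetical names used for illustration only; this is not the `WindowStagger` code from the PR. The NATURAL formula assumes the offset is the distance of the first event's processing time past the last aligned window boundary.

```java
import java.util.concurrent.ThreadLocalRandom;

public class StaggerDemo {

    /** Hypothetical stand-in for the staggering options discussed above. */
    enum Stagger {
        /** No extra shift: all panes fire on the same boundaries. */
        ALIGNED {
            @Override
            long offset(long now, long size) {
                return 0L;
            }
        },
        /** Shift drawn uniformly from [0, size). */
        RANDOM {
            @Override
            long offset(long now, long size) {
                return ThreadLocalRandom.current().nextLong(size);
            }
        },
        /** Shift equal to how far `now` lies past the last aligned boundary (assumption). */
        NATURAL {
            @Override
            long offset(long now, long size) {
                return Math.floorMod(now, size);
            }
        };

        /** Returns a staggering offset in milliseconds for a window of the given size. */
        abstract long offset(long now, long size);
    }

    public static void main(String[] args) {
        long size = 1000L;          // window size in ms
        long firstEventAt = 1250L;  // processing time of the first event in ms
        for (Stagger s : Stagger.values()) {
            System.out.println(s + " -> " + s.offset(firstEventAt, size));
        }
    }
}
```

In this sketch, a window size of 1000 ms and a first event at 1250 ms give a NATURAL offset of 250 ms. Partitions whose first events land at the same position within a window therefore keep firing together, while partitions that start at different positions are spread out.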







[GitHub] [flink] aljoscha commented on a change in pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #12297:
URL: https://github.com/apache/flink/pull/12297#discussion_r429763639



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -46,28 +46,44 @@
 
 	private final long size;
 
-	private final long offset;
+	private final long globalOffset;
 
-	private TumblingProcessingTimeWindows(long size, long offset) {
+	private Long staggerOffset = null;
+
+	private final WindowStagger windowStagger;
+
+	private TumblingProcessingTimeWindows(long size, long offset, WindowStagger windowStagger) {
 		if (Math.abs(offset) >= size) {
 			throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
 		}
 
 		this.size = size;
-		this.offset = offset;
+		this.globalOffset = offset;
+		this.windowStagger = windowStagger;
 	}
 
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		final long now = context.getCurrentProcessingTime();
-		long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
+		if (staggerOffset == null) {
+			staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);

Review comment:
       I think my comment from your other PR still holds: does `WindowStagger.NATURAL` make sense if we keep the stagger offset fixed per `WindowAssigner` instance? I don't know if this was the intention, but that's what the code does.
   
   This sets a fixed offset when the first event is received. Flink does not create a new `WindowAssigner` instance for new panes, so those new panes use the same offset that was set for the first pane we ever saw. That's why I'm saying that `NATURAL` doesn't really do anything here. (Or I'm misunderstanding something.)
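
The behaviour described here can be reproduced with a small standalone sketch. The class `FixedStaggerDemo` and its nested `Assigner` are hypothetical and only mimic the lazy initialization in the diff above. The way the two offsets are combined, and the window-start arithmetic, are assumptions made for illustration rather than the PR's exact code.

```java
public class FixedStaggerDemo {

    /** Hypothetical assigner that fixes its stagger offset on the first element. */
    static final class Assigner {
        private final long size;
        private final long globalOffset;
        private Long staggerOffset = null; // set once, then reused for every later pane

        Assigner(long size, long globalOffset) {
            this.size = size;
            this.globalOffset = globalOffset;
        }

        /** Returns the start of the window that contains processing time `now`. */
        long windowStart(long now) {
            if (staggerOffset == null) {
                // NATURAL-style offset (assumption): distance past the last aligned boundary.
                staggerOffset = Math.floorMod(now, size);
            }
            long offset = (globalOffset + staggerOffset) % size;
            // Largest value of the form offset + k * size that is <= now.
            return now - Math.floorMod(now - offset, size);
        }
    }

    public static void main(String[] args) {
        Assigner assigner = new Assigner(1000L, 0L);
        System.out.println(assigner.windowStart(1250L)); // first event fixes the offset at 250
        System.out.println(assigner.windowStart(2100L)); // still inside the window starting at 1250
        System.out.println(assigner.windowStart(2300L)); // next window, shifted by the same 250
    }
}
```

Under these assumptions, every pane after the first one starts at a boundary shifted by the same 250 ms, which matches the observation that the offset is fixed per assigner instance rather than per pane.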







[GitHub] [flink] aljoscha commented on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-643269153


   Thanks for your contribution! I merged this 👌





[GitHub] [flink] flinkbot edited a comment on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-632945310


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2064",
       "triggerID" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065",
       "triggerID" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0a5d093998b2dcdd13a8213fede0ee213ad8ae31",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0a5d093998b2dcdd13a8213fede0ee213ad8ae31",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065) 
   * 0a5d093998b2dcdd13a8213fede0ee213ad8ae31 UNKNOWN
   





[GitHub] [flink] flinkbot commented on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-632945310


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 52b0395a9fc4d3fa99def97fe79b42edef32bcf6 UNKNOWN
   





[GitHub] [flink] aljoscha commented on a change in pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #12297:
URL: https://github.com/apache/flink/pull/12297#discussion_r436527795



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -46,28 +46,44 @@
 
 	private final long size;
 
-	private final long offset;
+	private final long globalOffset;
 
-	private TumblingProcessingTimeWindows(long size, long offset) {
+	private Long staggerOffset = null;
+
+	private final WindowStagger windowStagger;
+
+	private TumblingProcessingTimeWindows(long size, long offset, WindowStagger windowStagger) {
 		if (Math.abs(offset) >= size) {
 			throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
 		}
 
 		this.size = size;
-		this.offset = offset;
+		this.globalOffset = offset;
+		this.windowStagger = windowStagger;
 	}
 
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		final long now = context.getCurrentProcessingTime();
-		long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
+		if (staggerOffset == null) {
+			staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);

Review comment:
       Ok, now I understand. Thanks!







[GitHub] [flink] flinkbot commented on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-632942068


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 8851d6a45084f93f4ee25085dcd135dc8cde2e1f (Fri May 22 23:24:48 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   <details>
   <summary>Bot commands</summary>
     The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-632945310


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2064",
       "triggerID" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065",
       "triggerID" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0a5d093998b2dcdd13a8213fede0ee213ad8ae31",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3284",
       "triggerID" : "0a5d093998b2dcdd13a8213fede0ee213ad8ae31",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0a5d093998b2dcdd13a8213fede0ee213ad8ae31 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3284) 
   





[GitHub] [flink] TengHu commented on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
TengHu commented on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-642935485


   @aljoscha I updated the PR. I hope that addresses your concerns. 
   
   Thank you





[GitHub] [flink] flinkbot edited a comment on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-632945310


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2064",
       "triggerID" : "52b0395a9fc4d3fa99def97fe79b42edef32bcf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2065",
       "triggerID" : "a1304a3c20a9ba848ddc3308d6da9ecab8a85a5b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0a5d093998b2dcdd13a8213fede0ee213ad8ae31",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3284",
       "triggerID" : "0a5d093998b2dcdd13a8213fede0ee213ad8ae31",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0a113a9888b426fc59223e71f9b407f2f26793b9",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3346",
       "triggerID" : "0a113a9888b426fc59223e71f9b407f2f26793b9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0a113a9888b426fc59223e71f9b407f2f26793b9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3346) 
   





[GitHub] [flink] aljoscha closed pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
aljoscha closed pull request #12297:
URL: https://github.com/apache/flink/pull/12297


   





[GitHub] [flink] TengHu commented on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
TengHu commented on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-639894888


   @aljoscha Gentle ping on this.





[GitHub] [flink] TengHu commented on a change in pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

Posted by GitBox <gi...@apache.org>.
TengHu commented on a change in pull request #12297:
URL: https://github.com/apache/flink/pull/12297#discussion_r439016010



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -107,7 +123,22 @@ public static TumblingProcessingTimeWindows of(Time size) {
 	 * @return The time policy.
 	 */
 	public static TumblingProcessingTimeWindows of(Time size, Time offset) {
-		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
+		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
+	}
+
+	/**
+	 * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the element timestamp, offset and a staggering offset sampled
+	 * from uniform distribution(0, window size) for each pane.
+	 *
+	 * @param size The size of the generated windows.
+	 * @param offset The offset which window start would be shifted by.
+	 * @param windowStagger The utility that produces staggering offset in runtime.
+	 *
+	 * @return The time policy.
+	 */
+	public static TumblingProcessingTimeWindows of(Time size, Time offset, WindowStagger windowStagger) throws Exception {

Review comment:
       Removed



