You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/26 19:44:42 UTC

[GitHub] [flink] rkhachatryan commented on a change in pull request #13209: [FLINK-18832][datastream] Add compatible check for blocking partition with buffer timeout

rkhachatryan commented on a change in pull request #13209:
URL: https://github.com/apache/flink/pull/13209#discussion_r477537338



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -285,11 +285,7 @@ public StreamGraph generate() {
 			alreadyTransformed.put(transform, transformedIds);
 		}
 
-		if (transform.getBufferTimeout() >= 0) {
-			streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
-		} else {
-			streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);

Review comment:
       Can you explain the reasoning behind removing this branch?
   
   I think we still need it, because if the user doesn't set a timeout for transformation then default job timeout should be used.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -591,6 +593,13 @@ private void connect(Integer headOfChain, StreamEdge edge) {
 		}
 	}
 
+	private void checkCompatible(ResultPartitionType type, long bufferTimeout) {
+		if (type.isBlocking() && bufferTimeout != -1) {
+			throw new UnsupportedOperationException("Blocking partition does not support buffer timeout at the" +

Review comment:
       I think adding the timeout value and `edge.toString` could help to find out misconfguration to the user.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##########
@@ -609,6 +609,33 @@ public void testShuffleModeUndefined() {
 			sourceAndMapVertex.getProducedDataSets().get(0).getResultType());
 	}
 
+	@Test(expected = UnsupportedOperationException.class)
+	public void testConflictShuffleModeWithBufferTimeout() {
+		testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.BATCH);
+	}
+
+	@Test
+	public void testNormalShuffleModeWithBufferTimeout() {
+		testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.PIPELINED);
+	}
+
+	private void testCompatibleShuffleModeWithBufferTimeout(ShuffleMode shuffleMode) {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
+		sourceDataStream.getTransformation().setBufferTimeout(100);

Review comment:
       If we keep using the default timeout (discussion above),
   then the issue also happens with `StreamExecutionEnvironment.setBufferTimeout(100)` or even default, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org