Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/05 08:28:24 UTC

[GitHub] [flink] AHeise commented on a change in pull request #13539: [FLINK-19027][network] Assign exclusive buffers to LocalRecoveredInputChannel.

AHeise commented on a change in pull request #13539:
URL: https://github.com/apache/flink/pull/13539#discussion_r499424128



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -247,7 +248,18 @@ public void setup() throws IOException {
 	}
 
 	@Override
-	public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) {
+	public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) throws IOException {
+		synchronized (requestLock) {
+			if (closeFuture.isDone()) {
+				return FutureUtils.completedVoidFuture();
+			}
+			for (InputChannel inputChannel : inputChannels.values()) {
+				if (inputChannel instanceof RemoteRecoveredInputChannel) {
+					((RemoteRecoveredInputChannel) inputChannel).assignExclusiveSegments();
+				}
+			}
+		}
+

Review comment:
       I'll add. In short, the number of required buffers is now higher than a few tests (and possibly production setups) assume. Without lazy initialization, backpressure is harder to simulate in a few scenarios.
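To illustrate the lazy initialization mentioned above, here is a minimal sketch that assumes a simplified pool which only counts free memory segments. `SegmentPool`, `RecoveredChannel`, and `Gate` are hypothetical stand-ins, not Flink classes. `readRecoveredState` mirrors the guarded loop in the diff: under the request lock it skips a closed gate and only then requests each channel's exclusive buffers, so nothing is reserved before recovery starts. For brevity the sketch uses an unchecked exception, whereas the real method declares `throws IOException`.

```java
import java.util.ArrayList;
import java.util.List;

public class LazyExclusiveBuffers {

    /** Hypothetical pool that only tracks how many memory segments are free. */
    static final class SegmentPool {
        private int available;

        SegmentPool(int available) {
            this.available = available;
        }

        synchronized void request(int count) {
            if (count > available) {
                throw new IllegalStateException(
                        "Insufficient segments: requested " + count + ", available " + available);
            }
            available -= count;
        }

        synchronized int available() {
            return available;
        }
    }

    /** Hypothetical recovered channel; exclusive buffers are requested on demand, not in the constructor. */
    static final class RecoveredChannel {
        private final SegmentPool pool;
        private final int buffersPerChannel;
        private boolean assigned;

        RecoveredChannel(SegmentPool pool, int buffersPerChannel) {
            this.pool = pool;
            this.buffersPerChannel = buffersPerChannel;
        }

        void assignExclusiveSegments() {
            if (!assigned) { // idempotent: never reserve the same channel's buffers twice
                pool.request(buffersPerChannel);
                assigned = true;
            }
        }
    }

    /** Hypothetical gate: buffers are assigned when recovery starts, not at construction. */
    static final class Gate {
        private final Object requestLock = new Object();
        private final List<RecoveredChannel> channels = new ArrayList<>();
        private boolean closed;

        void addChannel(RecoveredChannel channel) {
            channels.add(channel);
        }

        void close() {
            synchronized (requestLock) {
                closed = true;
            }
        }

        void readRecoveredState() {
            synchronized (requestLock) {
                if (closed) {
                    return; // a gate closed concurrently must not grab buffers
                }
                for (RecoveredChannel channel : channels) {
                    channel.assignExclusiveSegments();
                }
            }
        }
    }

    public static void main(String[] args) {
        SegmentPool pool = new SegmentPool(10);
        Gate gate = new Gate();
        gate.addChannel(new RecoveredChannel(pool, 2));
        gate.addChannel(new RecoveredChannel(pool, 2));

        System.out.println(pool.available()); // nothing reserved yet: prints 10
        gate.readRecoveredState();
        System.out.println(pool.available()); // 2 channels x 2 buffers reserved: prints 6
    }
}
```

Because the reservation is deferred, a test can build a gate with a deliberately small pool and still control when buffers run out, which is what makes backpressure easy to provoke.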

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -217,15 +221,11 @@ public void testConcurrentReadStateAndProcessAndClose() throws Exception {
 				}
 			};
 
-			submitTasksAndWaitForResults(executor, new Callable[] {closeTask, readRecoveredStateTask, processStateTask});
-		} finally {
-			executor.shutdown();
+			executor.invokeAll(Arrays.asList(closeTask, readRecoveredStateTask, processStateTask));
+
 			// wait until the internal channel state recover task finishes
-			executor.awaitTermination(60, TimeUnit.SECONDS);
 			assertEquals(totalBuffers, environment.getNetworkBufferPool().getNumberOfAvailableMemorySegments());
 			assertTrue(inputGate.getCloseFuture().isDone());
-
-			environment.close();

Review comment:
       `close` is called by the `Closer`.
   
   `shutdown` + `awaitTermination` is simply the wrong approach here. `invokeAll` does what was intended, since it only returns once all submitted tasks have completed. This could go into an extra commit. However, it should then probably be applied to all 10 places that use `submitTasksAndWaitForResults`.
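A minimal sketch of the `invokeAll` pattern, using only `java.util.concurrent`. The class and the trivial counting tasks are hypothetical stand-ins for the test's close, read, and process tasks. `invokeAll` returns only after every task is done, so assertions placed after it see the final state without any `awaitTermination`. It does not rethrow task failures, though, so the sketch calls `get()` on each returned `Future` to surface them.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class InvokeAllSketch {

    /** Runs taskCount (>= 1) tasks concurrently and returns how many of them finished. */
    static int runAll(int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(taskCount);
        AtomicInteger finished = new AtomicInteger();
        try {
            Callable<Void> task = () -> {
                finished.incrementAndGet();
                return null;
            };
            // invokeAll blocks until every task has completed, normally or exceptionally
            List<Future<Void>> futures = executor.invokeAll(Collections.nCopies(taskCount, task));
            for (Future<Void> future : futures) {
                // a failed task is only visible through get(), which wraps it in ExecutionException
                future.get();
            }
            // no awaitTermination needed: all tasks are already done at this point
            return finished.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(3)); // prints 3
    }
}
```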

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java
##########
@@ -42,8 +42,17 @@
 			TaskEventPublisher taskEventPublisher,
 			int initialBackOff,
 			int maxBackoff,
+			int networkBuffersPerChannel,
 			InputChannelMetrics metrics) {
-		super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter());
+		super(

Review comment:
       Hm, you are right: having read the ticket, I see that it doesn't solve the problem completely. However, without a solution for FLINK-13203, there will not be a real solution here either.
   On the other hand, it is inherently wrong to treat local and remote channels differently during recovery (they even share the same implementation). So this commit still fixes the issue on a best-effort basis and certainly helps build stability, which is an improvement in its own right.
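The argument for treating local and remote channels alike during recovery can be sketched as follows. The sketch assumes both kinds derive from one base class that owns a per-channel budget; the field name echoes the new `networkBuffersPerChannel` constructor parameter, but the classes are simplified, hypothetical stand-ins rather than Flink's `LocalRecoveredInputChannel` and `RemoteRecoveredInputChannel`, and buffers are counted with a plain `int` instead of real memory segments. The remote-only variant mirrors an `instanceof RemoteRecoveredInputChannel` filter and leaves the local channel without exclusive buffers.

```java
import java.util.ArrayList;
import java.util.List;

public class UniformRecoveredChannels {

    /** Hypothetical shared base: every recovered channel reserves the same exclusive budget. */
    abstract static class RecoveredChannel {
        private final int networkBuffersPerChannel;
        private int exclusiveBuffers;

        RecoveredChannel(int networkBuffersPerChannel) {
            this.networkBuffersPerChannel = networkBuffersPerChannel;
        }

        void assignExclusiveSegments() {
            exclusiveBuffers = networkBuffersPerChannel;
        }

        int exclusiveBuffers() {
            return exclusiveBuffers;
        }
    }

    static final class LocalRecoveredChannel extends RecoveredChannel {
        LocalRecoveredChannel(int networkBuffersPerChannel) {
            super(networkBuffersPerChannel);
        }
    }

    static final class RemoteRecoveredChannel extends RecoveredChannel {
        RemoteRecoveredChannel(int networkBuffersPerChannel) {
            super(networkBuffersPerChannel);
        }
    }

    /** Narrow variant: only remote channels get buffers; local ones are left with none. */
    static int assignRemoteOnly(List<? extends RecoveredChannel> channels) {
        int total = 0;
        for (RecoveredChannel channel : channels) {
            if (channel instanceof RemoteRecoveredChannel) {
                channel.assignExclusiveSegments();
            }
            total += channel.exclusiveBuffers();
        }
        return total;
    }

    /** Uniform variant: every recovered channel, local or remote, gets its exclusive buffers. */
    static int assignAll(List<? extends RecoveredChannel> channels) {
        int total = 0;
        for (RecoveredChannel channel : channels) {
            channel.assignExclusiveSegments();
            total += channel.exclusiveBuffers();
        }
        return total;
    }

    static List<RecoveredChannel> sampleChannels() {
        List<RecoveredChannel> channels = new ArrayList<>();
        channels.add(new LocalRecoveredChannel(2));
        channels.add(new RemoteRecoveredChannel(2));
        return channels;
    }

    public static void main(String[] args) {
        System.out.println(assignRemoteOnly(sampleChannels())); // prints 2
        System.out.println(assignAll(sampleChannels()));        // prints 4
    }
}
```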

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -59,26 +59,27 @@
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;

Review comment:
       I didn't even know that double-tags are a thing. :p




----------------------------------------------------------------
This is an automated message from the Apache Git Service.