You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/13 19:27:19 UTC

[GitHub] [flink] pnowojski commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

pnowojski commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r424678536



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -109,10 +109,17 @@ private static CheckpointBarrierHandler createCheckpointBarrierHandler(
 		switch (config.getCheckpointMode()) {
 			case EXACTLY_ONCE:
 				if (config.isUnalignedCheckpointsEnabled()) {
-					return new CheckpointBarrierUnaligner(
-						numberOfInputChannelsPerGate.toArray(),
-						channelStateWriter,
-						taskName,
+					return new AlternatingCheckpointBarrierHandler(
+						new CheckpointBarrierAligner(
+							taskName,
+							channelIndexToInputGate,
+							inputGateToChannelIndexOffset,
+							toNotifyOnCheckpoint),
+						new CheckpointBarrierUnaligner(
+							numberOfInputChannelsPerGate.toArray(),
+							channelStateWriter,
+							taskName,
+							toNotifyOnCheckpoint),

Review comment:
       There are two methods in `CheckpointedInputGate` which are implemented in not very object oriented design (I guess they should be re-implemented) that would stop working with this change (I hope some tests will fail because of that):
   ```
   	public void spillInflightBuffers(
   			long checkpointId,
   			int channelIndex,
   			ChannelStateWriter channelStateWriter) throws IOException {
   		if (((CheckpointBarrierUnaligner) barrierHandler).hasInflightData(checkpointId, channelIndex)) {
   			inputGate.getChannel(channelIndex).spillInflightBuffers(checkpointId, channelStateWriter);
   		}
   	}
   
   	public CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
   		return ((CheckpointBarrierUnaligner) barrierHandler).getAllBarriersReceivedFuture(checkpointId);
   	}
   ```
   I guess `hasInflightData()` and `getAllBarriersReceivedFuture()` should be pulled to `CheckpointBarrierHandler` interface (separate commit?)?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
##########
@@ -456,6 +463,23 @@ public void testUnblockReleasedChannel() throws Exception {
 		localChannel.resumeConsumption();
 	}
 
+	@Test
+	public void testNoNotifyOnSavepoint() throws IOException {
+		TestBufferReceivedListener listener = new TestBufferReceivedListener();
+		LocalInputChannel channel = new LocalInputChannel(
+			new SingleInputGateBuilder().build(),
+			0,
+			new ResultPartitionID(),
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new TestCounter(),
+			new TestCounter());
+		CheckpointBarrier barrier = new CheckpointBarrier(123L, 123L, new CheckpointOptions(SAVEPOINT, CheckpointStorageLocationReference.getDefault()));
+		channel.notifyPriorityEvent(new BufferConsumer(toBuffer(barrier).getMemorySegment(), FreeingBufferRecycler.INSTANCE, getDataType(barrier)));
+		channel.checkError();
+		assertTrue(listener.notifiedOnBarriers.isEmpty());

Review comment:
       frankly, it's not that unexpected that it's empty, since `listener` was never passed anywhere? 😈 
   
   (you forgot to register it to `SingleInputGate`?)

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
##########
@@ -456,6 +463,23 @@ public void testUnblockReleasedChannel() throws Exception {
 		localChannel.resumeConsumption();
 	}
 
+	@Test
+	public void testNoNotifyOnSavepoint() throws IOException {
+		TestBufferReceivedListener listener = new TestBufferReceivedListener();
+		LocalInputChannel channel = new LocalInputChannel(
+			new SingleInputGateBuilder().build(),
+			0,
+			new ResultPartitionID(),
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new TestCounter(),
+			new TestCounter());

Review comment:
       nit:
   ```
   InputChannelBuilder
     .newBuilder()
     .buildLocalChannel(new SingleInputGateBuilder().build())
   ```
   ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org