Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/11 08:15:14 UTC

[GitHub] [flink] rkhachatryan commented on a change in pull request #12072: [FLINK-17580][checkpointing] Fix NPE in unaligned checkpoint after EndOfPartition events

rkhachatryan commented on a change in pull request #12072:
URL: https://github.com/apache/flink/pull/12072#discussion_r422854752



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
##########
@@ -210,12 +210,15 @@ public int getInputIndex() {
 			final InputChannel channel = checkpointedInputGate.getChannel(channelIndex);
 
 			// Assumption for retrieving buffers = one concurrent checkpoint
-			recordDeserializers[channelIndex].getUnconsumedBuffer().ifPresent(buffer ->
-				channelStateWriter.addInputData(
-					checkpointId,
-					channel.getChannelInfo(),
-					ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
-					buffer));
+			RecordDeserializer<?> deserializer = recordDeserializers[channelIndex];
+			if (deserializer != null) {

Review comment:
       I guess the reason for not using `Optional.flatMap` was the `IOException` thrown from `RecordDeserializer.getUnconsumedBuffer`.
   
   The root cause of that `IOException` is `DataOutputSerializer`, which wraps OOMs and RuntimeExceptions in it.
   
   Since the resulting `IOException` is never recovered from or analyzed, and the original exception is a RuntimeException, I'd consider wrapping the `IOException` and using `flatMap`.
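A minimal sketch of that suggestion in plain Java. It does not use Flink's real types: the `Deserializer` interface is a hypothetical stand-in for `RecordDeserializer`, a `String` stands in for the unconsumed buffer, and the `unconsumedBuffer` helper and `collectUnconsumed` loop are illustrative names only. The checked `IOException` is rethrown as `java.io.UncheckedIOException` inside a helper. This lets `Optional.ofNullable(...).flatMap(...)` cover both a released (null) deserializer and an empty buffer in a single chain, instead of an explicit `if (deserializer != null)`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class UnconsumedBufferSketch {

    // Hypothetical stand-in for RecordDeserializer: only the method discussed in the review.
    interface Deserializer {
        Optional<String> getUnconsumedBuffer() throws IOException;
    }

    // Rethrows the checked IOException unchecked so the call fits into Optional.flatMap.
    static Optional<String> unconsumedBuffer(Deserializer deserializer) {
        try {
            return deserializer.getUnconsumedBuffer();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // A null slot (deserializer already released) yields no buffer instead of an NPE.
    static List<String> collectUnconsumed(Deserializer[] deserializers) {
        List<String> written = new ArrayList<>();
        for (Deserializer deserializer : deserializers) {
            Optional.ofNullable(deserializer)
                .flatMap(UnconsumedBufferSketch::unconsumedBuffer)
                .ifPresent(written::add); // stands in for channelStateWriter.addInputData(...)
        }
        return written;
    }

    public static void main(String[] args) {
        Deserializer withData = () -> Optional.of("partial-record");
        Deserializer empty = Optional::empty;
        Deserializer[] deserializers = { withData, null, empty };
        System.out.println(collectUnconsumed(deserializers)); // prints [partial-record]
    }
}
```

The trade-off is that a failure inside the deserializer now surfaces as an `UncheckedIOException`. That matches the review's point: the `IOException` is never recovered from anyway.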

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
##########
@@ -106,6 +111,45 @@ public void testNoDataProcessedAfterCheckpointBarrier() throws Exception {
 		assertEquals(0, output.getNumberOfEmittedRecords());
 	}
 
+	@Test
+	public void testSnapshotAfterEndOfPartition() throws Exception {
+		VerifyRecordsDataOutput<Long> output = new VerifyRecordsDataOutput<>();
+		LongSerializer inSerializer = LongSerializer.INSTANCE;
+		int numInputChannels = 1;
+		StreamTestSingleInputGate<Long> inputGate = new StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);
+		TestRecordDeserializer[] deserializers = IntStream.range(0, numInputChannels)
+			.mapToObj(index -> new TestRecordDeserializer(ioManager.getSpillingDirectoriesPaths()))
+			.toArray(TestRecordDeserializer[]::new);
+		StreamTaskNetworkInput<Long> input = new StreamTaskNetworkInput<>(
+			new CheckpointedInputGate(
+				inputGate.getInputGate(),
+				new CheckpointBarrierUnaligner(
+					new int[] { 1 },
+					ChannelStateWriter.NO_OP,
+					"test",
+					new DummyCheckpointInvokable())),
+			inSerializer,
+			new StatusWatermarkValve(1, output),
+			0,
+			deserializers);
+
+		inputGate.sendEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation()), 0);
+		inputGate.sendElement(new StreamRecord<>(42L), 0);
+
+		assertHasNextElement(input, output);
+		assertHasNextElement(input, output);
+		assertEquals(1, output.getNumberOfEmittedRecords());
+
+		// send EndOfPartitionEvent and ensure that deserializer has been released
+		inputGate.sendEvent(EndOfPartitionEvent.INSTANCE, 0);
+		input.emitNext(output);
+		assertNull(deserializers[0]);

Review comment:
       Extracting `0` into a `channelId` variable would make the test a bit more readable.
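A plain-Java model of what the test checks, with the bare `0` extracted into a `channelId` constant as suggested. `ModelNetworkInput`, `ModelDeserializer`, `onEndOfPartition` and `snapshot` are hypothetical names, not Flink API. The sketch assumes the input keeps a reference to the array passed to its constructor and nulls that channel's slot on `EndOfPartitionEvent`, which is what `assertNull(deserializers[0])` in the test relies on.

```java
import java.util.ArrayList;
import java.util.List;

public class EndOfPartitionModel {

    // Hypothetical deserializer: the bytes of a partially read record, modelled as a String.
    static final class ModelDeserializer {
        final String unconsumed;

        ModelDeserializer(String unconsumed) {
            this.unconsumed = unconsumed;
        }
    }

    // Hypothetical model of the network input: it holds the caller's array, not a copy.
    static final class ModelNetworkInput {
        private final ModelDeserializer[] deserializers;

        ModelNetworkInput(ModelDeserializer[] deserializers) {
            this.deserializers = deserializers;
        }

        // Models releasing a channel's deserializer once EndOfPartitionEvent arrives.
        void onEndOfPartition(int channelId) {
            deserializers[channelId] = null;
        }

        // Models the checkpoint snapshot: released channels are skipped, not dereferenced.
        List<String> snapshot() {
            List<String> unconsumed = new ArrayList<>();
            for (ModelDeserializer deserializer : deserializers) {
                if (deserializer != null && !deserializer.unconsumed.isEmpty()) {
                    unconsumed.add(deserializer.unconsumed);
                }
            }
            return unconsumed;
        }
    }

    public static void main(String[] args) {
        final int channelId = 0; // a named index instead of a bare 0
        ModelDeserializer[] deserializers = { new ModelDeserializer("leftover") };
        ModelNetworkInput input = new ModelNetworkInput(deserializers);

        input.onEndOfPartition(channelId);
        System.out.println(deserializers[channelId] == null); // prints true: shared slot cleared
        System.out.println(input.snapshot()); // prints []: no NPE after the release
    }
}
```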

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
##########
@@ -106,6 +111,45 @@ public void testNoDataProcessedAfterCheckpointBarrier() throws Exception {
 		assertEquals(0, output.getNumberOfEmittedRecords());
 	}
 
+	@Test
+	public void testSnapshotAfterEndOfPartition() throws Exception {
+		VerifyRecordsDataOutput<Long> output = new VerifyRecordsDataOutput<>();
+		LongSerializer inSerializer = LongSerializer.INSTANCE;
+		int numInputChannels = 1;
+		StreamTestSingleInputGate<Long> inputGate = new StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);
+		TestRecordDeserializer[] deserializers = IntStream.range(0, numInputChannels)
+			.mapToObj(index -> new TestRecordDeserializer(ioManager.getSpillingDirectoriesPaths()))
+			.toArray(TestRecordDeserializer[]::new);
+		StreamTaskNetworkInput<Long> input = new StreamTaskNetworkInput<>(
+			new CheckpointedInputGate(
+				inputGate.getInputGate(),
+				new CheckpointBarrierUnaligner(
+					new int[] { 1 },

Review comment:
       `1` -> `numInputChannels`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org