You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/05/10 23:34:07 UTC

[flink] 08/14: [hotfix] [network] Release unpooled buffer for events.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 826daab2c3cc52d100c38480b67393e067472b8f
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Apr 15 21:02:51 2019 +0200

    [hotfix] [network] Release unpooled buffer for events.
    
    So far, these buffers never needed to be released, because they do not come from a buffer pool.
    They were simply garbage collected.
    
    When changing the blocking partitions to use memory mapped files, these buffers were referring
    for a short time to an unmapped memory region (after the partition was released). Because the buffers
    were not accessed any more by any code, this did not matter during regular Flink execution.
    
    However, it did segfault the JVM when a debugger was attached and used to explore exactly that part of the code.
    This happens because the debugger calls toString() on the buffer object as part of rendering the current
    stack frame. The toString() method accesses the buffer contents, which lie in an unmapped region of memory,
    and boom!
---
 .../partition/consumer/SingleInputGate.java        |  8 +++++-
 .../partition/consumer/SingleInputGateTest.java    |  4 +++
 .../partition/consumer/TestInputChannel.java       | 32 +++++++++++++++++++++-
 3 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 19912b2..a584a21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -593,7 +593,13 @@ public class SingleInputGate extends InputGate {
 			return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable);
 		}
 		else {
-			final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+			final AbstractEvent event;
+			try {
+				event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+			}
+			finally {
+				buffer.recycleBuffer();
+			}
 
 			if (event.getClass() == EndOfPartitionEvent.class) {
 				channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index a6f824d..5dda8db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -114,6 +114,10 @@ public class SingleInputGateTest extends InputGateTestBase {
 
 		// Return null when the input gate has received all end-of-partition events
 		assertTrue(inputGate.isFinished());
+
+		for (TestInputChannel ic : inputChannels) {
+			ic.assertReturnedEventsAreRecycled();
+		}
 	}
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index ac3f0ff..96a5db2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -27,12 +27,15 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -43,6 +46,8 @@ public class TestInputChannel extends InputChannel {
 
 	private final Queue<BufferAndAvailabilityProvider> buffers = new ConcurrentLinkedQueue<>();
 
+	private final Collection<Buffer> allReturnedBuffers = new ArrayList<>();
+
 	private BufferAndAvailabilityProvider lastProvider = null;
 
 	private boolean isReleased = false;
@@ -125,7 +130,9 @@ public class TestInputChannel extends InputChannel {
 
 		if (provider != null) {
 			lastProvider = provider;
-			return provider.getBufferAvailability();
+			Optional<BufferAndAvailability> baa = provider.getBufferAvailability();
+			baa.ifPresent((v) -> allReturnedBuffers.add(v.buffer()));
+			return baa;
 		} else if (lastProvider != null) {
 			return lastProvider.getBufferAvailability();
 		} else {
@@ -162,6 +169,29 @@ public class TestInputChannel extends InputChannel {
 
 	}
 
+	public void assertReturnedDataBuffersAreRecycled() { // fails if any recorded data (non-event) buffer is not recycled
+		assertReturnedBuffersAreRecycled(true, false);
+	}
+
+	public void assertReturnedEventsAreRecycled() { // fails if any recorded event buffer is not recycled
+		assertReturnedBuffersAreRecycled(false, true);
+	}
+
+	public void assertAllReturnedBuffersAreRecycled() { // fails if any recorded buffer, data or event, is not recycled
+		assertReturnedBuffersAreRecycled(true, true);
+	}
+
+	private void assertReturnedBuffersAreRecycled(boolean assertBuffers, boolean assertEvents) { // flags select which kinds are checked
+		for (Buffer b : allReturnedBuffers) { // NOTE(review): buffers re-served via the lastProvider fallback are not recorded - confirm intended
+			if (b.isBuffer() && assertBuffers && !b.isRecycled()) {
+				fail("Data Buffer " + b + " not recycled");
+			}
+			if (!b.isBuffer() && assertEvents && !b.isRecycled()) { // !isBuffer() identifies an event buffer
+				fail("Event Buffer " + b + " not recycled");
+			}
+		}
+	}
+
 	interface BufferAndAvailabilityProvider {
 		Optional<BufferAndAvailability> getBufferAvailability() throws IOException, InterruptedException;
 	}