You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/29 10:23:46 UTC

[flink] 01/03: [FLINK-13245][network] Fix the bug of file resource leak while canceling partition request

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 526236a4d537e78dbac4c575611ef52b602923d9
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Tue Jul 16 23:00:09 2019 +0800

    [FLINK-13245][network] Fix the bug of file resource leak while canceling partition request
    
    On the producer side, the Netty handler receives a CancelPartitionRequest in order to release the SubpartitionView resource.
    The previous implementation looked for the corresponding view only in the queue of available readers in PartitionRequestQueue.
    However, a view is only in that queue while it is available (e.g. has credit and data), so a view that was not available
    at the time of cancellation was never found and therefore never released.
    
    Furthermore, the release of ResultPartition/ResultSubpartitions is based on the reference counter in ReleaseOnConsumptionResultPartition,
    but when PartitionRequestQueue handled a CancelPartitionRequest, the ReleaseOnConsumptionResultPartition was never
    notified that the subpartition had been consumed. As a result, the reference counter never dropped to 0 to trigger the partition release,
    which leaked file resources in the case of BoundedBlockingSubpartition.
    
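    To fix both issues, the view to cancel is now removed from the queue of available readers (if present) and from the map of
    all readers, and is released from there, so it is released regardless of whether it was available. Releasing a reader now also
    calls NetworkSequenceViewReader#notifySubpartitionConsumed, which leads to ReleaseOnConsumptionResultPartition#onConsumedSubpartition
    being called and allows the partition to be released.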
---
 .../io/network/netty/PartitionRequestQueue.java    | 33 ++++----
 .../network/netty/CancelPartitionRequestTest.java  |  4 +-
 .../network/netty/PartitionRequestQueueTest.java   | 92 ++++++++++++++++++++++
 .../io/network/partition/PartitionTestUtils.java   |  8 ++
 4 files changed, 118 insertions(+), 19 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index f82a42f..b492ea6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -138,9 +138,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		}
 
 		for (NetworkSequenceViewReader reader : allReaders.values()) {
-			reader.notifySubpartitionConsumed();
-			reader.releaseAllResources();
-			markAsReleased(reader.getReceiverId());
+			releaseViewReader(reader);
 		}
 		allReaders.clear();
 	}
@@ -181,19 +179,14 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 				return;
 			}
 
-			// Cancel the request for the input channel
-			int size = availableReaders.size();
-			for (int i = 0; i < size; i++) {
-				NetworkSequenceViewReader reader = pollAvailableReader();
-				if (reader.getReceiverId().equals(toCancel)) {
-					reader.releaseAllResources();
-					markAsReleased(reader.getReceiverId());
-				} else {
-					registerAvailableReader(reader);
-				}
-			}
+			// remove reader from queue of available readers
+			availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel));
 
-			allReaders.remove(toCancel);
+			// remove reader from queue of all readers and release its resource
+			final NetworkSequenceViewReader toRelease = allReaders.remove(toCancel);
+			if (toRelease != null) {
+				releaseViewReader(toRelease);
+			}
 		} else {
 			ctx.fireUserEventTriggered(msg);
 		}
@@ -308,14 +301,20 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 	private void releaseAllResources() throws IOException {
 		// note: this is only ever executed by one thread: the Netty IO thread!
 		for (NetworkSequenceViewReader reader : allReaders.values()) {
-			reader.releaseAllResources();
-			markAsReleased(reader.getReceiverId());
+			releaseViewReader(reader);
 		}
 
 		availableReaders.clear();
 		allReaders.clear();
 	}
 
+	private void releaseViewReader(NetworkSequenceViewReader reader) throws IOException {
+		reader.notifySubpartitionConsumed();
+		reader.setRegisteredAsAvailable(false);
+		reader.releaseAllResources();
+		markAsReleased(reader.getReceiverId());
+	}
+
 	/**
 	 * Marks a receiver as released.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index eca8263..58c02df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -109,7 +109,7 @@ public class CancelPartitionRequestTest {
 			}
 
 			verify(view, times(1)).releaseAllResources();
-			verify(view, times(0)).notifySubpartitionConsumed();
+			verify(view, times(1)).notifySubpartitionConsumed();
 		}
 		finally {
 			shutdown(serverAndClient);
@@ -168,7 +168,7 @@ public class CancelPartitionRequestTest {
 			NettyTestUtil.awaitClose(ch);
 
 			verify(view, times(1)).releaseAllResources();
-			verify(view, times(0)).notifySubpartitionConsumed();
+			verify(view, times(1)).notifySubpartitionConsumed();
 		}
 		finally {
 			shutdown(serverAndClient);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index a9e8662..a0dbd7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -19,10 +19,18 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
 import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
@@ -32,7 +40,11 @@ import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nullable;
 
@@ -54,6 +66,24 @@ import static org.junit.Assert.assertTrue;
  */
 public class PartitionRequestQueueTest {
 
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static final int BUFFER_SIZE = 1024 * 1024;
+
+	private static FileChannelManager fileChannelManager;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		fileChannelManager = new FileChannelManagerImpl(
+			new String[] {TEMPORARY_FOLDER.newFolder().getAbsolutePath()}, "testing");
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+		fileChannelManager.close();
+	}
+
 	/**
 	 * In case of enqueuing an empty reader and a reader that actually has some buffers when channel is not writable,
 	 * on channelWritability change event should result in reading all of the messages.
@@ -348,6 +378,68 @@ public class PartitionRequestQueueTest {
 		assertNull(channel.readOutbound());
 	}
 
+	@Test
+	public void testCancelPartitionRequestForUnavailableView() throws Exception {
+		testCancelPartitionRequest(false);
+	}
+
+	@Test
+	public void testCancelPartitionRequestForAvailableView() throws Exception {
+		testCancelPartitionRequest(true);
+	}
+
+	private void testCancelPartitionRequest(boolean isAvailableView) throws Exception {
+		// setup
+		final ResultPartitionManager partitionManager = new ResultPartitionManager();
+		final ResultPartition partition = createFinishedPartitionWithFilledData(partitionManager);
+		final InputChannelID receiverId = new InputChannelID();
+		final PartitionRequestQueue queue = new PartitionRequestQueue();
+		final CreditBasedSequenceNumberingViewReader reader = new CreditBasedSequenceNumberingViewReader(receiverId, 0, queue);
+		final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+		reader.requestSubpartitionView(partitionManager, partition.getPartitionId(), 0);
+		// add this reader into allReaders queue
+		queue.notifyReaderCreated(reader);
+
+		// block the channel so that we see an intermediate state in the test
+		blockChannel(channel);
+
+		// add credit to make this reader available for adding into availableReaders queue
+		if (isAvailableView) {
+			queue.addCredit(receiverId, 1);
+			assertTrue(queue.getAvailableReaders().contains(reader));
+		}
+
+		// cancel this subpartition view
+		queue.cancel(receiverId);
+		channel.runPendingTasks();
+
+		assertFalse(queue.getAvailableReaders().contains(reader));
+		// the partition and its reader view should all be released
+		assertTrue(reader.isReleased());
+		assertTrue(partition.isReleased());
+		for (ResultSubpartition subpartition : partition.getAllPartitions()) {
+			assertTrue(subpartition.isReleased());
+		}
+
+		// cleanup
+		channel.close();
+	}
+
+	private static ResultPartition createFinishedPartitionWithFilledData(ResultPartitionManager partitionManager) throws Exception {
+		final ResultPartition partition = new ResultPartitionBuilder()
+			.setResultPartitionType(ResultPartitionType.BLOCKING)
+			.setFileChannelManager(fileChannelManager)
+			.setResultPartitionManager(partitionManager)
+			.isReleasedOnConsumption(true)
+			.build();
+
+		partitionManager.registerResultPartition(partition);
+		PartitionTestUtils.writeBuffers(partition, 1, BUFFER_SIZE);
+
+		return partition;
+	}
+
 	/**
 	 * Blocks the given channel by adding a buffer that is bigger than the high watermark.
 	 *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 5e39a43..5f80421 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Optional;
 
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -158,4 +159,11 @@ public class PartitionTestUtils {
 			true,
 			releaseType);
 	}
+
+	public static void writeBuffers(ResultPartition partition, int numberOfBuffers, int bufferSize) throws IOException {
+		for (int i = 0; i < numberOfBuffers; i++) {
+			partition.addBufferConsumer(createFilledBufferConsumer(bufferSize, bufferSize), 0);
+		}
+		partition.finish();
+	}
 }