You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text rendering).
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/30 21:03:40 UTC

[GitHub] NicoK closed pull request #6553: [FLINK-10141][network] optimisations reducing lock contention

NicoK closed pull request #6553: [FLINK-10141][network] optimisations reducing lock contention
URL: https://github.com/apache/flink/pull/6553
 
 
   

This is a pull request (PR) merged from a forked repository.
Because GitHub hides the original diff on merge, the diff is reproduced below for
the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 28f30209892..c4954c0fb6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -191,16 +191,16 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException, Int
 		checkError();
 
 		final Buffer next;
-		final int remaining;
+		final boolean moreAvailable;
 
 		synchronized (receivedBuffers) {
 			next = receivedBuffers.poll();
-			remaining = receivedBuffers.size();
+			moreAvailable = !receivedBuffers.isEmpty();
 		}
 
 		numBytesIn.inc(next.getSizeUnsafe());
 		numBuffersIn.inc();
-		return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog()));
+		return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog()));
 	}
 
 	// ------------------------------------------------------------------------
@@ -306,8 +306,8 @@ public void recycle(MemorySegment segment) {
 		int numAddedBuffers;
 
 		synchronized (bufferQueue) {
-			// Important: check the isReleased state inside synchronized block, so there is no
-			// race condition when recycle and releaseAllResources running in parallel.
+			// Similar to notifyBufferAvailable(), make sure that we never add a buffer
+			// after releaseAllResources() released all buffers (see below for details).
 			if (isReleased.get()) {
 				try {
 					inputGate.returnExclusiveSegments(Collections.singletonList(segment));
@@ -368,8 +368,13 @@ public boolean notifyBufferAvailable(Buffer buffer) {
 				checkState(isWaitingForFloatingBuffers,
 					"This channel should be waiting for floating buffers.");
 
-				// Important: double check the isReleased state inside synchronized block, so there is no
-				// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
+				// Important: make sure that we never add a buffer after releaseAllResources()
+				// released all buffers. Following scenarios exist:
+				// 1) releaseAllResources() already released buffers inside bufferQueue
+				// -> then isReleased is set correctly
+				// 2) releaseAllResources() did not yet release buffers from bufferQueue
+				// -> we may or may not have set isReleased yet but will always wait for the
+				//    lock on bufferQueue to release buffers
 				if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
 					isWaitingForFloatingBuffers = false;
 					recycleBuffer = false; // just in case
@@ -385,10 +390,10 @@ public boolean notifyBufferAvailable(Buffer buffer) {
 				} else {
 					needMoreBuffers = true;
 				}
+			}
 
-				if (unannouncedCredit.getAndAdd(1) == 0) {
-					notifyCreditAvailable();
-				}
+			if (unannouncedCredit.getAndAdd(1) == 0) {
+				notifyCreditAvailable();
 			}
 
 			return needMoreBuffers;
@@ -484,8 +489,8 @@ void onSenderBacklog(int backlog) throws IOException {
 		int numRequestedBuffers = 0;
 
 		synchronized (bufferQueue) {
-			// Important: check the isReleased state inside synchronized block, so there is no
-			// race condition when onSenderBacklog and releaseAllResources running in parallel.
+			// Similar to notifyBufferAvailable(), make sure that we never add a buffer
+			// after releaseAllResources() released all buffers (see above for details).
 			if (isReleased.get()) {
 				return;
 			}
@@ -510,33 +515,40 @@ void onSenderBacklog(int backlog) throws IOException {
 	}
 
 	public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
-		boolean success = false;
+		boolean recycleBuffer = true;
 
 		try {
+
+			final boolean wasEmpty;
 			synchronized (receivedBuffers) {
-				if (!isReleased.get()) {
-					if (expectedSequenceNumber == sequenceNumber) {
-						int available = receivedBuffers.size();
+				// Similar to notifyBufferAvailable(), make sure that we never add a buffer
+				// after releaseAllResources() released all buffers from receivedBuffers
+				// (see above for details).
+				if (isReleased.get()) {
+					return;
+				}
 
-						receivedBuffers.add(buffer);
-						expectedSequenceNumber++;
+				if (expectedSequenceNumber != sequenceNumber) {
+					onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
+					return;
+				}
 
-						if (available == 0) {
-							notifyChannelNonEmpty();
-						}
+				wasEmpty = receivedBuffers.isEmpty();
+				receivedBuffers.add(buffer);
+				recycleBuffer = false;
+			}
 
-						success = true;
-					} else {
-						onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
-					}
-				}
+			++expectedSequenceNumber;
+
+			if (wasEmpty) {
+				notifyChannelNonEmpty();
 			}
 
-			if (success && backlog >= 0) {
+			if (backlog >= 0) {
 				onSenderBacklog(backlog);
 			}
 		} finally {
-			if (!success) {
+			if (recycleBuffer) {
 				buffer.recycleBuffer();
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 2e7d076f3f8..f51dc7417ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -545,7 +545,7 @@ public void requestPartitions() throws IOException, InterruptedException {
 
 				currentChannel = inputChannelsWithData.remove();
 				enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
-				moreAvailable = inputChannelsWithData.size() > 0;
+				moreAvailable = !inputChannelsWithData.isEmpty();
 			}
 
 			result = currentChannel.getNextBuffer();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services