You are viewing a plain-text version of this content. The canonical link for it was attached to the word "here" and was lost in the plain-text conversion.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/27 14:27:16 UTC

[GitHub] pnowojski closed pull request #6829: [FLINK-10367][network] Introduce NotificationResult for BufferListener to solve recursive stack overflow

pnowojski closed pull request #6829: [FLINK-10367][network] Introduce NotificationResult for BufferListener to solve recursive stack overflow
URL: https://github.com/apache/flink/pull/6829
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
index 4cc32c0a661..e6b5416d986 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
@@ -24,6 +24,41 @@
  */
 public interface BufferListener {
 
+	/**
+	 * Status of the notification result from the buffer listener.
+	 */
+	enum NotificationResult {
+		BUFFER_NOT_USED(false, false),
+		BUFFER_USED_NO_NEED_MORE(true, false),
+		BUFFER_USED_NEED_MORE(true, true);
+
+		private final boolean isBufferUsed;
+		private final boolean needsMoreBuffers;
+
+		NotificationResult(boolean isBufferUsed, boolean needsMoreBuffers) {
+			this.isBufferUsed = isBufferUsed;
+			this.needsMoreBuffers = needsMoreBuffers;
+		}
+
+		/**
+		 * Whether the notified buffer is accepted to use by the listener.
+		 *
+		 * @return <tt>true</tt> if the notified buffer is accepted.
+		 */
+		boolean isBufferUsed() {
+			return isBufferUsed;
+		}
+
+		/**
+		 * Whether the listener still needs more buffers to be notified.
+		 *
+		 * @return <tt>true</tt> if the listener is still waiting for more buffers.
+		 */
+		boolean needsMoreBuffers() {
+			return needsMoreBuffers;
+		}
+	}
+
 	/**
 	 * Notification callback if a buffer is recycled and becomes available in buffer pool.
 	 *
@@ -37,9 +72,9 @@
 	 * stack!
 	 *
 	 * @param buffer buffer that becomes available in buffer pool.
-	 * @return true if the listener wants to be notified next time.
+	 * @return NotificationResult if the listener wants to be notified next time.
 	 */
-	boolean notifyBufferAvailable(Buffer buffer);
+	NotificationResult notifyBufferAvailable(Buffer buffer);
 
 	/**
 	 * Notification callback if the buffer provider is destroyed.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 1596fded6f3..273822746fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -258,30 +259,31 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 	@Override
 	public void recycle(MemorySegment segment) {
 		BufferListener listener;
-		synchronized (availableMemorySegments) {
-			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
-				returnMemorySegment(segment);
-				return;
-			} else {
-				listener = registeredListeners.poll();
-
-				if (listener == null) {
-					availableMemorySegments.add(segment);
-					availableMemorySegments.notify();
+		NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
+		while (!notificationResult.isBufferUsed()) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
+					returnMemorySegment(segment);
 					return;
+				} else {
+					listener = registeredListeners.poll();
+					if (listener == null) {
+						availableMemorySegments.add(segment);
+						availableMemorySegments.notify();
+						return;
+					}
 				}
 			}
+			notificationResult = fireBufferAvailableNotification(listener, segment);
 		}
+	}
 
+	private NotificationResult fireBufferAvailableNotification(BufferListener listener, MemorySegment segment) {
 		// We do not know which locks have been acquired before the recycle() or are needed in the
 		// notification and which other threads also access them.
 		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
-		// Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer
-		// (either directly or later during error handling) and therefore eventually end up in this
-		// method again.
-		boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-
-		if (needMoreBuffers) {
+		NotificationResult notificationResult = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+		if (notificationResult.needsMoreBuffers()) {
 			synchronized (availableMemorySegments) {
 				if (isDestroyed) {
 					// cleanup tasks how they would have been done if we only had one synchronized block
@@ -291,6 +293,7 @@ public void recycle(MemorySegment segment) {
 				}
 			}
 		}
+		return notificationResult;
 	}
 
 	/**
@@ -388,5 +391,4 @@ private void returnExcessMemorySegments() {
 			returnMemorySegment(segment);
 		}
 	}
-
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index c5ba7a4b7f1..34f65c0f22d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -417,32 +417,18 @@ public void notifyBufferDestroyed() {
 
 		// Called by the recycling thread (not network I/O thread)
 		@Override
-		public boolean notifyBufferAvailable(Buffer buffer) {
-			boolean success = false;
-
-			try {
-				if (availableBuffer.compareAndSet(null, buffer)) {
-					ctx.channel().eventLoop().execute(this);
+		public NotificationResult notifyBufferAvailable(Buffer buffer) {
+			if (availableBuffer.compareAndSet(null, buffer)) {
+				ctx.channel().eventLoop().execute(this);
 
-					success = true;
-				}
-				else {
-					throw new IllegalStateException("Received a buffer notification, " +
-							" but the previous one has not been handled yet.");
-				}
-			}
-			catch (Throwable t) {
-				ctx.channel().eventLoop().execute(new AsyncErrorNotificationTask(t));
+				return NotificationResult.BUFFER_USED_NO_NEED_MORE;
 			}
-			finally {
-				if (!success) {
-					if (buffer != null) {
-						buffer.recycleBuffer();
-					}
-				}
+			else {
+				ctx.channel().eventLoop().execute(new AsyncErrorNotificationTask(
+					new IllegalStateException("Received a buffer notification, " +
+						" but the previous one has not been handled yet.")));
+				return NotificationResult.BUFFER_NOT_USED;
 			}
-
-			return false;
 		}
 
 		/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 6738abd7f9c..141494996c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -342,18 +342,18 @@ boolean isWaitingForFloatingBuffers() {
 
 	/**
 	 * The Buffer pool notifies this channel of an available floating buffer. If the channel is released or
-	 * currently does not need extra buffers, the buffer should be recycled to the buffer pool. Otherwise,
+	 * currently does not need extra buffers, the buffer should be returned to the buffer pool. Otherwise,
 	 * the buffer will be added into the <tt>bufferQueue</tt> and the unannounced credit is increased
 	 * by one.
 	 *
 	 * @param buffer Buffer that becomes available in buffer pool.
-	 * @return True when this channel is waiting for more floating buffers, otherwise false.
+	 * @return NotificationResult indicates whether this channel accepts the buffer and is waiting for
+	 *  	more floating buffers.
 	 */
 	@Override
-	public boolean notifyBufferAvailable(Buffer buffer) {
-		boolean recycleBuffer = true;
+	public NotificationResult notifyBufferAvailable(Buffer buffer) {
+		NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
 		try {
-			boolean needMoreBuffers = false;
 			synchronized (bufferQueue) {
 				checkState(isWaitingForFloatingBuffers,
 					"This channel should be waiting for floating buffers.");
@@ -364,36 +364,29 @@ public boolean notifyBufferAvailable(Buffer buffer) {
 				// -> then isReleased is set correctly
 				// 2) releaseAllResources() did not yet release buffers from bufferQueue
 				// -> we may or may not have set isReleased yet but will always wait for the
-				//    lock on bufferQueue to release buffers
+				// lock on bufferQueue to release buffers
 				if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
 					isWaitingForFloatingBuffers = false;
-					recycleBuffer = false; // just in case
-					buffer.recycleBuffer();
-					return false;
+					return notificationResult;
 				}
 
-				recycleBuffer = false;
 				bufferQueue.addFloatingBuffer(buffer);
 
 				if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
 					isWaitingForFloatingBuffers = false;
+					notificationResult = NotificationResult.BUFFER_USED_NO_NEED_MORE;
 				} else {
-					needMoreBuffers = true;
+					notificationResult = NotificationResult.BUFFER_USED_NEED_MORE;
 				}
 			}
 
 			if (unannouncedCredit.getAndAdd(1) == 0) {
 				notifyCreditAvailable();
 			}
-
-			return needMoreBuffers;
 		} catch (Throwable t) {
-			if (recycleBuffer) {
-				buffer.recycleBuffer();
-			}
 			setError(t);
-			return false;
 		}
+		return notificationResult;
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 537d167908f..a0e10d7c687 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -414,10 +414,14 @@ private BufferListener createBufferListener(int notificationTimes) {
 			AtomicInteger times = new AtomicInteger(0);
 
 			@Override
-			public boolean notifyBufferAvailable(Buffer buffer) {
+			public NotificationResult notifyBufferAvailable(Buffer buffer) {
 				int newCount = times.incrementAndGet();
 				buffer.recycleBuffer();
-				return newCount < notificationTimes;
+				if (newCount < notificationTimes) {
+					return NotificationResult.BUFFER_USED_NEED_MORE;
+				} else {
+					return NotificationResult.BUFFER_USED_NO_NEED_MORE;
+				}
 			}
 
 			@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index ec80459f0ea..7747421fc7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -23,6 +23,7 @@
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
@@ -150,7 +151,10 @@ private void testConcurrentReleaseAndSomething(
 						for (int j = 0; j < 128; j++) {
 							// this is the same buffer over and over again which will be
 							// recycled by the RemoteInputChannel
-							function.apply(inputChannel, buffer.retainBuffer(), j);
+							Object obj = function.apply(inputChannel, buffer.retainBuffer(), j);
+							if (obj instanceof NotificationResult && obj == NotificationResult.BUFFER_NOT_USED) {
+								buffer.recycleBuffer();
+							}
 						}
 
 						if (inputChannel.isReleased()) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services