You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/07/19 19:28:32 UTC

flink git commit: [FLINK-9755][network] forward exceptions in RemoteInputChannel#notifyBufferAvailable() to the responsible thread

Repository: flink
Updated Branches:
  refs/heads/release-1.5 c1c4bcb34 -> 8193d5dc6


[FLINK-9755][network] forward exceptions in RemoteInputChannel#notifyBufferAvailable() to the responsible thread

This mainly involves state checks, but previously the resulting exceptions were only swallowed,
without re-registration or any other logging/handling. This may have led to
a thread stalling while waiting for a notification that never came.

This closes #6272.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8193d5dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8193d5dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8193d5dc

Branch: refs/heads/release-1.5
Commit: 8193d5dc68289760ad68cf0b6b237fd86b0fd906
Parents: c1c4bcb
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jul 5 00:48:33 2018 +0200
Committer: Nico Kruber <ni...@data-artisans.com>
Committed: Thu Jul 19 21:25:11 2018 +0200

----------------------------------------------------------------------
 .../io/network/buffer/BufferListener.java       |  9 ++
 .../io/network/buffer/LocalBufferPool.java      | 36 ++------
 .../partition/consumer/RemoteInputChannel.java  | 52 ++++++-----
 .../consumer/RemoteInputChannelTest.java        | 90 ++++++++++++++++++--
 4 files changed, 131 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8193d5dc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
index 05b4156..4cc32c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
@@ -27,6 +27,15 @@ public interface BufferListener {
 	/**
 	 * Notification callback if a buffer is recycled and becomes available in buffer pool.
 	 *
+	 * <p>Note: responsibility on recycling the given buffer is transferred to this implementation,
+	 * including any errors that lead to exceptions being thrown!
+	 *
+	 * <p><strong>BEWARE:</strong> since this may be called from outside the thread that relies on
+	 * the listener's logic, any exception that occurs with this handler should be forwarded to the
+	 * responsible thread for handling and otherwise ignored in the processing of this method. The
+	 * buffer pool forwards any {@link Throwable} from here upwards to a potentially unrelated call
+	 * stack!
+	 *
 	 * @param buffer buffer that becomes available in buffer pool.
 	 * @return true if the listener wants to be notified next time.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/8193d5dc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 77eb601..41c3494 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -262,8 +262,7 @@ class LocalBufferPool implements BufferPool {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
 				return;
-			}
-			else {
+			} else {
 				listener = registeredListeners.poll();
 
 				if (listener == null) {
@@ -277,37 +276,18 @@ class LocalBufferPool implements BufferPool {
 		// We do not know which locks have been acquired before the recycle() or are needed in the
 		// notification and which other threads also access them.
 		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
-		boolean success = false;
-		boolean needMoreBuffers = false;
-		try {
-			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-			success = true;
-		} catch (Throwable ignored) {
-			// handled below, under the lock
-		}
+		// Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer
+		// (either directly or later during error handling) and therefore eventually end up in this
+		// method again.
+		boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
 
-		if (!success || needMoreBuffers) {
+		if (needMoreBuffers) {
 			synchronized (availableMemorySegments) {
 				if (isDestroyed) {
 					// cleanup tasks how they would have been done if we only had one synchronized block
-					if (needMoreBuffers) {
-						listener.notifyBufferDestroyed();
-					}
-					if (!success) {
-						returnMemorySegment(segment);
-					}
+					listener.notifyBufferDestroyed();
 				} else {
-					if (needMoreBuffers) {
-						registeredListeners.add(listener);
-					}
-					if (!success) {
-						if (numberOfRequestedMemorySegments > currentPoolSize) {
-							returnMemorySegment(segment);
-						} else {
-							availableMemorySegments.add(segment);
-							availableMemorySegments.notify();
-						}
-					}
+					registeredListeners.add(listener);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8193d5dc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 0f70d44..b94f48a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -360,32 +360,44 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 			return false;
 		}
 
-		boolean needMoreBuffers = false;
-		synchronized (bufferQueue) {
-			checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");
+		boolean recycleBuffer = true;
+		try {
+			boolean needMoreBuffers = false;
+			synchronized (bufferQueue) {
+				checkState(isWaitingForFloatingBuffers,
+					"This channel should be waiting for floating buffers.");
+
+				// Important: double check the isReleased state inside synchronized block, so there is no
+				// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
+				if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+					isWaitingForFloatingBuffers = false;
+					recycleBuffer = false; // just in case
+					buffer.recycleBuffer();
+					return false;
+				}
 
-			// Important: double check the isReleased state inside synchronized block, so there is no
-			// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
-			if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
-				isWaitingForFloatingBuffers = false;
-				buffer.recycleBuffer();
-				return false;
-			}
+				recycleBuffer = false;
+				bufferQueue.addFloatingBuffer(buffer);
 
-			bufferQueue.addFloatingBuffer(buffer);
+				if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
+					isWaitingForFloatingBuffers = false;
+				} else {
+					needMoreBuffers = true;
+				}
 
-			if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
-				isWaitingForFloatingBuffers = false;
-			} else {
-				needMoreBuffers =  true;
+				if (unannouncedCredit.getAndAdd(1) == 0) {
+					notifyCreditAvailable();
+				}
 			}
-		}
 
-		if (unannouncedCredit.getAndAdd(1) == 0) {
-			notifyCreditAvailable();
+			return needMoreBuffers;
+		} catch (Throwable t) {
+			if (recycleBuffer) {
+				buffer.recycleBuffer();
+			}
+			setError(t);
+			return false;
 		}
-
-		return needMoreBuffers;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8193d5dc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 6c6fd96..6305492 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -52,9 +52,13 @@ import java.util.concurrent.Future;
 
 import scala.Tuple2;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -452,7 +456,7 @@ public class RemoteInputChannelTest {
 		} catch (Throwable t) {
 			thrown = t;
 		} finally {
-			cleanup(networkBufferPool, null, thrown, inputChannel);
+			cleanup(networkBufferPool, null, null, thrown, inputChannel);
 		}
 	}
 
@@ -528,7 +532,7 @@ public class RemoteInputChannelTest {
 		} catch (Throwable t) {
 			thrown = t;
 		} finally {
-			cleanup(networkBufferPool, null, thrown, inputChannel);
+			cleanup(networkBufferPool, null, null, thrown, inputChannel);
 		}
 	}
 
@@ -618,7 +622,7 @@ public class RemoteInputChannelTest {
 		} catch (Throwable t) {
 			thrown = t;
 		} finally {
-			cleanup(networkBufferPool, null, thrown, inputChannel);
+			cleanup(networkBufferPool, null, null, thrown, inputChannel);
 		}
 	}
 
@@ -687,7 +691,72 @@ public class RemoteInputChannelTest {
 		} catch (Throwable t) {
 			thrown = t;
 		} finally {
-			cleanup(networkBufferPool, null, thrown, channel1, channel2, channel3);
+			cleanup(networkBufferPool, null, null, thrown, channel1, channel2, channel3);
+		}
+	}
+
+	/**
+	 * Tests that failures are propagated correctly if
+	 * {@link RemoteInputChannel#notifyBufferAvailable(Buffer)} throws an exception. Also tests that
+	 * a second listener will be notified in this case.
+	 */
+	@Test
+	public void testFailureInNotifyBufferAvailable() throws Exception {
+		// Setup
+		final int numExclusiveBuffers = 0;
+		final int numFloatingBuffers = 1;
+		final int numTotalBuffers = numExclusiveBuffers + numFloatingBuffers;
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+			numTotalBuffers, 32);
+
+		final SingleInputGate inputGate = createSingleInputGate();
+		final RemoteInputChannel successfulRemoteIC = createRemoteInputChannel(inputGate);
+		inputGate.setInputChannel(successfulRemoteIC.partitionId.getPartitionId(), successfulRemoteIC);
+
+		successfulRemoteIC.requestSubpartition(0);
+
+		// late creation -> no exclusive buffers, also no requested subpartition in successfulRemoteIC
+		// (to trigger a failure in RemoteInputChannel#notifyBufferAvailable())
+		final RemoteInputChannel failingRemoteIC = createRemoteInputChannel(inputGate);
+		inputGate.setInputChannel(failingRemoteIC.partitionId.getPartitionId(), failingRemoteIC);
+
+		Buffer buffer = null;
+		Throwable thrown = null;
+		try {
+			final BufferPool bufferPool =
+				networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
+			inputGate.setBufferPool(bufferPool);
+
+			buffer = bufferPool.requestBufferBlocking();
+
+			// trigger subscription to buffer pool
+			failingRemoteIC.onSenderBacklog(1);
+			successfulRemoteIC.onSenderBacklog(numExclusiveBuffers + 1);
+			// recycling will call RemoteInputChannel#notifyBufferAvailable() which will fail and
+			// this exception will be swallowed and set as an error in failingRemoteIC
+			buffer.recycleBuffer();
+			buffer = null;
+			try {
+				failingRemoteIC.checkError();
+				fail("The input channel should have an error based on the failure in RemoteInputChannel#notifyBufferAvailable()");
+			} catch (IOException e) {
+				assertThat(e, hasProperty("cause", isA(IllegalStateException.class)));
+			}
+			// currently, the buffer is still enqueued in the bufferQueue of failingRemoteIC
+			assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
+			buffer = successfulRemoteIC.requestBuffer();
+			assertNull("buffer should still remain in failingRemoteIC", buffer);
+
+			// releasing resources in failingRemoteIC should free the buffer again and immediately
+			// recycle it into successfulRemoteIC
+			failingRemoteIC.releaseAllResources();
+			assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
+			buffer = successfulRemoteIC.requestBuffer();
+			assertNotNull("no buffer given to successfulRemoteIC", buffer);
+		} catch (Throwable t) {
+			thrown = t;
+		} finally {
+			cleanup(networkBufferPool, null, buffer, thrown, failingRemoteIC, successfulRemoteIC);
 		}
 	}
 
@@ -749,7 +818,7 @@ public class RemoteInputChannelTest {
 		} catch (Throwable t) {
 			thrown = t;
 		} finally {
-			cleanup(networkBufferPool, executor, thrown, inputChannel);
+			cleanup(networkBufferPool, executor, null, thrown, inputChannel);
 		}
 	}
 
@@ -802,7 +871,7 @@ public class RemoteInputChannelTest {
 		} catch (Throwable t) {
 			thrown = t;
 		} finally {
-			cleanup(networkBufferPool, executor, thrown, inputChannel);
+			cleanup(networkBufferPool, executor, null, thrown, inputChannel);
 		}
 	}
 
@@ -854,7 +923,7 @@ public class RemoteInputChannelTest {
 		} catch (Throwable t) {
 			thrown = t;
 		} finally {
-			cleanup(networkBufferPool, executor, thrown, inputChannel);
+			cleanup(networkBufferPool, executor, null, thrown, inputChannel);
 		}
 	}
 
@@ -936,7 +1005,7 @@ public class RemoteInputChannelTest {
 		} catch (Throwable t) {
 			thrown = t;
 		} finally {
-			cleanup(networkBufferPool, executor, thrown, inputChannel);
+			cleanup(networkBufferPool, executor, null, thrown, inputChannel);
 		}
 	}
 
@@ -1064,6 +1133,7 @@ public class RemoteInputChannelTest {
 	private void cleanup(
 			NetworkBufferPool networkBufferPool,
 			@Nullable ExecutorService executor,
+			@Nullable Buffer buffer,
 			@Nullable Throwable throwable,
 			InputChannel... inputChannels) throws Exception {
 		for (InputChannel inputChannel : inputChannels) {
@@ -1074,6 +1144,10 @@ public class RemoteInputChannelTest {
 			}
 		}
 
+		if (buffer != null && !buffer.isRecycled()) {
+			buffer.recycleBuffer();
+		}
+
 		try {
 			networkBufferPool.destroyAllBufferPools();
 		} catch (Throwable tInner) {