You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/07/12 13:40:01 UTC

[flink] 01/03: [hotfix] Remove dangerous waiting methods from mailbox

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit acfbc83cf17f94a27c1aa71c331ee95382f55596
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Wed Jun 26 12:21:33 2019 +0200

    [hotfix] Remove dangerous waiting methods from mailbox
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 .../runtime/tasks/mailbox/MailboxImpl.java         | 26 -------------------
 .../runtime/tasks/mailbox/MailboxReceiver.java     |  6 -----
 .../runtime/tasks/mailbox/MailboxSender.java       |  7 ------
 .../runtime/tasks/mailbox/MailboxImplTest.java     | 29 ++++++----------------
 5 files changed, 8 insertions(+), 62 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2f4dd6a..8ba2a44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -1339,7 +1339,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	 */
 	public final class ActionContext {
 
-		private final Runnable actionUnavailableLetter = ThrowingRunnable.unchecked(mailbox::waitUntilHasMail);
+		private final Runnable actionUnavailableLetter = ThrowingRunnable.unchecked(() -> mailbox.takeMail().run());
 
 		/**
 		 * This method must be called to end the stream task when all actions for the tasks have been performed.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
index 411efd1..e9bd346 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
@@ -126,19 +126,6 @@ public class MailboxImpl implements Mailbox {
 		}
 	}
 
-	@Override
-	public void waitUntilHasMail() throws InterruptedException {
-		final ReentrantLock lock = this.lock;
-		lock.lockInterruptibly();
-		try {
-			while (isEmpty()) {
-				notEmpty.await();
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
 	//------------------------------------------------------------------------------------------------------------------
 
 	@Override
@@ -171,19 +158,6 @@ public class MailboxImpl implements Mailbox {
 		}
 	}
 
-	@Override
-	public void waitUntilHasCapacity() throws InterruptedException {
-		final ReentrantLock lock = this.lock;
-		lock.lockInterruptibly();
-		try {
-			while (isFull()) {
-				notFull.await();
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
 	//------------------------------------------------------------------------------------------------------------------
 
 	private void putInternal(Runnable letter) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
index 189687e..2d2f112 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
@@ -50,10 +50,4 @@ public interface MailboxReceiver {
 	 */
 	@Nonnull
 	Runnable takeMail() throws InterruptedException;
-
-	/**
-	 * This method blocks if the mailbox is empty until mail becomes available.
-	 * @throws InterruptedException on interruption.
-	 */
-	void waitUntilHasMail() throws InterruptedException;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
index 1829125..36d10a1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
@@ -42,11 +42,4 @@ public interface MailboxSender {
 	 * @throws InterruptedException on interruption.
 	 */
 	void putMail(@Nonnull Runnable letter) throws InterruptedException;
-
-	/**
-	 * This method blocks until the mailbox has again capacity to enqueue new letters.
-	 *
-	 * @throws InterruptedException on interruption.
-	 */
-	void waitUntilHasCapacity() throws InterruptedException;
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
index fc7f19c..9c4edbf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
@@ -27,6 +27,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.Queue;
 
 /**
@@ -81,24 +82,7 @@ public class MailboxImplTest {
 		while (!testObjects.isEmpty()) {
 			Assert.assertEquals(testObjects.remove(), mailbox.tryTakeMail().get());
 			Assert.assertEquals(!testObjects.isEmpty(), mailbox.hasMail());
-			mailbox.waitUntilHasCapacity(); // should not block here because the mailbox is not full
 		}
-
-		Thread waitingReader = new Thread(ThrowingRunnable.unchecked(() -> mailbox.waitUntilHasMail()));
-		waitingReader.start();
-		Thread.sleep(1);
-		Assert.assertTrue(waitingReader.isAlive());
-		mailbox.tryPutMail(() -> {});
-		waitingReader.join(); // should complete here
-
-		while (mailbox.tryPutMail(() -> {})) {}
-
-		Thread waitingWriter = new Thread(ThrowingRunnable.unchecked(() -> mailbox.waitUntilHasCapacity()));
-		waitingWriter.start();
-		Thread.sleep(1);
-		Assert.assertTrue(waitingWriter.isAlive());
-		mailbox.takeMail();
-		waitingWriter.join();
 	}
 
 	/**
@@ -115,13 +99,14 @@ public class MailboxImplTest {
 	@Test
 	public void testConcurrentPutTakeNonBlockingAndWait() throws Exception {
 		testPutTake((mailbox -> {
-				mailbox.waitUntilHasMail();
-				return mailbox.tryTakeMail().get();
+				Optional<Runnable> optionalLetter = mailbox.tryTakeMail();
+				while (!optionalLetter.isPresent()) {
+					optionalLetter = mailbox.tryTakeMail();
+				}
+				return optionalLetter.get();
 			}),
 			((mailbox, runnable) -> {
-				while (!mailbox.tryPutMail(runnable)) {
-					mailbox.waitUntilHasCapacity();
-				}
+				while (!mailbox.tryPutMail(runnable)) {}
 			}));
 	}