You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/07/12 13:40:01 UTC
[flink] 01/03: [hotfix] Remove dangerous waiting methods from mailbox
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit acfbc83cf17f94a27c1aa71c331ee95382f55596
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Wed Jun 26 12:21:33 2019 +0200
[hotfix] Remove dangerous waiting methods from mailbox
---
.../flink/streaming/runtime/tasks/StreamTask.java | 2 +-
.../runtime/tasks/mailbox/MailboxImpl.java | 26 -------------------
.../runtime/tasks/mailbox/MailboxReceiver.java | 6 -----
.../runtime/tasks/mailbox/MailboxSender.java | 7 ------
.../runtime/tasks/mailbox/MailboxImplTest.java | 29 ++++++----------------
5 files changed, 8 insertions(+), 62 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2f4dd6a..8ba2a44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -1339,7 +1339,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
*/
public final class ActionContext {
- private final Runnable actionUnavailableLetter = ThrowingRunnable.unchecked(mailbox::waitUntilHasMail);
+ private final Runnable actionUnavailableLetter = ThrowingRunnable.unchecked(() -> mailbox.takeMail().run());
/**
* This method must be called to end the stream task when all actions for the tasks have been performed.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
index 411efd1..e9bd346 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
@@ -126,19 +126,6 @@ public class MailboxImpl implements Mailbox {
}
}
- @Override
- public void waitUntilHasMail() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (isEmpty()) {
- notEmpty.await();
- }
- } finally {
- lock.unlock();
- }
- }
-
//------------------------------------------------------------------------------------------------------------------
@Override
@@ -171,19 +158,6 @@ public class MailboxImpl implements Mailbox {
}
}
- @Override
- public void waitUntilHasCapacity() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (isFull()) {
- notFull.await();
- }
- } finally {
- lock.unlock();
- }
- }
-
//------------------------------------------------------------------------------------------------------------------
private void putInternal(Runnable letter) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
index 189687e..2d2f112 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
@@ -50,10 +50,4 @@ public interface MailboxReceiver {
*/
@Nonnull
Runnable takeMail() throws InterruptedException;
-
- /**
- * This method blocks if the mailbox is empty until mail becomes available.
- * @throws InterruptedException on interruption.
- */
- void waitUntilHasMail() throws InterruptedException;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
index 1829125..36d10a1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
@@ -42,11 +42,4 @@ public interface MailboxSender {
* @throws InterruptedException on interruption.
*/
void putMail(@Nonnull Runnable letter) throws InterruptedException;
-
- /**
- * This method blocks until the mailbox has again capacity to enqueue new letters.
- *
- * @throws InterruptedException on interruption.
- */
- void waitUntilHasCapacity() throws InterruptedException;
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
index fc7f19c..9c4edbf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
@@ -27,6 +27,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.LinkedList;
+import java.util.Optional;
import java.util.Queue;
/**
@@ -81,24 +82,7 @@ public class MailboxImplTest {
while (!testObjects.isEmpty()) {
Assert.assertEquals(testObjects.remove(), mailbox.tryTakeMail().get());
Assert.assertEquals(!testObjects.isEmpty(), mailbox.hasMail());
- mailbox.waitUntilHasCapacity(); // should not block here because the mailbox is not full
}
-
- Thread waitingReader = new Thread(ThrowingRunnable.unchecked(() -> mailbox.waitUntilHasMail()));
- waitingReader.start();
- Thread.sleep(1);
- Assert.assertTrue(waitingReader.isAlive());
- mailbox.tryPutMail(() -> {});
- waitingReader.join(); // should complete here
-
- while (mailbox.tryPutMail(() -> {})) {}
-
- Thread waitingWriter = new Thread(ThrowingRunnable.unchecked(() -> mailbox.waitUntilHasCapacity()));
- waitingWriter.start();
- Thread.sleep(1);
- Assert.assertTrue(waitingWriter.isAlive());
- mailbox.takeMail();
- waitingWriter.join();
}
/**
@@ -115,13 +99,14 @@ public class MailboxImplTest {
@Test
public void testConcurrentPutTakeNonBlockingAndWait() throws Exception {
testPutTake((mailbox -> {
- mailbox.waitUntilHasMail();
- return mailbox.tryTakeMail().get();
+ Optional<Runnable> optionalLetter = mailbox.tryTakeMail();
+ while (!optionalLetter.isPresent()) {
+ optionalLetter = mailbox.tryTakeMail();
+ }
+ return optionalLetter.get();
}),
((mailbox, runnable) -> {
- while (!mailbox.tryPutMail(runnable)) {
- mailbox.waitUntilHasCapacity();
- }
+ while (!mailbox.tryPutMail(runnable)) {}
}));
}