You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2022/08/23 14:28:16 UTC
[flink-ml] branch master updated: [FLINK-29043] Reduce overhead for non-feedback HeadOperator mails
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new f9f7f41 [FLINK-29043] Reduce overhead for non-feedback HeadOperator mails
f9f7f41 is described below
commit f9f7f41ed0b1c804c63a1fb6f650c34509b69a63
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Tue Aug 23 22:28:11 2022 +0800
[FLINK-29043] Reduce overhead for non-feedback HeadOperator mails
This closes #147.
---
.../flink/iteration/operator/HeadOperator.java | 54 +++++++++++-----------
1 file changed, 27 insertions(+), 27 deletions(-)
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java
index 541991d..d7df119 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java
@@ -76,22 +76,22 @@ import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.function.ThrowingRunnable;
-import javax.annotation.concurrent.GuardedBy;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
import static org.apache.flink.util.Preconditions.checkState;
@@ -561,15 +561,11 @@ public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
private static class MailboxExecutorWithYieldTimeout implements MailboxExecutor {
private final MailboxExecutor mailboxExecutor;
- private final ReentrantLock lock;
-
- @GuardedBy("lock")
- private final Condition notEmpty;
+ private final Timer timer;
private MailboxExecutorWithYieldTimeout(MailboxExecutor mailboxExecutor) {
this.mailboxExecutor = mailboxExecutor;
- this.lock = new ReentrantLock();
- this.notEmpty = lock.newCondition();
+ this.timer = new Timer();
}
@Override
@@ -577,14 +573,7 @@ public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- mailboxExecutor.execute(command, descriptionFormat, descriptionArgs);
- notEmpty.signal();
- } finally {
- lock.unlock();
- }
+ mailboxExecutor.execute(command, descriptionFormat, descriptionArgs);
}
@Override
@@ -607,16 +596,27 @@ public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
* @param unit the time unit of the {@code time} argument
*/
private void yield(long time, TimeUnit unit) throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- if (!mailboxExecutor.tryYield()) {
- notEmpty.await(time, unit);
- mailboxExecutor.tryYield();
- }
- } finally {
- lock.unlock();
+ if (mailboxExecutor.tryYield()) {
+ return;
}
+
+ timer.schedule(
+ new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ mailboxExecutor.execute(
+ () -> {}, "NoOp runnable to trigger yield timeout");
+ } catch (RejectedExecutionException e) {
+ if (!(e.getCause() instanceof TaskMailbox.MailboxClosedException)) {
+ throw e;
+ }
+ }
+ }
+ },
+ unit.toMillis(time));
+
+ mailboxExecutor.yield();
}
}
}