You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2022/08/23 14:28:16 UTC

[flink-ml] branch master updated: [FLINK-29043] Reduce overhead for non-feedback HeadOperator mails

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new f9f7f41  [FLINK-29043] Reduce overhead for non-feedback HeadOperator mails
f9f7f41 is described below

commit f9f7f41ed0b1c804c63a1fb6f650c34509b69a63
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Tue Aug 23 22:28:11 2022 +0800

    [FLINK-29043] Reduce overhead for non-feedback HeadOperator mails
    
    This closes #147.
---
 .../flink/iteration/operator/HeadOperator.java     | 54 +++++++++++-----------
 1 file changed, 27 insertions(+), 27 deletions(-)

diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java
index 541991d..d7df119 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java
@@ -76,22 +76,22 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.function.ThrowingRunnable;
 
-import javax.annotation.concurrent.GuardedBy;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -561,15 +561,11 @@ public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
     private static class MailboxExecutorWithYieldTimeout implements MailboxExecutor {
         private final MailboxExecutor mailboxExecutor;
 
-        private final ReentrantLock lock;
-
-        @GuardedBy("lock")
-        private final Condition notEmpty;
+        private final Timer timer;
 
         private MailboxExecutorWithYieldTimeout(MailboxExecutor mailboxExecutor) {
             this.mailboxExecutor = mailboxExecutor;
-            this.lock = new ReentrantLock();
-            this.notEmpty = lock.newCondition();
+            this.timer = new Timer();
         }
 
         @Override
@@ -577,14 +573,7 @@ public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
                 ThrowingRunnable<? extends Exception> command,
                 String descriptionFormat,
                 Object... descriptionArgs) {
-            final ReentrantLock lock = this.lock;
-            lock.lock();
-            try {
-                mailboxExecutor.execute(command, descriptionFormat, descriptionArgs);
-                notEmpty.signal();
-            } finally {
-                lock.unlock();
-            }
+            mailboxExecutor.execute(command, descriptionFormat, descriptionArgs);
         }
 
         @Override
@@ -607,16 +596,27 @@ public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>>
          * @param unit the time unit of the {@code time} argument
          */
         private void yield(long time, TimeUnit unit) throws InterruptedException {
-            final ReentrantLock lock = this.lock;
-            lock.lock();
-            try {
-                if (!mailboxExecutor.tryYield()) {
-                    notEmpty.await(time, unit);
-                    mailboxExecutor.tryYield();
-                }
-            } finally {
-                lock.unlock();
+            if (mailboxExecutor.tryYield()) {
+                return;
             }
+
+            timer.schedule(
+                    new TimerTask() {
+                        @Override
+                        public void run() {
+                            try {
+                                mailboxExecutor.execute(
+                                        () -> {}, "NoOp runnable to trigger yield timeout");
+                            } catch (RejectedExecutionException e) {
+                                if (!(e.getCause() instanceof TaskMailbox.MailboxClosedException)) {
+                                    throw e;
+                                }
+                            }
+                        }
+                    },
+                    unit.toMillis(time));
+
+            mailboxExecutor.yield();
         }
     }
 }