You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/06/02 17:42:15 UTC
[flink] 02/03: [FLINK-16057][task] Optimize ContinuousFileReaderOperator
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1a69cb9fce629b0c458f5ea514d9ac8de008687f
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri May 29 18:10:22 2020 +0200
[FLINK-16057][task] Optimize ContinuousFileReaderOperator
The current approach of re-enqueuing mails creates an overhead
that is visible in benchmarks. This change eliminates unnecessary
re-enqueuing of mails by checking mailboxExecutor.isIdle().
---
.../source/ContinuousFileReaderOperator.java | 25 ++++++++++++----------
.../runtime/tasks/mailbox/MailboxExecutorImpl.java | 13 +++++++++++
.../runtime/tasks/mailbox/MailboxProcessor.java | 2 +-
.../runtime/tasks/mailbox/TaskMailbox.java | 11 +++++++++-
4 files changed, 38 insertions(+), 13 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index c53049a..87a028c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
@@ -205,7 +206,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
private transient InputFormat<OUT, ? super T> format;
private TypeSerializer<OUT> serializer;
- private transient MailboxExecutor executor;
+ private transient MailboxExecutorImpl executor;
private transient OUT reusedRecord;
private transient SourceFunction.SourceContext<OUT> sourceContext;
private transient ListState<T> checkpointedState;
@@ -233,7 +234,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
this.format = checkNotNull(format);
this.processingTimeService = checkNotNull(processingTimeService);
- this.executor = checkNotNull(mailboxExecutor);
+ this.executor = (MailboxExecutorImpl) checkNotNull(mailboxExecutor);
}
@Override
@@ -311,17 +312,19 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
}
private void processRecord() throws IOException {
- if (!state.prepareToProcessRecord(this)) {
- return;
- }
+ do {
+ if (!state.prepareToProcessRecord(this)) {
+ return;
+ }
- readAndCollectRecord();
+ readAndCollectRecord();
- if (format.reachedEnd()) {
- onSplitProcessed();
- } else {
- enqueueProcessRecord();
- }
+ if (format.reachedEnd()) {
+ onSplitProcessed();
+ return;
+ }
+ } while (executor.isIdle()); // todo: consider moving this loop into MailboxProcessor (return boolean "re-execute" from enqueued action)
+ enqueueProcessRecord();
}
private void onSplitProcessed() throws IOException {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
index 2f99bec..48b57d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
@@ -43,10 +43,22 @@ public final class MailboxExecutorImpl implements MailboxExecutor {
private final StreamTaskActionExecutor actionExecutor;
+ private final MailboxProcessor mailboxProcessor;
+
public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor) {
+ this(mailbox, priority, actionExecutor, null);
+ }
+
+ public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor, MailboxProcessor mailboxProcessor) {
this.mailbox = mailbox;
this.priority = priority;
this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
+ this.mailboxProcessor = mailboxProcessor;
+ }
+
+ public boolean isIdle() {
+ return !mailboxProcessor.isMailboxLoopRunning() ||
+ (mailboxProcessor.isDefaultActionUnavailable() && !mailbox.hasMail() && mailbox.getState().isAcceptingMails());
}
@Override
@@ -85,4 +97,5 @@ public final class MailboxExecutorImpl implements MailboxExecutor {
return false;
}
}
+
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index a955c2b..2180971 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -116,7 +116,7 @@ public class MailboxProcessor implements Closeable {
* @param priority the priority of the {@link MailboxExecutor}.
*/
public MailboxExecutor getMailboxExecutor(int priority) {
- return new MailboxExecutorImpl(mailbox, priority, actionExecutor);
+ return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);
}
public void initMetric(TaskMetricGroup metricGroup) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
index 5e9fb85..8928525 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
@@ -173,7 +173,16 @@ public interface TaskMailbox {
* This enum represents the states of the mailbox lifecycle.
*/
enum State {
- OPEN, QUIESCED, CLOSED
+ OPEN(true), QUIESCED(false), CLOSED(false);
+ private final boolean acceptingMails;
+
+ State(boolean acceptingMails) {
+ this.acceptingMails = acceptingMails;
+ }
+
+ public boolean isAcceptingMails() {
+ return acceptingMails;
+ }
}
/**