You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/06/02 17:42:15 UTC

[flink] 02/03: [FLINK-16057][task] Optimize ContinuousFileReaderOperator

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a69cb9fce629b0c458f5ea514d9ac8de008687f
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri May 29 18:10:22 2020 +0200

    [FLINK-16057][task] Optimize ContinuousFileReaderOperator
    
    The current approach of re-enqueueing mails creates an overhead
    visible in benchmarks. This change eliminates unnecessary
    re-enqueueing of mails by checking mailboxExecutor.isIdle().
---
 .../source/ContinuousFileReaderOperator.java       | 25 ++++++++++++----------
 .../runtime/tasks/mailbox/MailboxExecutorImpl.java | 13 +++++++++++
 .../runtime/tasks/mailbox/MailboxProcessor.java    |  2 +-
 .../runtime/tasks/mailbox/TaskMailbox.java         | 11 +++++++++-
 4 files changed, 38 insertions(+), 13 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index c53049a..87a028c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
@@ -205,7 +206,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
 
 	private transient InputFormat<OUT, ? super T> format;
 	private TypeSerializer<OUT> serializer;
-	private transient MailboxExecutor executor;
+	private transient MailboxExecutorImpl executor;
 	private transient OUT reusedRecord;
 	private transient SourceFunction.SourceContext<OUT> sourceContext;
 	private transient ListState<T> checkpointedState;
@@ -233,7 +234,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
 
 		this.format = checkNotNull(format);
 		this.processingTimeService = checkNotNull(processingTimeService);
-		this.executor = checkNotNull(mailboxExecutor);
+		this.executor = (MailboxExecutorImpl) checkNotNull(mailboxExecutor);
 	}
 
 	@Override
@@ -311,17 +312,19 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
 	}
 
 	private void processRecord() throws IOException {
-		if (!state.prepareToProcessRecord(this)) {
-			return;
-		}
+		do {
+			if (!state.prepareToProcessRecord(this)) {
+				return;
+			}
 
-		readAndCollectRecord();
+			readAndCollectRecord();
 
-		if (format.reachedEnd()) {
-			onSplitProcessed();
-		} else {
-			enqueueProcessRecord();
-		}
+			if (format.reachedEnd()) {
+				onSplitProcessed();
+				return;
+			}
+		} while (executor.isIdle()); // todo: consider moving this loop into MailboxProcessor (return boolean "re-execute" from enqueued action)
+		enqueueProcessRecord();
 	}
 
 	private void onSplitProcessed() throws IOException {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
index 2f99bec..48b57d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
@@ -43,10 +43,22 @@ public final class MailboxExecutorImpl implements MailboxExecutor {
 
 	private final StreamTaskActionExecutor actionExecutor;
 
+	private final MailboxProcessor mailboxProcessor;
+
 	public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor) {
+		this(mailbox, priority, actionExecutor, null);
+	}
+
+	public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor, MailboxProcessor mailboxProcessor) {
 		this.mailbox = mailbox;
 		this.priority = priority;
 		this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
+		this.mailboxProcessor = mailboxProcessor;
+	}
+
+	public boolean isIdle() { // true if the mailbox loop is not running, or the default action is unavailable, no mail is pending and the mailbox accepts mails; false when no MailboxProcessor was supplied (3-arg constructor), since idleness cannot be determined
+		return mailboxProcessor != null && (!mailboxProcessor.isMailboxLoopRunning() ||
+			(mailboxProcessor.isDefaultActionUnavailable() && !mailbox.hasMail() && mailbox.getState().isAcceptingMails()));
 	}
 
 	@Override
@@ -85,4 +97,5 @@ public final class MailboxExecutorImpl implements MailboxExecutor {
 			return false;
 		}
 	}
+
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index a955c2b..2180971 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -116,7 +116,7 @@ public class MailboxProcessor implements Closeable {
 	 * @param priority the priority of the {@link MailboxExecutor}.
 	 */
 	public MailboxExecutor getMailboxExecutor(int priority) {
-		return new MailboxExecutorImpl(mailbox, priority, actionExecutor);
+		return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);
 	}
 
 	public void initMetric(TaskMetricGroup metricGroup) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
index 5e9fb85..8928525 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
@@ -173,7 +173,16 @@ public interface TaskMailbox {
 	 * This enum represents the states of the mailbox lifecycle.
 	 */
 	enum State {
-		OPEN, QUIESCED, CLOSED
+		OPEN(true), QUIESCED(false), CLOSED(false);
+		private final boolean acceptingMails;
+
+		State(boolean acceptingMails) {
+			this.acceptingMails = acceptingMails;
+		}
+
+		public boolean isAcceptingMails() {
+			return acceptingMails;
+		}
 	}
 
 	/**