You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/06/02 17:42:13 UTC

[flink] branch master updated (3dabc69 -> 31d661d)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3dabc69  [FLINK-18012] Deactivate slot timeout when calling TaskSlotTable.tryMarkSlotActive
     new 78b4e3d  [FLINK-16057][hotfix][task] Remove MailboxProcessor.mainMailboxExecutor field
     new 1a69cb9  [FLINK-16057][task] Optimize ContinuousFileReaderOperator
     new 31d661d  [FLINK-16057][task] Optimize TaskMailbox state retrieval

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../source/ContinuousFileReaderOperator.java       | 25 ++++++++++++----------
 .../flink/streaming/runtime/tasks/StreamTask.java  | 11 ++++++----
 .../runtime/tasks/mailbox/MailboxExecutorImpl.java | 13 +++++++++++
 .../runtime/tasks/mailbox/MailboxProcessor.java    | 19 ++--------------
 .../runtime/tasks/mailbox/TaskMailbox.java         | 11 +++++++++-
 .../runtime/tasks/mailbox/TaskMailboxImpl.java     |  3 +++
 .../tasks/mailbox/MailboxExecutorImplTest.java     |  2 +-
 7 files changed, 50 insertions(+), 34 deletions(-)


[flink] 02/03: [FLINK-16057][task] Optimize ContinuousFileReaderOperator

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a69cb9fce629b0c458f5ea514d9ac8de008687f
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri May 29 18:10:22 2020 +0200

    [FLINK-16057][task] Optimize ContinuousFileReaderOperator
    
    The current approach of re-enqueueing mails creates overhead that is
    visible in benchmarks. This change eliminates unnecessary
    re-enqueueing of mails by checking mailboxExecutor.isIdle().
---
 .../source/ContinuousFileReaderOperator.java       | 25 ++++++++++++----------
 .../runtime/tasks/mailbox/MailboxExecutorImpl.java | 13 +++++++++++
 .../runtime/tasks/mailbox/MailboxProcessor.java    |  2 +-
 .../runtime/tasks/mailbox/TaskMailbox.java         | 11 +++++++++-
 4 files changed, 38 insertions(+), 13 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index c53049a..87a028c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
@@ -205,7 +206,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
 
 	private transient InputFormat<OUT, ? super T> format;
 	private TypeSerializer<OUT> serializer;
-	private transient MailboxExecutor executor;
+	private transient MailboxExecutorImpl executor;
 	private transient OUT reusedRecord;
 	private transient SourceFunction.SourceContext<OUT> sourceContext;
 	private transient ListState<T> checkpointedState;
@@ -233,7 +234,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
 
 		this.format = checkNotNull(format);
 		this.processingTimeService = checkNotNull(processingTimeService);
-		this.executor = checkNotNull(mailboxExecutor);
+		this.executor = (MailboxExecutorImpl) checkNotNull(mailboxExecutor);
 	}
 
 	@Override
@@ -311,17 +312,19 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
 	}
 
 	private void processRecord() throws IOException {
-		if (!state.prepareToProcessRecord(this)) {
-			return;
-		}
+		do {
+			if (!state.prepareToProcessRecord(this)) {
+				return;
+			}
 
-		readAndCollectRecord();
+			readAndCollectRecord();
 
-		if (format.reachedEnd()) {
-			onSplitProcessed();
-		} else {
-			enqueueProcessRecord();
-		}
+			if (format.reachedEnd()) {
+				onSplitProcessed();
+				return;
+			}
+		} while (executor.isIdle()); // todo: consider moving this loop into MailboxProcessor (return boolean "re-execute" from enqueued action)
+		enqueueProcessRecord();
 	}
 
 	private void onSplitProcessed() throws IOException {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
index 2f99bec..48b57d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
@@ -43,10 +43,22 @@ public final class MailboxExecutorImpl implements MailboxExecutor {
 
 	private final StreamTaskActionExecutor actionExecutor;
 
+	private final MailboxProcessor mailboxProcessor;
+
 	public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor) {
+		this(mailbox, priority, actionExecutor, null);
+	}
+
+	public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor, MailboxProcessor mailboxProcessor) {
 		this.mailbox = mailbox;
 		this.priority = priority;
 		this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
+		this.mailboxProcessor = mailboxProcessor;
+	}
+
+	public boolean isIdle() {
+		return !mailboxProcessor.isMailboxLoopRunning() ||
+			(mailboxProcessor.isDefaultActionUnavailable() && !mailbox.hasMail() && mailbox.getState().isAcceptingMails());
 	}
 
 	@Override
@@ -85,4 +97,5 @@ public final class MailboxExecutorImpl implements MailboxExecutor {
 			return false;
 		}
 	}
+
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index a955c2b..2180971 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -116,7 +116,7 @@ public class MailboxProcessor implements Closeable {
 	 * @param priority the priority of the {@link MailboxExecutor}.
 	 */
 	public MailboxExecutor getMailboxExecutor(int priority) {
-		return new MailboxExecutorImpl(mailbox, priority, actionExecutor);
+		return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);
 	}
 
 	public void initMetric(TaskMetricGroup metricGroup) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
index 5e9fb85..8928525 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
@@ -173,7 +173,16 @@ public interface TaskMailbox {
 	 * This enum represents the states of the mailbox lifecycle.
 	 */
 	enum State {
-		OPEN, QUIESCED, CLOSED
+		OPEN(true), QUIESCED(false), CLOSED(false);
+		private final boolean acceptingMails;
+
+		State(boolean acceptingMails) {
+			this.acceptingMails = acceptingMails;
+		}
+
+		public boolean isAcceptingMails() {
+			return acceptingMails;
+		}
 	}
 
 	/**


[flink] 03/03: [FLINK-16057][task] Optimize TaskMailbox state retrieval

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 31d661dfaf3798e1a80bff437e358e3bfc40bbce
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 28 10:41:56 2020 +0200

    [FLINK-16057][task] Optimize TaskMailbox state retrieval
    
    Don't acquire the lock in TaskMailboxImpl.getState when it is called from the mailbox (task) thread.
    This makes ContinuousFileReaderOperator about 29% faster.
---
 .../apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java  | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
index 99a63a4..adeb356 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
@@ -311,6 +311,9 @@ public class TaskMailboxImpl implements TaskMailbox {
 	@Nonnull
 	@Override
 	public State getState() {
+		if (isMailboxThread()) {
+			return state;
+		}
 		final ReentrantLock lock = this.lock;
 		lock.lock();
 		try {


[flink] 01/03: [FLINK-16057][hotfix][task] Remove MailboxProcessor.mainMailboxExecutor field

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 78b4e3da6fc038cb2a1cb33fe9a89cdbaaa348d5
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Jun 2 09:19:14 2020 +0200

    [FLINK-16057][hotfix][task] Remove MailboxProcessor.mainMailboxExecutor field
    
    Currently, the field is unnecessary; furthermore, a subsequent change adds a MailboxProcessor
    field to MailboxExecutor, which would create a cyclic dependency.
---
 .../flink/streaming/runtime/tasks/StreamTask.java       | 11 +++++++----
 .../runtime/tasks/mailbox/MailboxProcessor.java         | 17 +----------------
 .../runtime/tasks/mailbox/MailboxExecutorImplTest.java  |  2 +-
 3 files changed, 9 insertions(+), 21 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7c08712..c44576b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -207,6 +207,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	protected final MailboxProcessor mailboxProcessor;
 
+	final MailboxExecutor mainMailboxExecutor;
+
 	/**
 	 * TODO it might be replaced by the global IO executor on TaskManager level future.
 	 */
@@ -276,6 +278,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
 		this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
 		this.mailboxProcessor.initMetric(environment.getMetricGroup());
+		this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
 		this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
 		this.asyncOperationsThreadPool = Executors.newCachedThreadPool(
 			new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));
@@ -498,7 +501,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			}
 
 			// Note that we must request partition after all the single gates finished recovery.
-			CompletableFuture.allOf(futures).thenRun(() -> mailboxProcessor.getMainMailboxExecutor().execute(
+			CompletableFuture.allOf(futures).thenRun(() -> mainMailboxExecutor.execute(
 				this::requestPartitions, "Input gates request partitions"));
 		}
 	}
@@ -778,7 +781,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			boolean advanceToEndOfEventTime) {
 
 		CompletableFuture<Boolean> result = new CompletableFuture<>();
-		mailboxProcessor.getMainMailboxExecutor().execute(
+		mainMailboxExecutor.execute(
 				() -> {
 					try {
 						result.complete(triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime));
@@ -830,7 +833,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		if (mailboxProcessor.isMailboxThread()) {
 			runnable.run();
 		} else {
-			mailboxProcessor.getMainMailboxExecutor().execute(runnable, descriptionFormat, descriptionArgs);
+			mainMailboxExecutor.execute(runnable, descriptionFormat, descriptionArgs);
 		}
 	}
 
@@ -966,7 +969,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	@Override
 	public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
 		try {
-			mailboxProcessor.getMainMailboxExecutor().execute(
+			mainMailboxExecutor.execute(
 				() -> {
 					try {
 						operatorChain.dispatchOperatorEvent(operator, event);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index b315e4c..a955c2b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -71,9 +71,6 @@ public class MailboxProcessor implements Closeable {
 	/** Action that is repeatedly executed if no action request is in the mailbox. Typically record processing. */
 	protected final MailboxDefaultAction mailboxDefaultAction;
 
-	/** A pre-created instance of mailbox executor that executes all mails. */
-	private final MailboxExecutor mainMailboxExecutor;
-
 	/** Control flag to terminate the mailbox loop. Must only be accessed from mailbox thread. */
 	private boolean mailboxLoopRunning;
 
@@ -102,27 +99,15 @@ public class MailboxProcessor implements Closeable {
 			MailboxDefaultAction mailboxDefaultAction,
 			TaskMailbox mailbox,
 			StreamTaskActionExecutor actionExecutor) {
-		this(mailboxDefaultAction, actionExecutor, mailbox, new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor));
-	}
-
-	public MailboxProcessor(
-			MailboxDefaultAction mailboxDefaultAction,
-			StreamTaskActionExecutor actionExecutor,
-			TaskMailbox mailbox,
-			MailboxExecutor mainMailboxExecutor) {
 		this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
 		this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
 		this.mailbox = Preconditions.checkNotNull(mailbox);
-		this.mainMailboxExecutor = Preconditions.checkNotNull(mainMailboxExecutor);
 		this.mailboxLoopRunning = true;
 		this.suspendedDefaultAction = null;
 	}
 
-	/**
-	 * Returns a pre-created executor service that executes all mails.
-	 */
 	public MailboxExecutor getMainMailboxExecutor() {
-		return mainMailboxExecutor;
+		return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor);
 	}
 
 	/**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
index efd70d9..ab40979 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
@@ -60,7 +60,7 @@ public class MailboxExecutorImplTest {
 		this.mailbox = new TaskMailboxImpl();
 		this.mailboxExecutor = new MailboxExecutorImpl(mailbox, DEFAULT_PRIORITY, StreamTaskActionExecutor.IMMEDIATE);
 		this.otherThreadExecutor = Executors.newSingleThreadScheduledExecutor();
-		this.mailboxProcessor = new MailboxProcessor(c -> { }, StreamTaskActionExecutor.IMMEDIATE, mailbox, mailboxExecutor);
+		this.mailboxProcessor = new MailboxProcessor(c -> { }, mailbox, StreamTaskActionExecutor.IMMEDIATE);
 	}
 
 	@After