Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/29 09:47:52 UTC

[GitHub] [flink] pnowojski commented on a change in pull request #12401: [FLINK-16057] Optimize ContinuousFileReaderOperator

pnowojski commented on a change in pull request #12401:
URL: https://github.com/apache/flink/pull/12401#discussion_r432370071



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
##########
@@ -43,10 +43,22 @@
 
 	private final StreamTaskActionExecutor actionExecutor;
 
+	private MailboxProcessor mailboxProcessor;

Review comment:
       This field should be annotated `@Nullable`, but check my other comments first for a better solution.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -114,6 +114,7 @@ public MailboxProcessor(
 		this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
 		this.mailbox = Preconditions.checkNotNull(mailbox);
 		this.mainMailboxExecutor = Preconditions.checkNotNull(mainMailboxExecutor);
+		((MailboxExecutorImpl) this.mainMailboxExecutor).setMailboxProcessor(this);

Review comment:
       A couple of problems:
   1. It mutates an object passed in from the outside.
   2. It requires a cast (which could be avoided by changing the signature of the constructor).
   
   Maybe a better solution is to get rid of the `mainMailboxExecutor` field and replace the getter `getMainMailboxExecutor` with:
   ```
   	public MailboxExecutor getMainMailboxExecutor() {
   		return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor, this);
   	}
   ```
   and then cache the result of `mailboxProcessor.getMainMailboxExecutor()` inside `StreamTask` by adding a field there: `private final MailboxExecutor mainMailboxExecutor`?
   
   This would remove both the circular dependency and the nullable, non-final field inside `MailboxExecutorImpl`.
   
   WDYT?
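
To make the proposal above concrete, here is a minimal sketch of the wiring. All `Sketch*` types are hypothetical, simplified stand-ins and not the real Flink classes; priorities and the action executor are left out. The processor creates the executor on demand and passes itself in, so the executor's processor field can be `final` and non-null, and the task caches the executor once.

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;

// Hypothetical stand-ins for MailboxExecutor, TaskMailbox, etc. (assumptions, not Flink code).
interface SketchExecutor {
    void execute(Runnable command);
}

final class SketchMailbox {
    private final Queue<Runnable> mails = new ArrayDeque<>();

    void put(Runnable mail) { mails.add(mail); }

    boolean hasMail() { return !mails.isEmpty(); }
}

final class SketchExecutorImpl implements SketchExecutor {
    private final SketchMailbox mailbox;
    private final SketchProcessor processor; // final and never null, no setter needed

    SketchExecutorImpl(SketchMailbox mailbox, SketchProcessor processor) {
        this.mailbox = Objects.requireNonNull(mailbox);
        this.processor = Objects.requireNonNull(processor);
    }

    @Override
    public void execute(Runnable command) { mailbox.put(command); }

    SketchProcessor getProcessor() { return processor; }
}

final class SketchProcessor {
    private final SketchMailbox mailbox;

    SketchProcessor(SketchMailbox mailbox) {
        // The constructor neither receives nor mutates an executor.
        this.mailbox = Objects.requireNonNull(mailbox);
    }

    // Creates a new executor on each call, handing it a reference to this processor.
    SketchExecutor getMainMailboxExecutor() {
        return new SketchExecutorImpl(mailbox, this);
    }
}

final class SketchStreamTask {
    private final SketchProcessor mailboxProcessor;
    private final SketchExecutor mainMailboxExecutor; // cached once at construction

    SketchStreamTask() {
        this.mailboxProcessor = new SketchProcessor(new SketchMailbox());
        this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
    }

    SketchExecutor getMainMailboxExecutor() { return mainMailboxExecutor; }
}
```

Because the getter allocates a new executor per call in this sketch, the cached field in the task is what keeps hot paths from allocating repeatedly.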

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
##########
@@ -43,10 +43,22 @@
 
 	private final StreamTaskActionExecutor actionExecutor;
 
+	private MailboxProcessor mailboxProcessor;
+
 	public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor) {
+		this(mailbox, priority, actionExecutor, null);
+	}
+
+	public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor, MailboxProcessor mailboxProcessor) {
 		this.mailbox = mailbox;
 		this.priority = priority;
 		this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
+		this.mailboxProcessor = mailboxProcessor;
+	}
+
+	public boolean isIdle() {
+		return !mailboxProcessor.isMailboxLoopRunning() ||
+			(mailboxProcessor.isDefaultActionUnavailable() && !mailbox.hasMail() && mailbox.getState().isAcceptingMails());

Review comment:
       Should this be `mailboxProcessor != null && mailboxProcessor.isDefaultActionUnavailable()`? But check my other comments first for a better solution.
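
If the nullable field were kept, note that `isIdle()` dereferences `mailboxProcessor` in its first operand (`isMailboxLoopRunning()`) as well, so a guard would have to cover the whole expression. A minimal sketch of such a check follows; `ProcessorStateSketch` and `IdleCheckSketch` are hypothetical names, the mailbox state is reduced to two booleans, and treating an executor without a processor as "not idle" is an assumption, not something the PR specifies.

```java
// Hypothetical view of the two processor queries used by isIdle() (not a Flink type).
interface ProcessorStateSketch {
    boolean isMailboxLoopRunning();

    boolean isDefaultActionUnavailable();
}

final class IdleCheckSketch {
    private IdleCheckSketch() {}

    static boolean isIdle(ProcessorStateSketch processor, boolean hasMail, boolean acceptingMails) {
        if (processor == null) {
            // Assumption: without a processor we cannot tell, so report "not idle".
            return false;
        }
        // Same boolean structure as the expression in the diff above.
        return !processor.isMailboxLoopRunning()
            || (processor.isDefaultActionUnavailable() && !hasMail && acceptingMails);
    }
}
```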

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
##########
@@ -311,17 +312,19 @@ private void enqueueProcessRecord() {
 	}
 
 	private void processRecord() throws IOException {
-		if (!state.prepareToProcessRecord(this)) {
-			return;
-		}
+		do {
+			if (!state.prepareToProcessRecord(this)) {
+				return;
+			}
 
-		readAndCollectRecord();
+			readAndCollectRecord();
 
-		if (format.reachedEnd()) {
-			onSplitProcessed();
-		} else {
-			enqueueProcessRecord();
-		}
+			if (format.reachedEnd()) {
+				onSplitProcessed();
+				return;
+			}
+		} while (executor.isIdle()); // todo: consider moving this loop into MailboxProcessor (return boolean "re-execute" from enqueued action)

Review comment:
       nit: I would slightly prefer adding this method to the `MailboxExecutor` interface, but if you disagree, leave it as is for now.
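
A rough sketch of what that could look like, assuming "this method" refers to `isIdle()`. The interface and classes below are hypothetical, simplified stand-ins, not Flink types. The reader keeps processing records while nothing else is waiting in the mailbox; re-enqueueing itself once the loop exits is an assumption here, since that part is not visible in the hunk above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Hypothetical executor interface that exposes isIdle() directly.
interface IdleAwareExecutorSketch {
    void execute(Runnable command);

    boolean isIdle();
}

final class QueueExecutorSketch implements IdleAwareExecutorSketch {
    private final Queue<Runnable> mails = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) { mails.add(command); }

    @Override
    public boolean isIdle() { return mails.isEmpty(); }

    // Drains the queue, including mails enqueued while draining.
    void runAll() {
        Runnable mail;
        while ((mail = mails.poll()) != null) {
            mail.run();
        }
    }
}

final class RecordReaderSketch {
    private final IdleAwareExecutorSketch executor;
    private final Iterator<String> records;
    final List<String> collected = new ArrayList<>();

    RecordReaderSketch(IdleAwareExecutorSketch executor, List<String> input) {
        this.executor = executor;
        this.records = input.iterator();
    }

    void processRecords() {
        do {
            if (!records.hasNext()) {
                return; // input exhausted, nothing to re-enqueue
            }
            collected.add(records.next());
        } while (executor.isIdle());
        // Other mail is waiting: yield and continue later (assumed behaviour).
        executor.execute(this::processRecords);
    }
}
```

With the method on the interface, the caller depends only on the executor abstraction and needs no cast to the implementation class.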

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
##########
@@ -50,7 +50,7 @@ public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTas
 	}
 
 	public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor, MailboxProcessor mailboxProcessor) {
-		this.mailbox = mailbox;
+		this.mailbox = (TaskMailboxImpl) mailbox;

Review comment:
       revert this change?



