You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/01/30 13:19:22 UTC

[GitHub] [flink] pnowojski commented on a change in pull request #10435: [FLINK-13955][runtime] migrate ContinuousFileReaderOperator to the mailbox execution model

pnowojski commented on a change in pull request #10435: [FLINK-13955][runtime] migrate ContinuousFileReaderOperator to the mailbox execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r372933131
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ##########
 @@ -67,361 +89,422 @@
 
 	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
 
-	private FileInputFormat<OUT> format;
-	private TypeSerializer<OUT> serializer;
+	private enum ReaderState {
+		IDLE {
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+				throw new IllegalStateException("not processing any records in IDLE state");
+			}
+		},
+		/**
+		 * A message is enqueued to process split, but no split is opened.
+		 */
+		OPENING { // the split was added and message to itself was enqueued to process it
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws IOException {
+				if (op.splits.isEmpty()) {
+					op.switchState(ReaderState.IDLE);
+					return false;
+				} else {
+					op.loadSplit(op.splits.poll());
+					op.switchState(ReaderState.READING);
+					return true;
+				}
+			}
+		},
+		/**
+		 * A message is enqueued to process split and its processing was started.
+		 */
+		READING {
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+				return true;
+			}
+
+			@Override
+			public void onNoMoreData(ContinuousFileReaderOperator<?> op) {
+				op.switchState(ReaderState.IDLE);
+			}
+		},
+		/**
+		 * {@link #close()} was called but unprocessed data (records and splits) remains and needs to be processed.
+		 * {@link #close()} caller is blocked.
+		 */
+		CLOSING {
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws IOException {
+				if (op.currentSplit == null && !op.splits.isEmpty()) {
+					op.loadSplit(op.splits.poll());
+				}
+				return true;
+			}
+
+			@Override
+			public void onNoMoreData(ContinuousFileReaderOperator<?> op) {
+				// need one more mail to unblock possible yield() in close() method (todo: wait with timeout in yield)
+				op.enqueueMail();
+				op.switchState(CLOSED);
+			}
+		},
+		CLOSED {
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+				LOG.warn("not processing any records while closed");
+				return false;
+			}
+		};
+
+		private static final Set<ReaderState> ACCEPT_SPLITS = EnumSet.of(IDLE, OPENING, READING);
+		/**
+		 * Possible transition FROM each state.
+		 */
+		private static final Map<ReaderState, Set<ReaderState>> TRANSITIONS;
+		static {
+			Map<ReaderState, Set<ReaderState>> tmpTransitions = new HashMap<>();
+			tmpTransitions.put(IDLE, EnumSet.of(OPENING, CLOSED));
+			tmpTransitions.put(OPENING, EnumSet.of(READING, CLOSING));
+			tmpTransitions.put(READING, EnumSet.of(IDLE, OPENING, CLOSING));
+			tmpTransitions.put(CLOSING, EnumSet.of(CLOSED));
+			tmpTransitions.put(CLOSED, EnumSet.noneOf(ReaderState.class));
+			TRANSITIONS = new EnumMap<>(tmpTransitions);
+		}
 
-	private transient Object checkpointLock;
+		public boolean isAcceptingSplits() {
+			return ACCEPT_SPLITS.contains(this);
+		}
 
-	private transient SplitReader<OUT> reader;
-	private transient SourceFunction.SourceContext<OUT> readerContext;
+		public final boolean isTerminal() {
+			return this == CLOSED;
+		}
 
+		public boolean canSwitchTo(ReaderState next) {
+			return TRANSITIONS
+					.getOrDefault(this, EnumSet.noneOf(ReaderState.class))
+					.contains(next);
+		}
+
+		/**
+		 * Prepare to process new record OR split.
+		 * @return true if should read the record
+		 */
+		public abstract boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws IOException;
+
+		public void onNoMoreData(ContinuousFileReaderOperator<?> op) {
+		}
+	}
+
+	private transient FileInputFormat<OUT> format;
+	private TypeSerializer<OUT> serializer;
+	private transient MailboxExecutor executor;
+	private transient OUT reusedRecord;
+	private transient SourceFunction.SourceContext<OUT> sourceContext;
 	private transient ListState<TimestampedFileInputSplit> checkpointedState;
-	private transient List<TimestampedFileInputSplit> restoredReaderState;
+	/**
+	 * MUST only be changed via {@link #switchState(ReaderState) switchState}.
+	 */
+	private transient ReaderState state = ReaderState.IDLE;
+	private transient PriorityQueue<TimestampedFileInputSplit> splits;
+	private transient TimestampedFileInputSplit currentSplit; // can't work just on queue tail because it can change because it's PQ
+	private transient Counter completedSplitsCounter;
+
+	private final transient RunnableWithException runnable = () -> {
+		try {
+			processRecord();
+		} catch (Exception e) {
+			switchState(ReaderState.CLOSED);
+			throw e;
+		}
+	};
 
+	@VisibleForTesting
 	public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
 		this.format = checkNotNull(format);
 	}
 
-	@Override
-	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
-		this.serializer = outTypeInfo.createSerializer(executionConfig);
+	public ContinuousFileReaderOperator(FileInputFormat<OUT> format, MailboxExecutor mailboxExecutor) {
+		this.format = checkNotNull(format);
+		this.executor = checkNotNull(mailboxExecutor);
 	}
 
 	@Override
 	public void initializeState(StateInitializationContext context) throws Exception {
 		super.initializeState(context);
 
-		checkState(checkpointedState == null,	"The reader state has already been initialized.");
+		checkState(checkpointedState == null, "The reader state has already been initialized.");
 
 		checkpointedState = context.getOperatorStateStore().getSerializableListState("splits");
 
 		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
-		if (context.isRestored()) {
-			LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
-
-			// this may not be null in case we migrate from a previous Flink version.
-			if (restoredReaderState == null) {
-				restoredReaderState = new ArrayList<>();
-				for (TimestampedFileInputSplit split : checkpointedState.get()) {
-					restoredReaderState.add(split);
-				}
-
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("{} (taskIdx={}) restored {}.", getClass().getSimpleName(), subtaskIdx, restoredReaderState);
-				}
-			}
-		} else {
+		if (!context.isRestored()) {
 			LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
+			return;
+		}
+
+		LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
+
+		splits = splits == null ? new PriorityQueue<>() : splits;
+		for (TimestampedFileInputSplit split : checkpointedState.get()) {
+			splits.add(split);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{} (taskIdx={}) restored {}.", getClass().getSimpleName(), subtaskIdx, splits);
 		}
 	}
 
 	@Override
 	public void open() throws Exception {
 		super.open();
 
-		checkState(this.reader == null, "The reader is already initialized.");
 		checkState(this.serializer != null, "The serializer has not been set. " +
 			"Probably the setOutputType() was not called. Please report it.");
 
+		this.state = ReaderState.IDLE;
 		this.format.setRuntimeContext(getRuntimeContext());
 		this.format.configure(new Configuration());
-		this.checkpointLock = getContainingTask().getCheckpointLock();
-
-		// set the reader context based on the time characteristic
-		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
-		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
-		this.readerContext = StreamSourceContexts.getSourceContext(
-			timeCharacteristic,
-			getProcessingTimeService(),
-			checkpointLock,
-			getContainingTask().getStreamStatusMaintainer(),
-			output,
-			watermarkInterval,
-			-1);
-
-		// and initialize the split reading thread
-		this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, restoredReaderState);
-		this.restoredReaderState = null;
-		this.reader.start();
+
+		this.sourceContext = StreamSourceContexts.getSourceContext(
+				getOperatorConfig().getTimeCharacteristic(),
+				getProcessingTimeService(),
+				new Object(), // no actual locking needed
+				getContainingTask().getStreamStatusMaintainer(),
+				output,
+				getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
+				-1);
 
 Review comment:
   Nitty nit: the continuation lines of this call should be indented by a single extra tab (not two), according to [our standard](https://flink.apache.org/contributing/code-style-and-quality-formatting.html#breaking-the-lines-of-too-long-statements):
   
   > Each new line should have one extra indentation (or two for a function declaration) relative to the line of the parent function name or the called entity

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services