You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:22 UTC

[flink] 13/16: [FLINK-12777][operator] Use CheckpointedInputGate StreamTwoInputSelectableProcessor

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6c7a69a0442d6bfb3b5d86006db462756a51ec7c
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jun 18 17:26:47 2019 +0200

    [FLINK-12777][operator] Use CheckpointedInputGate StreamTwoInputSelectableProcessor
---
 .../runtime/io/CheckpointBarrierDiscarder.java     | 74 ----------------------
 .../runtime/io/CheckpointedInputGate.java          | 23 ++++++-
 .../streaming/runtime/io/InputProcessorUtil.java   | 41 ++++++++++++
 .../io/StreamTwoInputSelectableProcessor.java      | 23 +++++--
 .../tasks/TwoInputSelectableStreamTask.java        |  7 +-
 5 files changed, 86 insertions(+), 82 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
deleted file mode 100644
index 4c6cdab..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import java.io.IOException;
-
-/**
- * The {@link CheckpointBarrierDiscarder} discards checkpoint barriers have been received from which input channels.
- */
-@Internal
-public class CheckpointBarrierDiscarder extends CheckpointBarrierHandler {
-	public CheckpointBarrierDiscarder() {
-		super(null);
-	}
-
-	@Override
-	public void releaseBlocksAndResetBarriers() throws IOException {
-	}
-
-	@Override
-	public boolean isBlocked(int channelIndex) {
-		return false;
-	}
-
-	@Override
-	public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception {
-		return false;
-	}
-
-	@Override
-	public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
-		return false;
-	}
-
-	@Override
-	public boolean processEndOfPartition() throws Exception {
-		return false;
-	}
-
-	@Override
-	public long getLatestCheckpointId() {
-		return 0;
-	}
-
-	@Override
-	public long getAlignmentDurationNanos() {
-		return 0;
-	}
-
-	@Override
-	public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception {
-
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index 7604d0a..ce80e30 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -52,6 +52,8 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
 	/** The gate that the buffer draws its input from. */
 	private final InputGate inputGate;
 
+	private final int channelIndexOffset; // added to inputGate's channel indices before they are passed to barrierHandler
+
 	private final BufferStorage bufferStorage;
 
 	/** Flag to indicate whether we have drawn all available input. */
@@ -89,6 +91,13 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
 		);
 	}
 
+	public CheckpointedInputGate(
+			InputGate inputGate,
+			BufferStorage bufferStorage,
+			CheckpointBarrierHandler barrierHandler) {
+		this(inputGate, bufferStorage, barrierHandler, 0); // no channel index offset: indices reach barrierHandler unchanged
+	}
+
 	/**
 	 * Creates a new checkpoint stream aligner.
 	 *
@@ -99,12 +108,16 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
 	 * @param inputGate The input gate to draw the buffers and events from.
 	 * @param bufferStorage The storage to hold the buffers and events for blocked channels.
 	 * @param barrierHandler Handler that controls which channels are blocked.
+	 * @param channelIndexOffset Offset added to every channel index reported by {@code inputGate} before
+	 *                           it is passed to {@code barrierHandler}; 0 means no offset.
 	 */
 	public CheckpointedInputGate(
 			InputGate inputGate,
 			BufferStorage bufferStorage,
-			CheckpointBarrierHandler barrierHandler) {
+			CheckpointBarrierHandler barrierHandler,
+			int channelIndexOffset) { // e.g. the channel count of a preceding gate that shares the same barrierHandler
 		this.inputGate = inputGate;
+		this.channelIndexOffset = channelIndexOffset;
 		this.bufferStorage = checkNotNull(bufferStorage);
 		this.barrierHandler = barrierHandler;
 	}
@@ -138,7 +151,7 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
 			}
 
 			BufferOrEvent bufferOrEvent = next.get();
-			if (barrierHandler.isBlocked(bufferOrEvent.getChannelIndex())) {
+			if (barrierHandler.isBlocked(offsetChannelIndex(bufferOrEvent.getChannelIndex()))) {
 				// if the channel is blocked, we just store the BufferOrEvent
 				bufferStorage.add(bufferOrEvent);
 				if (bufferStorage.isFull()) {
@@ -153,7 +166,7 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
 				CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
 				if (!endOfInputGate) {
 					// process barriers only if there is a chance of the checkpoint completing
-					if (barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
+					if (barrierHandler.processBarrier(checkpointBarrier, offsetChannelIndex(bufferOrEvent.getChannelIndex()), bufferStorage.getPendingBytes())) {
 						bufferStorage.rollOver();
 					}
 				}
@@ -174,6 +187,10 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
 		}
 	}
 
+	private int offsetChannelIndex(int channelIndex) {
+		return channelIndex + channelIndexOffset; // translates an inputGate channel index into barrierHandler's index space
+	}
+
 	private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception {
 		if (!inputGate.isFinished()) {
 			return Optional.empty();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 419cf16..800c33e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Utility for creating {@link CheckpointedInputGate} based on checkpoint mode
  * for {@link StreamInputProcessor} and {@link StreamTwoInputProcessor}.
@@ -51,6 +53,45 @@ public class InputProcessorUtil {
 		return new CheckpointedInputGate(inputGate, bufferStorage, barrierHandler);
 	}
 
+	/**
+	 * Creates a pair of {@link CheckpointedInputGate}s, one per supplied {@link InputGate} (in argument order), sharing
+	 * a single barrier handler; the second gate's channel indices are offset by {@code inputGate1}'s channel count.
+	 */
+	public static CheckpointedInputGate[] createCheckpointedInputGatePair(
+			AbstractInvokable toNotifyOnCheckpoint,
+			CheckpointingMode checkpointMode,
+			IOManager ioManager,
+			InputGate inputGate1,
+			InputGate inputGate2,
+			Configuration taskManagerConfig,
+			String taskName) throws IOException {
+
+		BufferStorage mainBufferStorage1 = createBufferStorage(
+			checkpointMode, ioManager, inputGate1.getPageSize(), taskManagerConfig, taskName);
+		BufferStorage mainBufferStorage2 = createBufferStorage(
+			checkpointMode, ioManager, inputGate2.getPageSize(), taskManagerConfig, taskName);
+		checkState(mainBufferStorage1.getMaxBufferedBytes() == mainBufferStorage2.getMaxBufferedBytes(), "Buffer storages of both inputs must have the same max buffered bytes.");
+
+		BufferStorage linkedBufferStorage1 = new LinkedBufferStorage(
+			mainBufferStorage1,
+			mainBufferStorage2,
+			mainBufferStorage1.getMaxBufferedBytes());
+		BufferStorage linkedBufferStorage2 = new LinkedBufferStorage(
+			mainBufferStorage2,
+			mainBufferStorage1,
+			mainBufferStorage1.getMaxBufferedBytes());
+
+		CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
+			checkpointMode,
+			inputGate1.getNumberOfInputChannels() + inputGate2.getNumberOfInputChannels(),
+			taskName,
+			toNotifyOnCheckpoint);
+		return new CheckpointedInputGate[] {
+			new CheckpointedInputGate(inputGate1, linkedBufferStorage1, barrierHandler),
+			new CheckpointedInputGate(inputGate2, linkedBufferStorage2, barrierHandler, inputGate1.getNumberOfInputChannels())
+		};
+	}
+
 	private static CheckpointBarrierHandler createCheckpointBarrierHandler(
 			CheckpointingMode checkpointMode,
 			int numberOfInputChannels,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
index 37c17db..d5172ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
@@ -20,11 +20,13 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.InputSelection;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -36,6 +38,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -101,13 +104,17 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
 		Collection<InputGate> inputGates2,
 		TypeSerializer<IN1> inputSerializer1,
 		TypeSerializer<IN2> inputSerializer2,
+		StreamTask<?, ?> streamTask,
+		CheckpointingMode checkpointingMode,
 		Object lock,
 		IOManager ioManager,
+		Configuration taskManagerConfig,
 		StreamStatusMaintainer streamStatusMaintainer,
 		TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
 		WatermarkGauge input1WatermarkGauge,
 		WatermarkGauge input2WatermarkGauge,
-		OperatorChain<?, ?> operatorChain) {
+		String taskName,
+		OperatorChain<?, ?> operatorChain) throws IOException {
 
 		checkState(streamOperator instanceof InputSelectable);
 
@@ -120,9 +127,17 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
 		InputGate unionedInputGate2 = InputGateUtil.createInputGate(inputGates2.toArray(new InputGate[0]));
 
 		// create a Input instance for each input
-		CachedBufferStorage bufferStorage = new CachedBufferStorage(unionedInputGate1.getPageSize());
-		this.input1 = new StreamTaskNetworkInput(new CheckpointedInputGate(unionedInputGate1, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer1, ioManager, 0);
-		this.input2 = new StreamTaskNetworkInput(new CheckpointedInputGate(unionedInputGate2, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer2, ioManager, 1);
+		CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedInputGatePair(
+			streamTask,
+			checkpointingMode,
+			ioManager,
+			unionedInputGate1,
+			unionedInputGate2,
+			taskManagerConfig,
+			taskName);
+		checkState(checkpointedInputGates.length == 2);
+		this.input1 = new StreamTaskNetworkInput(checkpointedInputGates[0], inputSerializer1, ioManager, 0);
+		this.input2 = new StreamTaskNetworkInput(checkpointedInputGates[1], inputSerializer2, ioManager, 1);
 
 		this.statusWatermarkValve1 = new StatusWatermarkValve(
 			unionedInputGate1.getNumberOfInputChannels(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
index b577b20..cde5a5a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor;
 
+import java.io.IOException;
 import java.util.Collection;
 
 /**
@@ -44,17 +45,21 @@ public class TwoInputSelectableStreamTask<IN1, IN2, OUT> extends AbstractTwoInpu
 		Collection<InputGate> inputGates1,
 		Collection<InputGate> inputGates2,
 		TypeSerializer<IN1> inputDeserializer1,
-		TypeSerializer<IN2> inputDeserializer2) {
+		TypeSerializer<IN2> inputDeserializer2) throws IOException {
 
 		this.inputProcessor = new StreamTwoInputSelectableProcessor<>(
 			inputGates1, inputGates2,
 			inputDeserializer1, inputDeserializer2,
+			this,
+			getConfiguration().getCheckpointMode(),
 			getCheckpointLock(),
 			getEnvironment().getIOManager(),
+			getEnvironment().getTaskManagerInfo().getConfiguration(),
 			getStreamStatusMaintainer(),
 			this.headOperator,
 			input1WatermarkGauge,
 			input2WatermarkGauge,
+			getTaskNameWithSubtaskAndId(),
 			operatorChain);
 	}