You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:22 UTC
[flink] 13/16: [FLINK-12777][operator] Use CheckpointedInputGate
StreamTwoInputSelectableProcessor
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6c7a69a0442d6bfb3b5d86006db462756a51ec7c
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jun 18 17:26:47 2019 +0200
[FLINK-12777][operator] Use CheckpointedInputGate StreamTwoInputSelectableProcessor
---
.../runtime/io/CheckpointBarrierDiscarder.java | 74 ----------------------
.../runtime/io/CheckpointedInputGate.java | 23 ++++++-
.../streaming/runtime/io/InputProcessorUtil.java | 41 ++++++++++++
.../io/StreamTwoInputSelectableProcessor.java | 23 +++++--
.../tasks/TwoInputSelectableStreamTask.java | 7 +-
5 files changed, 86 insertions(+), 82 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
deleted file mode 100644
index 4c6cdab..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import java.io.IOException;
-
-/**
- * The {@link CheckpointBarrierDiscarder} discards checkpoint barriers have been received from which input channels.
- */
-@Internal
-public class CheckpointBarrierDiscarder extends CheckpointBarrierHandler {
- public CheckpointBarrierDiscarder() {
- super(null);
- }
-
- @Override
- public void releaseBlocksAndResetBarriers() throws IOException {
- }
-
- @Override
- public boolean isBlocked(int channelIndex) {
- return false;
- }
-
- @Override
- public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception {
- return false;
- }
-
- @Override
- public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
- return false;
- }
-
- @Override
- public boolean processEndOfPartition() throws Exception {
- return false;
- }
-
- @Override
- public long getLatestCheckpointId() {
- return 0;
- }
-
- @Override
- public long getAlignmentDurationNanos() {
- return 0;
- }
-
- @Override
- public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception {
-
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index 7604d0a..ce80e30 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -52,6 +52,8 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
/** The gate that the buffer draws its input from. */
private final InputGate inputGate;
+ private final int channelIndexOffset;
+
private final BufferStorage bufferStorage;
/** Flag to indicate whether we have drawn all available input. */
@@ -89,6 +91,13 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
);
}
+ public CheckpointedInputGate(
+ InputGate inputGate,
+ BufferStorage bufferStorage,
+ CheckpointBarrierHandler barrierHandler) {
+ this(inputGate, bufferStorage, barrierHandler, 0);
+ }
+
/**
* Creates a new checkpoint stream aligner.
*
@@ -99,12 +108,16 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
* @param inputGate The input gate to draw the buffers and events from.
* @param bufferStorage The storage to hold the buffers and events for blocked channels.
* @param barrierHandler Handler that controls which channels are blocked.
+ * @param channelIndexOffset Optional offset added to channelIndex returned from the inputGate
+ * before passing it to the barrierHandler.
*/
public CheckpointedInputGate(
InputGate inputGate,
BufferStorage bufferStorage,
- CheckpointBarrierHandler barrierHandler) {
+ CheckpointBarrierHandler barrierHandler,
+ int channelIndexOffset) {
this.inputGate = inputGate;
+ this.channelIndexOffset = channelIndexOffset;
this.bufferStorage = checkNotNull(bufferStorage);
this.barrierHandler = barrierHandler;
}
@@ -138,7 +151,7 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
}
BufferOrEvent bufferOrEvent = next.get();
- if (barrierHandler.isBlocked(bufferOrEvent.getChannelIndex())) {
+ if (barrierHandler.isBlocked(offsetChannelIndex(bufferOrEvent.getChannelIndex()))) {
// if the channel is blocked, we just store the BufferOrEvent
bufferStorage.add(bufferOrEvent);
if (bufferStorage.isFull()) {
@@ -153,7 +166,7 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
if (!endOfInputGate) {
// process barriers only if there is a chance of the checkpoint completing
- if (barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
+ if (barrierHandler.processBarrier(checkpointBarrier, offsetChannelIndex(bufferOrEvent.getChannelIndex()), bufferStorage.getPendingBytes())) {
bufferStorage.rollOver();
}
}
@@ -174,6 +187,10 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
}
}
+ private int offsetChannelIndex(int channelIndex) {
+ return channelIndex + channelIndexOffset;
+ }
+
private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception {
if (!inputGate.isFinished()) {
return Optional.empty();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 419cf16..800c33e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.CheckpointingMode;
import java.io.IOException;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
* Utility for creating {@link CheckpointedInputGate} based on checkpoint mode
* for {@link StreamInputProcessor} and {@link StreamTwoInputProcessor}.
@@ -51,6 +53,45 @@ public class InputProcessorUtil {
return new CheckpointedInputGate(inputGate, bufferStorage, barrierHandler);
}
+ /**
+ * @return a pair of {@link CheckpointedInputGate} created for two corresponding
+ * {@link InputGate}s supplied as parameters.
+ */
+ public static CheckpointedInputGate[] createCheckpointedInputGatePair(
+ AbstractInvokable toNotifyOnCheckpoint,
+ CheckpointingMode checkpointMode,
+ IOManager ioManager,
+ InputGate inputGate1,
+ InputGate inputGate2,
+ Configuration taskManagerConfig,
+ String taskName) throws IOException {
+
+ BufferStorage mainBufferStorage1 = createBufferStorage(
+ checkpointMode, ioManager, inputGate1.getPageSize(), taskManagerConfig, taskName);
+ BufferStorage mainBufferStorage2 = createBufferStorage(
+ checkpointMode, ioManager, inputGate2.getPageSize(), taskManagerConfig, taskName);
+ checkState(mainBufferStorage1.getMaxBufferedBytes() == mainBufferStorage2.getMaxBufferedBytes());
+
+ BufferStorage linkedBufferStorage1 = new LinkedBufferStorage(
+ mainBufferStorage1,
+ mainBufferStorage2,
+ mainBufferStorage1.getMaxBufferedBytes());
+ BufferStorage linkedBufferStorage2 = new LinkedBufferStorage(
+ mainBufferStorage2,
+ mainBufferStorage1,
+ mainBufferStorage1.getMaxBufferedBytes());
+
+ CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
+ checkpointMode,
+ inputGate1.getNumberOfInputChannels() + inputGate2.getNumberOfInputChannels(),
+ taskName,
+ toNotifyOnCheckpoint);
+ return new CheckpointedInputGate[] {
+ new CheckpointedInputGate(inputGate1, linkedBufferStorage1, barrierHandler),
+ new CheckpointedInputGate(inputGate2, linkedBufferStorage2, barrierHandler, inputGate1.getNumberOfInputChannels())
+ };
+ }
+
private static CheckpointBarrierHandler createCheckpointBarrierHandler(
CheckpointingMode checkpointMode,
int numberOfInputChannels,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
index 37c17db..d5172ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
@@ -20,11 +20,13 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -36,6 +38,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -101,13 +104,17 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
Collection<InputGate> inputGates2,
TypeSerializer<IN1> inputSerializer1,
TypeSerializer<IN2> inputSerializer2,
+ StreamTask<?, ?> streamTask,
+ CheckpointingMode checkpointingMode,
Object lock,
IOManager ioManager,
+ Configuration taskManagerConfig,
StreamStatusMaintainer streamStatusMaintainer,
TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
WatermarkGauge input1WatermarkGauge,
WatermarkGauge input2WatermarkGauge,
- OperatorChain<?, ?> operatorChain) {
+ String taskName,
+ OperatorChain<?, ?> operatorChain) throws IOException {
checkState(streamOperator instanceof InputSelectable);
@@ -120,9 +127,17 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
InputGate unionedInputGate2 = InputGateUtil.createInputGate(inputGates2.toArray(new InputGate[0]));
// create a Input instance for each input
- CachedBufferStorage bufferStorage = new CachedBufferStorage(unionedInputGate1.getPageSize());
- this.input1 = new StreamTaskNetworkInput(new CheckpointedInputGate(unionedInputGate1, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer1, ioManager, 0);
- this.input2 = new StreamTaskNetworkInput(new CheckpointedInputGate(unionedInputGate2, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer2, ioManager, 1);
+ CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedInputGatePair(
+ streamTask,
+ checkpointingMode,
+ ioManager,
+ unionedInputGate1,
+ unionedInputGate2,
+ taskManagerConfig,
+ taskName);
+ checkState(checkpointedInputGates.length == 2);
+ this.input1 = new StreamTaskNetworkInput(checkpointedInputGates[0], inputSerializer1, ioManager, 0);
+ this.input2 = new StreamTaskNetworkInput(checkpointedInputGates[1], inputSerializer2, ioManager, 1);
this.statusWatermarkValve1 = new StatusWatermarkValve(
unionedInputGate1.getNumberOfInputChannels(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
index b577b20..cde5a5a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor;
+import java.io.IOException;
import java.util.Collection;
/**
@@ -44,17 +45,21 @@ public class TwoInputSelectableStreamTask<IN1, IN2, OUT> extends AbstractTwoInpu
Collection<InputGate> inputGates1,
Collection<InputGate> inputGates2,
TypeSerializer<IN1> inputDeserializer1,
- TypeSerializer<IN2> inputDeserializer2) {
+ TypeSerializer<IN2> inputDeserializer2) throws IOException {
this.inputProcessor = new StreamTwoInputSelectableProcessor<>(
inputGates1, inputGates2,
inputDeserializer1, inputDeserializer2,
+ this,
+ getConfiguration().getCheckpointMode(),
getCheckpointLock(),
getEnvironment().getIOManager(),
+ getEnvironment().getTaskManagerInfo().getConfiguration(),
getStreamStatusMaintainer(),
this.headOperator,
input1WatermarkGauge,
input2WatermarkGauge,
+ getTaskNameWithSubtaskAndId(),
operatorChain);
}