You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:17 UTC
[flink] 08/16: [FLIKN-12777][network] Refactor BarrierTracker to
use the same code structure as BarrierBuffer
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 52525884826cd0e99b2309d89d74fa270d51ec90
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Sat Jun 15 10:18:01 2019 +0200
[FLIKN-12777][network] Refactor BarrierTracker to use the same code structure as BarrierBuffer
---
.../flink/streaming/runtime/io/BarrierBuffer.java | 52 ++++---
.../streaming/runtime/io/BarrierDiscarder.java | 105 --------------
.../runtime/io/BufferOrEventSequence.java | 2 +-
.../runtime/io/CheckpointBarrierAligner.java | 65 ++-------
.../runtime/io/CheckpointBarrierDiscarder.java | 74 ++++++++++
.../runtime/io/CheckpointBarrierHandler.java | 85 ++++++++---
...rTracker.java => CheckpointBarrierTracker.java} | 158 +++++++--------------
...rierHandler.java => CheckpointedInputGate.java} | 7 +-
.../streaming/runtime/io/EmptyBufferStorage.java | 73 ++++++++++
.../streaming/runtime/io/InputProcessorUtil.java | 11 +-
.../streaming/runtime/io/StreamInputProcessor.java | 2 +-
.../runtime/io/StreamTaskNetworkInput.java | 6 +-
.../runtime/io/StreamTwoInputProcessor.java | 2 +-
.../io/StreamTwoInputSelectableProcessor.java | 5 +-
.../runtime/io/BarrierBufferTestBase.java | 32 ++---
.../streaming/runtime/io/BarrierTrackerTest.java | 11 +-
16 files changed, 349 insertions(+), 341 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 1594389..8dcc005 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -38,7 +38,7 @@ import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
+ * The barrier buffer is {@link CheckpointedInputGate} that blocks inputs with barriers until
* all inputs have received the barrier for a given checkpoint.
*
* <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
@@ -46,11 +46,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* the blocks are released.
*/
@Internal
-public class BarrierBuffer implements CheckpointBarrierHandler {
+public class BarrierBuffer implements CheckpointedInputGate {
private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
- private final CheckpointBarrierAligner barrierAligner;
+ private final CheckpointBarrierHandler barrierHandler;
/** The gate that the buffer draws its input from. */
private final InputGate inputGate;
@@ -77,6 +77,21 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
this (inputGate, bufferStorage, "Testing: No task associated", null);
}
+ BarrierBuffer(
+ InputGate inputGate,
+ BufferStorage bufferStorage,
+ String taskName,
+ @Nullable AbstractInvokable toNotifyOnCheckpoint) {
+ this(
+ inputGate,
+ bufferStorage,
+ new CheckpointBarrierAligner(
+ inputGate.getNumberOfInputChannels(),
+ taskName,
+ toNotifyOnCheckpoint)
+ );
+ }
+
/**
* Creates a new checkpoint stream aligner.
*
@@ -86,20 +101,15 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
*
* @param inputGate The input gate to draw the buffers and events from.
* @param bufferStorage The storage to hold the buffers and events for blocked channels.
- * @param taskName The task name for logging.
- * @param toNotifyOnCheckpoint optional Handler that receives the checkpoint notifications.
+ * @param barrierHandler Handler that controls which channels are blocked.
*/
BarrierBuffer(
InputGate inputGate,
BufferStorage bufferStorage,
- String taskName,
- @Nullable AbstractInvokable toNotifyOnCheckpoint) {
+ CheckpointBarrierHandler barrierHandler) {
this.inputGate = inputGate;
this.bufferStorage = checkNotNull(bufferStorage);
- this.barrierAligner = new CheckpointBarrierAligner(
- inputGate.getNumberOfInputChannels(),
- taskName,
- toNotifyOnCheckpoint);
+ this.barrierHandler = barrierHandler;
}
@Override
@@ -131,11 +141,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
BufferOrEvent bufferOrEvent = next.get();
- if (barrierAligner.isBlocked(bufferOrEvent.getChannelIndex())) {
+ if (barrierHandler.isBlocked(bufferOrEvent.getChannelIndex())) {
// if the channel is blocked, we just store the BufferOrEvent
bufferStorage.add(bufferOrEvent);
if (bufferStorage.isFull()) {
- barrierAligner.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
+ barrierHandler.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
bufferStorage.rollOver();
}
}
@@ -146,19 +156,19 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
if (!endOfInputGate) {
// process barriers only if there is a chance of the checkpoint completing
- if (barrierAligner.processBarrier(checkpointBarrier, bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
+ if (barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
bufferStorage.rollOver();
}
}
}
else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
- if (barrierAligner.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) {
+ if (barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) {
bufferStorage.rollOver();
}
}
else {
if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
- if (barrierAligner.processEndOfPartition()) {
+ if (barrierHandler.processEndOfPartition()) {
bufferStorage.rollOver();
}
}
@@ -178,7 +188,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
} else {
// end of input stream. stream continues with the buffered data
endOfInputGate = true;
- barrierAligner.releaseBlocksAndResetBarriers();
+ barrierHandler.releaseBlocksAndResetBarriers();
bufferStorage.rollOver();
return pollNext();
}
@@ -208,13 +218,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
*
* @return The ID of the pending of completed checkpoint.
*/
- public long getCurrentCheckpointId() {
- return barrierAligner.getCurrentCheckpointId();
+ public long getLatestCheckpointId() {
+ return barrierHandler.getLatestCheckpointId();
}
@Override
public long getAlignmentDurationNanos() {
- return barrierAligner.getAlignmentDurationNanos();
+ return barrierHandler.getAlignmentDurationNanos();
}
@Override
@@ -228,6 +238,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
@Override
public String toString() {
- return barrierAligner.toString();
+ return barrierHandler.toString();
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
deleted file mode 100644
index c33c940..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * The BarrierDiscarder discards checkpoint barriers have been received from which input channels.
- */
-@Internal
-public class BarrierDiscarder implements CheckpointBarrierHandler {
-
- // ------------------------------------------------------------------------
-
- /** The input gate, to draw the buffers and events from. */
- private final InputGate inputGate;
-
- /**
- * The number of channels. Once that many barriers have been received for a checkpoint, the
- * checkpoint is considered complete.
- */
- private final int totalNumberOfInputChannels;
-
- // ------------------------------------------------------------------------
-
- public BarrierDiscarder(InputGate inputGate) {
- this.inputGate = inputGate;
- this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
- }
-
- @Override
- public CompletableFuture<?> isAvailable() {
- return inputGate.isAvailable();
- }
-
- @Override
- public boolean isFinished() {
- return inputGate.isFinished();
- }
-
- @Override
- public Optional<BufferOrEvent> pollNext() throws Exception {
- while (true) {
- Optional<BufferOrEvent> next = inputGate.pollNext();
- if (!next.isPresent()) {
- // buffer or input exhausted
- return next;
- }
-
- BufferOrEvent bufferOrEvent = next.get();
- if (bufferOrEvent.isBuffer()) {
- return next;
- }
- else if (bufferOrEvent.getEvent().getClass() != CheckpointBarrier.class &&
- bufferOrEvent.getEvent().getClass() != CancelCheckpointMarker.class) {
- // some other event
- return next;
- }
- }
- }
-
- @Override
- public void cleanup() {
-
- }
-
- @Override
- public boolean isEmpty() {
- return true;
- }
-
- @Override
- public long getAlignmentDurationNanos() {
- // this one does not do alignment at all
- return 0L;
- }
-
- @Override
- public int getNumberOfInputChannels() {
- return totalNumberOfInputChannels;
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
index c5bde1b..21649bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
@@ -27,7 +27,7 @@ import java.io.IOException;
/**
* This class represents a sequence of buffers and events which are blocked by
- * {@link CheckpointBarrierHandler}. The sequence of buffers and events can be
+ * {@link CheckpointedInputGate}. The sequence of buffers and events can be
* read back using the method {@link #getNext()}.
*/
@Internal
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
index 30e05c1..482ba65 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
@@ -21,8 +21,6 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -40,7 +38,7 @@ import java.io.IOException;
* release blocked channels.
*/
@Internal
-public class CheckpointBarrierAligner {
+public class CheckpointBarrierAligner extends CheckpointBarrierHandler {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierAligner.class);
@@ -52,9 +50,6 @@ public class CheckpointBarrierAligner {
private final String taskName;
- @Nullable
- private final AbstractInvokable toNotifyOnCheckpoint;
-
/** The ID of the checkpoint for which we expect barriers. */
private long currentCheckpointId = -1L;
@@ -77,13 +72,14 @@ public class CheckpointBarrierAligner {
int totalNumberOfInputChannels,
String taskName,
@Nullable AbstractInvokable toNotifyOnCheckpoint) {
+ super(toNotifyOnCheckpoint);
this.totalNumberOfInputChannels = totalNumberOfInputChannels;
this.taskName = taskName;
- this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
this.blockedChannels = new boolean[totalNumberOfInputChannels];
}
+ @Override
public void releaseBlocksAndResetBarriers() throws IOException {
LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName);
@@ -100,19 +96,12 @@ public class CheckpointBarrierAligner {
}
}
- /**
- * Checks whether the channel with the given index is blocked.
- *
- * @param channelIndex The channel index to check.
- * @return True if the channel is blocked, false if not.
- */
+ @Override
public boolean isBlocked(int channelIndex) {
return blockedChannels[channelIndex];
}
- /**
- * @return true if some blocked data should be unblocked/rolled over.
- */
+ @Override
public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception {
final long barrierId = receivedBarrier.getId();
@@ -121,7 +110,7 @@ public class CheckpointBarrierAligner {
if (barrierId > currentCheckpointId) {
// new checkpoint
currentCheckpointId = barrierId;
- notifyCheckpoint(receivedBarrier, bufferedBytes);
+ notifyCheckpoint(receivedBarrier, bufferedBytes, latestAlignmentDurationNanos);
}
return false;
}
@@ -185,7 +174,7 @@ public class CheckpointBarrierAligner {
}
releaseBlocksAndResetBarriers();
- notifyCheckpoint(receivedBarrier, bufferedBytes);
+ notifyCheckpoint(receivedBarrier, bufferedBytes, latestAlignmentDurationNanos);
return true;
}
return checkpointAborted;
@@ -222,9 +211,7 @@ public class CheckpointBarrierAligner {
}
}
- /**
- * @return true if some blocked data should be unblocked/rolled over.
- */
+ @Override
public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
final long barrierId = cancelBarrier.getCheckpointId();
@@ -300,9 +287,7 @@ public class CheckpointBarrierAligner {
return false;
}
- /**
- * @return true if some blocked data should be unblocked/rolled over.
- */
+ @Override
public boolean processEndOfPartition() throws Exception {
numClosedChannels++;
@@ -317,37 +302,12 @@ public class CheckpointBarrierAligner {
return false;
}
- private void notifyCheckpoint(CheckpointBarrier checkpointBarrier, long bufferedBytes) throws Exception {
- if (toNotifyOnCheckpoint != null) {
- CheckpointMetaData checkpointMetaData =
- new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
-
- CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
- .setBytesBufferedInAlignment(bufferedBytes)
- .setAlignmentDurationNanos(latestAlignmentDurationNanos);
-
- toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
- checkpointMetaData,
- checkpointBarrier.getCheckpointOptions(),
- checkpointMetrics);
- }
- }
-
- private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception {
- notifyAbort(checkpointId,
- new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
- }
-
- private void notifyAbort(long checkpointId, CheckpointException cause) throws Exception {
- if (toNotifyOnCheckpoint != null) {
- toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
- }
- }
-
- public long getCurrentCheckpointId() {
+ @Override
+ public long getLatestCheckpointId() {
return currentCheckpointId;
}
+ @Override
public long getAlignmentDurationNanos() {
if (startOfAlignmentTimestamp <= 0) {
return latestAlignmentDurationNanos;
@@ -365,6 +325,7 @@ public class CheckpointBarrierAligner {
numClosedChannels);
}
+ @Override
public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception {
releaseBlocksAndResetBarriers();
notifyAbort(currentCheckpointId,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
new file mode 100644
index 0000000..4c6cdab
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+
+import java.io.IOException;
+
+/**
+ * The {@link CheckpointBarrierDiscarder} discards checkpoint barriers have been received from which input channels.
+ */
+@Internal
+public class CheckpointBarrierDiscarder extends CheckpointBarrierHandler {
+ public CheckpointBarrierDiscarder() {
+ super(null);
+ }
+
+ @Override
+ public void releaseBlocksAndResetBarriers() throws IOException {
+ }
+
+ @Override
+ public boolean isBlocked(int channelIndex) {
+ return false;
+ }
+
+ @Override
+ public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean processEndOfPartition() throws Exception {
+ return false;
+ }
+
+ @Override
+ public long getLatestCheckpointId() {
+ return 0;
+ }
+
+ @Override
+ public long getAlignmentDurationNanos() {
+ return 0;
+ }
+
+ @Override
+ public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception {
+
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 2ee1a97..41e043e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -18,43 +18,88 @@
package org.apache.flink.streaming.runtime.io;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.io.AsyncDataInput;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import javax.annotation.Nullable;
import java.io.IOException;
/**
- * The CheckpointBarrierHandler reacts to checkpoint barrier arriving from the input channels.
+ * The {@link CheckpointBarrierHandler} reacts to checkpoint barrier arriving from the input channels.
* Different implementations may either simply track barriers, or block certain inputs on
* barriers.
*/
-@Internal
-public interface CheckpointBarrierHandler extends AsyncDataInput<BufferOrEvent> {
+public abstract class CheckpointBarrierHandler {
+
+ /** The listener to be notified on complete checkpoints. */
+ @Nullable
+ private final AbstractInvokable toNotifyOnCheckpoint;
+
+ public CheckpointBarrierHandler(@Nullable AbstractInvokable toNotifyOnCheckpoint) {
+ this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
+ }
+
+ public abstract void releaseBlocksAndResetBarriers() throws IOException;
+
/**
- * Cleans up all internally held resources.
+ * Checks whether the channel with the given index is blocked.
*
- * @throws IOException Thrown if the cleanup of I/O resources failed.
+ * @param channelIndex The channel index to check.
+ * @return True if the channel is blocked, false if not.
*/
- void cleanup() throws IOException;
+ public abstract boolean isBlocked(int channelIndex);
/**
- * Checks if the barrier handler has buffered any data internally.
- * @return {@code True}, if no data is buffered internally, {@code false} otherwise.
+ * @return true if some blocked data should be unblocked/rolled over.
*/
- boolean isEmpty();
+ public abstract boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception;
/**
- * Gets the time that the latest alignment took, in nanoseconds.
- * If there is currently an alignment in progress, it will return the time spent in the
- * current alignment so far.
- *
- * @return The duration in nanoseconds
+ * @return true if some blocked data should be unblocked/rolled over.
*/
- long getAlignmentDurationNanos();
+ public abstract boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception;
/**
- * @return number of underlying input channels.
+ * @return true if some blocked data should be unblocked/rolled over.
*/
- int getNumberOfInputChannels();
+ public abstract boolean processEndOfPartition() throws Exception;
+
+ public abstract long getLatestCheckpointId();
+
+ public abstract long getAlignmentDurationNanos();
+
+ public abstract void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception;
+
+ protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier, long bufferedBytes, long alignmentDurationNanos) throws Exception {
+ if (toNotifyOnCheckpoint != null) {
+ CheckpointMetaData checkpointMetaData =
+ new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+
+ CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
+ .setBytesBufferedInAlignment(bufferedBytes)
+ .setAlignmentDurationNanos(alignmentDurationNanos);
+
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+ checkpointMetaData,
+ checkpointBarrier.getCheckpointOptions(),
+ checkpointMetrics);
+ }
+ }
+
+ protected void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception {
+ notifyAbort(checkpointId,
+ new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
+ }
+
+ protected void notifyAbort(long checkpointId, CheckpointException cause) throws Exception {
+ if (toNotifyOnCheckpoint != null) {
+ toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
+ }
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java
similarity index 61%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java
index f7629bb..0ec9004 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java
@@ -21,13 +21,8 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.slf4j.Logger;
@@ -36,24 +31,22 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayDeque;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
/**
- * The BarrierTracker keeps track of what checkpoint barriers have been received from
+ * The {@link CheckpointBarrierTracker} keeps track of what checkpoint barriers have been received from
* which input channels. Once it has observed all checkpoint barriers for a checkpoint ID,
* it notifies its listener of a completed checkpoint.
*
- * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
+ * <p>Unlike the {@link CheckpointBarrierAligner}, the BarrierTracker does not block the input
* channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
* guarantees. It can, however, be used to gain "at least once" processing guarantees.
*
* <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.
*/
@Internal
-public class BarrierTracker implements CheckpointBarrierHandler {
+public class CheckpointBarrierTracker extends CheckpointBarrierHandler {
- private static final Logger LOG = LoggerFactory.getLogger(BarrierTracker.class);
+ private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierTracker.class);
/**
* The tracker tracks a maximum number of checkpoints, for which some, but not all barriers
@@ -63,9 +56,6 @@ public class BarrierTracker implements CheckpointBarrierHandler {
// ------------------------------------------------------------------------
- /** The input gate, to draw the buffers and events from. */
- private final InputGate inputGate;
-
/**
* The number of channels. Once that many barriers have been received for a checkpoint, the
* checkpoint is considered complete.
@@ -78,89 +68,36 @@ public class BarrierTracker implements CheckpointBarrierHandler {
*/
private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
- /** The listener to be notified on complete checkpoints. */
- private final AbstractInvokable toNotifyOnCheckpoint;
-
/** The highest checkpoint ID encountered so far. */
private long latestPendingCheckpointID = -1;
- // ------------------------------------------------------------------------
-
- public BarrierTracker(InputGate inputGate) {
- this(inputGate, null);
+ public CheckpointBarrierTracker(int totalNumberOfInputChannels) {
+ this(totalNumberOfInputChannels, null);
}
- public BarrierTracker(InputGate inputGate, @Nullable AbstractInvokable toNotifyOnCheckpoint) {
- this.inputGate = inputGate;
- this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+ public CheckpointBarrierTracker(int totalNumberOfInputChannels, @Nullable AbstractInvokable toNotifyOnCheckpoint) {
+ super(toNotifyOnCheckpoint);
+ this.totalNumberOfInputChannels = totalNumberOfInputChannels;
this.pendingCheckpoints = new ArrayDeque<>();
- this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
}
@Override
- public CompletableFuture<?> isAvailable() {
- return inputGate.isAvailable();
+ public void releaseBlocksAndResetBarriers() {
}
@Override
- public boolean isFinished() {
- return inputGate.isFinished();
+ public boolean isBlocked(int channelIndex) {
+ return false;
}
@Override
- public Optional<BufferOrEvent> pollNext() throws Exception {
- while (true) {
- Optional<BufferOrEvent> next = inputGate.pollNext();
- if (!next.isPresent()) {
- // buffer or input exhausted
- return next;
- }
-
- BufferOrEvent bufferOrEvent = next.get();
- if (bufferOrEvent.isBuffer()) {
- return next;
- }
- else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
- processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
- }
- else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
- processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
- }
- else {
- // some other event
- return next;
- }
- }
- }
-
- @Override
- public void cleanup() {
- pendingCheckpoints.clear();
- }
-
- @Override
- public boolean isEmpty() {
- return pendingCheckpoints.isEmpty();
- }
-
- @Override
- public long getAlignmentDurationNanos() {
- // this one does not do alignment at all
- return 0L;
- }
-
- @Override
- public int getNumberOfInputChannels() {
- return totalNumberOfInputChannels;
- }
-
- private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
+ public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception {
final long barrierId = receivedBarrier.getId();
// fast path for single channel trackers
if (totalNumberOfInputChannels == 1) {
- notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
- return;
+ notifyCheckpoint(receivedBarrier, 0, 0);
+ return false;
}
// general path for multiple input channels
@@ -169,20 +106,20 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
// find the checkpoint barrier in the queue of pending barriers
- CheckpointBarrierCount cbc = null;
+ CheckpointBarrierCount barrierCount = null;
int pos = 0;
for (CheckpointBarrierCount next : pendingCheckpoints) {
if (next.checkpointId == barrierId) {
- cbc = next;
+ barrierCount = next;
break;
}
pos++;
}
- if (cbc != null) {
+ if (barrierCount != null) {
// add one to the count to that barrier and check for completion
- int numBarriersNew = cbc.incrementBarrierCount();
+ int numBarriersNew = barrierCount.incrementBarrierCount();
if (numBarriersNew == totalNumberOfInputChannels) {
// checkpoint can be triggered (or is aborted and all barriers have been seen)
// first, remove this checkpoint and all all prior pending
@@ -192,12 +129,12 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
// notify the listener
- if (!cbc.isAborted()) {
+ if (!barrierCount.isAborted()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received all barriers for checkpoint {}", barrierId);
}
- notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
+ notifyCheckpoint(receivedBarrier, 0, 0);
}
}
}
@@ -216,19 +153,21 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
}
}
+ return false;
}
- private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier, int channelIndex) throws Exception {
- final long checkpointId = barrier.getCheckpointId();
+ @Override
+ public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
+ final long checkpointId = cancelBarrier.getCheckpointId();
if (LOG.isDebugEnabled()) {
- LOG.debug("Received cancellation barrier for checkpoint {} from channel {}", checkpointId, channelIndex);
+ LOG.debug("Received cancellation barrier for checkpoint {}", checkpointId);
}
// fast path for single channel trackers
if (totalNumberOfInputChannels == 1) {
- notifyAbort(checkpointId);
- return;
+ notifyAbortOnCancellationBarrier(checkpointId);
+ return false;
}
// -- general path for multiple input channels --
@@ -241,7 +180,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
if (cbc.markAborted()) {
// abort the subsumed checkpoints if not already done
- notifyAbort(cbc.checkpointId());
+ notifyAbortOnCancellationBarrier(cbc.checkpointId());
}
}
@@ -249,7 +188,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
// make sure the checkpoint is remembered as aborted
if (cbc.markAborted()) {
// this was the first time the checkpoint was aborted - notify
- notifyAbort(checkpointId);
+ notifyAbortOnCancellationBarrier(checkpointId);
}
// we still count the barriers to be able to remove the entry once all barriers have been seen
@@ -259,7 +198,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
}
else if (checkpointId > latestPendingCheckpointID) {
- notifyAbort(checkpointId);
+ notifyAbortOnCancellationBarrier(checkpointId);
latestPendingCheckpointID = checkpointId;
@@ -272,28 +211,33 @@ public class BarrierTracker implements CheckpointBarrierHandler {
} else {
// trailing cancellation barrier which was already cancelled
}
+ return false;
}
- private void notifyCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
- if (toNotifyOnCheckpoint != null) {
- CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
- CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
- .setBytesBufferedInAlignment(0L)
- .setAlignmentDurationNanos(0L);
-
- toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, checkpointMetrics);
+ @Override
+ public boolean processEndOfPartition() throws Exception {
+ while (!pendingCheckpoints.isEmpty()) {
+ CheckpointBarrierCount barrierCount = pendingCheckpoints.removeFirst();
+ if (barrierCount.markAborted()) {
+ notifyAbort(barrierCount.checkpointId(),
+ new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+ }
}
+ return false;
}
- private void notifyAbort(long checkpointId) throws Exception {
- if (toNotifyOnCheckpoint != null) {
- toNotifyOnCheckpoint.abortCheckpointOnBarrier(
- checkpointId,
- new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
- }
+ public long getLatestCheckpointId() {
+ return pendingCheckpoints.isEmpty() ? -1 : pendingCheckpoints.peekLast().checkpointId();
}
- // ------------------------------------------------------------------------
+ public long getAlignmentDurationNanos() {
+ return 0;
+ }
+
+ @Override
+ public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception {
+ throw new UnsupportedOperationException("This should never happened as this class doesn't block any data");
+ }
/**
* Simple class for a checkpoint ID with a barrier counter.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
similarity index 84%
copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index 2ee1a97..cdbbfbc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -25,12 +25,11 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import java.io.IOException;
/**
- * The CheckpointBarrierHandler reacts to checkpoint barrier arriving from the input channels.
- * Different implementations may either simply track barriers, or block certain inputs on
- * barriers.
+ * The {@link CheckpointedInputGate} uses {@link CheckpointBarrierHandler} to handle incoming
+ * {@link org.apache.flink.runtime.io.network.api.CheckpointBarrier} from the {@link org.apache.flink.runtime.io.network.partition.consumer.InputGate}.
*/
@Internal
-public interface CheckpointBarrierHandler extends AsyncDataInput<BufferOrEvent> {
+public interface CheckpointedInputGate extends AsyncDataInput<BufferOrEvent> {
/**
* Cleans up all internally held resources.
*
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/EmptyBufferStorage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/EmptyBufferStorage.java
new file mode 100644
index 0000000..5535c49
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/EmptyBufferStorage.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Always empty implementation of {@link BufferStorage}. It doesn't allow for adding any data.
+ */
+@Internal
+public class EmptyBufferStorage implements BufferStorage {
+ @Override
+ public void add(BufferOrEvent boe) throws IOException {
+ throw new UnsupportedOperationException("Adding to EmptyBufferStorage is unsupported");
+ }
+
+ @Override
+ public boolean isFull() {
+ return false;
+ }
+
+ @Override
+ public void rollOver() throws IOException {
+ }
+
+ @Override
+ public long getPendingBytes() {
+ return 0;
+ }
+
+ @Override
+ public long getRolledBytes() {
+ return 0;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return true;
+ }
+
+ @Override
+ public Optional<BufferOrEvent> pollNext() throws IOException {
+ return Optional.empty();
+ }
+
+ @Override
+ public long getMaxBufferedBytes() {
+ return -1;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index c9ec6bf..75926b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -30,13 +30,13 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
import java.io.IOException;
/**
- * Utility for creating {@link CheckpointBarrierHandler} based on checkpoint mode
+ * Utility for creating {@link CheckpointedInputGate} based on checkpoint mode
* for {@link StreamInputProcessor} and {@link StreamTwoInputProcessor}.
*/
@Internal
public class InputProcessorUtil {
- public static CheckpointBarrierHandler createCheckpointBarrierHandler(
+ public static CheckpointedInputGate createCheckpointBarrierHandler(
StreamTask<?, ?> checkpointedTask,
CheckpointingMode checkpointMode,
IOManager ioManager,
@@ -44,7 +44,7 @@ public class InputProcessorUtil {
Configuration taskManagerConfig,
String taskName) throws IOException {
- CheckpointBarrierHandler barrierHandler;
+ CheckpointedInputGate barrierHandler;
if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
if (!(maxAlign == -1 || maxAlign > 0)) {
@@ -67,7 +67,10 @@ public class InputProcessorUtil {
checkpointedTask);
}
} else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
- barrierHandler = new BarrierTracker(inputGate);
+ barrierHandler = new BarrierBuffer(
+ inputGate,
+ new EmptyBufferStorage(),
+ new CheckpointBarrierTracker(inputGate.getNumberOfInputChannels(), checkpointedTask));
} else {
throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 742dbe8..58b2051 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -104,7 +104,7 @@ public class StreamInputProcessor<IN> {
InputGate inputGate = InputGateUtil.createInputGate(inputGates);
- CheckpointBarrierHandler barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
+ CheckpointedInputGate barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
checkpointedTask,
checkpointMode,
ioManager,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index 85e7f46..ecf88e2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -42,12 +42,12 @@ import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkState;
/**
- * Implementation of {@link StreamTaskInput} that wraps an input from network taken from {@link CheckpointBarrierHandler}.
+ * Implementation of {@link StreamTaskInput} that wraps an input from network taken from {@link CheckpointedInputGate}.
*/
@Internal
public final class StreamTaskNetworkInput implements StreamTaskInput {
- private final CheckpointBarrierHandler barrierHandler;
+ private final CheckpointedInputGate barrierHandler;
private final DeserializationDelegate<StreamElement> deserializationDelegate;
@@ -63,7 +63,7 @@ public final class StreamTaskNetworkInput implements StreamTaskInput {
@SuppressWarnings("unchecked")
public StreamTaskNetworkInput(
- CheckpointBarrierHandler barrierHandler,
+ CheckpointedInputGate barrierHandler,
TypeSerializer<?> inputSerializer,
IOManager ioManager,
int inputIndex) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 4efbc7f..aa6354d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -86,7 +86,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
private final DeserializationDelegate<StreamElement> deserializationDelegate1;
private final DeserializationDelegate<StreamElement> deserializationDelegate2;
- private final CheckpointBarrierHandler barrierHandler;
+ private final CheckpointedInputGate barrierHandler;
private final Object lock;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
index 54ce749..d5ebf29 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
@@ -120,8 +120,9 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
InputGate unionedInputGate2 = InputGateUtil.createInputGate(inputGates2.toArray(new InputGate[0]));
// create a Input instance for each input
- this.input1 = new StreamTaskNetworkInput(new BarrierDiscarder(unionedInputGate1), inputSerializer1, ioManager, 0);
- this.input2 = new StreamTaskNetworkInput(new BarrierDiscarder(unionedInputGate2), inputSerializer2, ioManager, 1);
+ CachedBufferStorage bufferStorage = new CachedBufferStorage(unionedInputGate1.getPageSize());
+ this.input1 = new StreamTaskNetworkInput(new BarrierBuffer(unionedInputGate1, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer1, ioManager, 0);
+ this.input2 = new StreamTaskNetworkInput(new BarrierBuffer(unionedInputGate2, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer2, ioManager, 1);
this.statusWatermarkValve1 = new StatusWatermarkValve(
unionedInputGate1.getNumberOfInputChannels(),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
index e6e48a8..13c4aad 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
@@ -483,7 +483,7 @@ public abstract class BarrierBufferTestBase {
// align checkpoint 1
startTs = System.nanoTime();
check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(1L, buffer.getCurrentCheckpointId());
+ assertEquals(1L, buffer.getLatestCheckpointId());
// checkpoint done - replay buffered
check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
@@ -501,7 +501,7 @@ public abstract class BarrierBufferTestBase {
// checkpoint 2 aborted, checkpoint 3 started
check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(3L, buffer.getCurrentCheckpointId());
+ assertEquals(3L, buffer.getLatestCheckpointId());
validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
verify(toNotify).abortCheckpointOnBarrier(eq(2L),
argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)));
@@ -565,7 +565,7 @@ public abstract class BarrierBufferTestBase {
check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(1L, buffer.getCurrentCheckpointId());
+ assertEquals(1L, buffer.getLatestCheckpointId());
check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
@@ -574,7 +574,7 @@ public abstract class BarrierBufferTestBase {
// alignment of checkpoint 2
check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(2L, buffer.getCurrentCheckpointId());
+ assertEquals(2L, buffer.getLatestCheckpointId());
check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
@@ -583,7 +583,7 @@ public abstract class BarrierBufferTestBase {
// checkpoint 2 aborted, checkpoint 4 started. replay buffered
check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(4L, buffer.getCurrentCheckpointId());
+ assertEquals(4L, buffer.getLatestCheckpointId());
check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
@@ -651,7 +651,7 @@ public abstract class BarrierBufferTestBase {
check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(1L, buffer.getCurrentCheckpointId());
+ assertEquals(1L, buffer.getLatestCheckpointId());
check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
@@ -660,7 +660,7 @@ public abstract class BarrierBufferTestBase {
// alignment of checkpoint 2
check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(2L, buffer.getCurrentCheckpointId());
+ assertEquals(2L, buffer.getLatestCheckpointId());
// checkpoint 2 completed
check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
@@ -669,7 +669,7 @@ public abstract class BarrierBufferTestBase {
// checkpoint 3 skipped, alignment for 4 started
check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(4L, buffer.getCurrentCheckpointId());
+ assertEquals(4L, buffer.getLatestCheckpointId());
check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[24], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[26], buffer.pollNext().get(), PAGE_SIZE);
@@ -790,24 +790,24 @@ public abstract class BarrierBufferTestBase {
// checkpoint 3 alignment
check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(2L, buffer.getCurrentCheckpointId());
+ assertEquals(2L, buffer.getLatestCheckpointId());
check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
// checkpoint 3 buffered
check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(3L, buffer.getCurrentCheckpointId());
+ assertEquals(3L, buffer.getLatestCheckpointId());
// after checkpoint 4
check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(4L, buffer.getCurrentCheckpointId());
+ assertEquals(4L, buffer.getLatestCheckpointId());
check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[17], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(5L, buffer.getCurrentCheckpointId());
+ assertEquals(5L, buffer.getLatestCheckpointId());
check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
}
@@ -837,11 +837,11 @@ public abstract class BarrierBufferTestBase {
check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(1L, buffer.getCurrentCheckpointId());
+ assertEquals(1L, buffer.getLatestCheckpointId());
// alignment of second checkpoint
check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(2L, buffer.getCurrentCheckpointId());
+ assertEquals(2L, buffer.getLatestCheckpointId());
// first end-of-partition encountered: checkpoint will not be completed
check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
@@ -874,7 +874,7 @@ public abstract class BarrierBufferTestBase {
assertEquals(0L, buffer.getAlignmentDurationNanos());
check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(5L, buffer.getCurrentCheckpointId());
+ assertEquals(5L, buffer.getLatestCheckpointId());
verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L),
argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
@@ -882,7 +882,7 @@ public abstract class BarrierBufferTestBase {
assertEquals(0L, buffer.getAlignmentDurationNanos());
check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
- assertEquals(6L, buffer.getCurrentCheckpointId());
+ assertEquals(6L, buffer.getLatestCheckpointId());
verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L),
argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
assertEquals(0L, buffer.getAlignmentDurationNanos());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index a8e7727..5112f63 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -46,7 +46,7 @@ public class BarrierTrackerTest {
private static final int PAGE_SIZE = 512;
- private BarrierTracker tracker;
+ private CheckpointedInputGate tracker;
@After
public void ensureEmpty() throws Exception {
@@ -365,16 +365,19 @@ public class BarrierTrackerTest {
// ------------------------------------------------------------------------
// Utils
// ------------------------------------------------------------------------
- private static BarrierTracker createBarrierTracker(int numberOfChannels, BufferOrEvent[] sequence) {
+ private static CheckpointedInputGate createBarrierTracker(int numberOfChannels, BufferOrEvent[] sequence) {
return createBarrierTracker(numberOfChannels, sequence, null);
}
- private static BarrierTracker createBarrierTracker(
+ private static CheckpointedInputGate createBarrierTracker(
int numberOfChannels,
BufferOrEvent[] sequence,
@Nullable AbstractInvokable toNotifyOnCheckpoint) {
MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence));
- return new BarrierTracker(gate, toNotifyOnCheckpoint);
+ return new BarrierBuffer(
+ gate,
+ new CachedBufferStorage(PAGE_SIZE, -1, "Testing"),
+ new CheckpointBarrierTracker(gate.getNumberOfInputChannels(), toNotifyOnCheckpoint));
}
private static BufferOrEvent createBarrier(long id, int channel) {