You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:11 UTC
[flink] 02/16: [hotfix][network] Make toNotifyOnCheckpoint field
final in CheckpointBarrierHandlers
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3adc3eb2a6ece13f3befddd41f508d01bde067d7
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jun 13 15:07:01 2019 +0200
[hotfix][network] Make toNotifyOnCheckpoint field final in CheckpointBarrierHandlers
---
.../flink/streaming/runtime/io/BarrierBuffer.java | 27 ++++----
.../streaming/runtime/io/BarrierDiscarder.java | 15 -----
.../flink/streaming/runtime/io/BarrierTracker.java | 19 +++---
.../runtime/io/CheckpointBarrierHandler.java | 9 ---
.../streaming/runtime/io/InputProcessorUtil.java | 10 ++-
.../io/BarrierBufferAlignmentLimitTest.java | 18 ++++--
.../runtime/io/BarrierBufferTestBase.java | 73 ++++++++++------------
.../streaming/runtime/io/BarrierTrackerTest.java | 59 ++++++++---------
.../runtime/io/CreditBasedBarrierBufferTest.java | 7 ++-
.../runtime/io/SpillingBarrierBufferTest.java | 7 ++-
10 files changed, 105 insertions(+), 139 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index ad62360..b2e6ea1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Optional;
@@ -86,8 +88,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
*/
private BufferOrEventSequence currentBuffered;
- /** Handler that receives the checkpoint notifications. */
- private AbstractInvokable toNotifyOnCheckpoint;
+ @Nullable
+ private final AbstractInvokable toNotifyOnCheckpoint;
/** The ID of the checkpoint for which we expect barriers. */
private long currentCheckpointId = -1L;
@@ -127,7 +129,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
*/
@VisibleForTesting
BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) {
- this (inputGate, bufferStorage, -1, "Testing: No task associated");
+ this (inputGate, bufferStorage, -1, "Testing: No task associated", null);
}
/**
@@ -141,8 +143,14 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
* @param bufferStorage The storage to hold the buffers and events for blocked channels.
* @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
* @param taskName The task name for logging.
+ * @param toNotifyOnCheckpoint optional Handler that receives the checkpoint notifications.
*/
- BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage, long maxBufferedBytes, String taskName) {
+ BarrierBuffer(
+ InputGate inputGate,
+ BufferStorage bufferStorage,
+ long maxBufferedBytes,
+ String taskName,
+ @Nullable AbstractInvokable toNotifyOnCheckpoint) {
checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
this.inputGate = inputGate;
@@ -154,6 +162,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>();
this.taskName = taskName;
+ this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
}
@Override
@@ -452,16 +461,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
@Override
- public void registerCheckpointEventHandler(AbstractInvokable toNotifyOnCheckpoint) {
- if (this.toNotifyOnCheckpoint == null) {
- this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
- }
- else {
- throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee");
- }
- }
-
- @Override
public boolean isEmpty() {
return currentBuffered == null;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
index e8d9f34..c33c940 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -45,10 +44,6 @@ public class BarrierDiscarder implements CheckpointBarrierHandler {
*/
private final int totalNumberOfInputChannels;
-
- /** The listener to be notified on complete checkpoints. */
- private AbstractInvokable toNotifyOnCheckpoint;
-
// ------------------------------------------------------------------------
public BarrierDiscarder(InputGate inputGate) {
@@ -88,16 +83,6 @@ public class BarrierDiscarder implements CheckpointBarrierHandler {
}
@Override
- public void registerCheckpointEventHandler(AbstractInvokable toNotifyOnCheckpoint) {
- if (this.toNotifyOnCheckpoint == null) {
- this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
- }
- else {
- throw new IllegalStateException("BarrierDiscarder already has a registered checkpoint notifyee");
- }
- }
-
- @Override
public void cleanup() {
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 49d2991..f7629bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -77,7 +79,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
/** The listener to be notified on complete checkpoints. */
- private AbstractInvokable toNotifyOnCheckpoint;
+ private final AbstractInvokable toNotifyOnCheckpoint;
/** The highest checkpoint ID encountered so far. */
private long latestPendingCheckpointID = -1;
@@ -85,9 +87,14 @@ public class BarrierTracker implements CheckpointBarrierHandler {
// ------------------------------------------------------------------------
public BarrierTracker(InputGate inputGate) {
+ this(inputGate, null);
+ }
+
+ public BarrierTracker(InputGate inputGate, @Nullable AbstractInvokable toNotifyOnCheckpoint) {
this.inputGate = inputGate;
this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
this.pendingCheckpoints = new ArrayDeque<>();
+ this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
}
@Override
@@ -127,16 +134,6 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
@Override
- public void registerCheckpointEventHandler(AbstractInvokable toNotifyOnCheckpoint) {
- if (this.toNotifyOnCheckpoint == null) {
- this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
- }
- else {
- throw new IllegalStateException("BarrierTracker already has a registered checkpoint notifyee");
- }
- }
-
- @Override
public void cleanup() {
pendingCheckpoints.clear();
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index faffd44..2ee1a97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.AsyncDataInput;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import java.io.IOException;
@@ -32,14 +31,6 @@ import java.io.IOException;
*/
@Internal
public interface CheckpointBarrierHandler extends AsyncDataInput<BufferOrEvent> {
-
- /**
- * Registers the task be notified once all checkpoint barriers have been received for a checkpoint.
- *
- * @param task The task to notify
- */
- void registerCheckpointEventHandler(AbstractInvokable task);
-
/**
* Cleans up all internally held resources.
*
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 289dd1a..ebef48f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -58,13 +58,15 @@ public class InputProcessorUtil {
inputGate,
new CachedBufferStorage(inputGate.getPageSize()),
maxAlign,
- taskName);
+ taskName,
+ checkpointedTask);
} else {
barrierHandler = new BarrierBuffer(
inputGate,
new BufferSpiller(ioManager, inputGate.getPageSize()),
maxAlign,
- taskName);
+ taskName,
+ checkpointedTask);
}
} else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
barrierHandler = new BarrierTracker(inputGate);
@@ -72,10 +74,6 @@ public class InputProcessorUtil {
throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
}
- if (checkpointedTask != null) {
- barrierHandler.registerCheckpointEventHandler(checkpointedTask);
- }
-
return barrierHandler;
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 0a284e1..8c97938 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -115,10 +115,13 @@ public class BarrierBufferAlignmentLimitTest {
// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
- BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 1000, "Testing");
-
AbstractInvokable toNotify = mock(AbstractInvokable.class);
- buffer.registerCheckpointEventHandler(toNotify);
+ BarrierBuffer buffer = new BarrierBuffer(
+ gate,
+ new BufferSpiller(ioManager, gate.getPageSize()),
+ 1000,
+ "Testing",
+ toNotify);
// validating the sequence of buffers
@@ -210,10 +213,13 @@ public class BarrierBufferAlignmentLimitTest {
// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
- BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 500, "Testing");
-
AbstractInvokable toNotify = mock(AbstractInvokable.class);
- buffer.registerCheckpointEventHandler(toNotify);
+ BarrierBuffer buffer = new BarrierBuffer(
+ gate,
+ new BufferSpiller(ioManager, gate.getPageSize()),
+ 500,
+ "Testing",
+ toNotify);
// validating the sequence of buffers
long startTs;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
index 4bc05ff..908a199 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
@@ -41,6 +41,8 @@ import org.hamcrest.Description;
import org.junit.After;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
@@ -70,12 +72,23 @@ public abstract class BarrierBufferTestBase {
BarrierBuffer buffer;
- protected BarrierBuffer createBarrierBuffer(int numberOfChannels, BufferOrEvent[] sequence) throws IOException {
+ protected BarrierBuffer createBarrierBuffer(
+ int numberOfChannels,
+ BufferOrEvent[] sequence,
+ @Nullable AbstractInvokable toNotify) throws IOException {
MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence));
- return createBarrierBuffer(gate);
+ return createBarrierBuffer(gate, toNotify);
+ }
+
+ protected BarrierBuffer createBarrierBuffer(int numberOfChannels, BufferOrEvent[] sequence) throws IOException {
+ return createBarrierBuffer(numberOfChannels, sequence, null);
}
- abstract BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException;
+ protected BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException {
+ return createBarrierBuffer(gate, null);
+ }
+
+ abstract BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) throws IOException;
abstract void validateAlignmentBuffered(long actualBytesBuffered, BufferOrEvent... sequence);
@@ -147,10 +160,9 @@ public abstract class BarrierBufferTestBase {
createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0),
createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
};
- buffer = createBarrierBuffer(1, sequence);
-
ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
+ buffer = createBarrierBuffer(1, sequence, handler);
+
handler.setNextExpectedCheckpointId(1L);
for (BufferOrEvent boe : sequence) {
@@ -198,10 +210,9 @@ public abstract class BarrierBufferTestBase {
createBuffer(0, PAGE_SIZE),
createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2)
};
- buffer = createBarrierBuffer(3, sequence);
-
ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
+ buffer = createBarrierBuffer(3, sequence, handler);
+
handler.setNextExpectedCheckpointId(1L);
// pre checkpoint 1
@@ -292,10 +303,9 @@ public abstract class BarrierBufferTestBase {
createBarrier(2, 2),
createBuffer(2, PAGE_SIZE), createEndOfPartition(2), createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
};
- buffer = createBarrierBuffer(3, sequence);
-
ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
+ buffer = createBarrierBuffer(3, sequence, handler);
+
handler.setNextExpectedCheckpointId(1L);
// pre-checkpoint 1
@@ -368,10 +378,9 @@ public abstract class BarrierBufferTestBase {
createBuffer(2, PAGE_SIZE), createEndOfPartition(2),
createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
};
- buffer = createBarrierBuffer(3, sequence);
-
ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
+ buffer = createBarrierBuffer(3, sequence, handler);
+
handler.setNextExpectedCheckpointId(1L);
// around checkpoint 1
@@ -461,10 +470,8 @@ public abstract class BarrierBufferTestBase {
createBuffer(2, PAGE_SIZE), createEndOfPartition(2),
createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
};
- buffer = createBarrierBuffer(3, sequence);
-
AbstractInvokable toNotify = mock(AbstractInvokable.class);
- buffer.registerCheckpointEventHandler(toNotify);
+ buffer = createBarrierBuffer(3, sequence, toNotify);
long startTs;
@@ -548,10 +555,9 @@ public abstract class BarrierBufferTestBase {
createBuffer(2, PAGE_SIZE), createEndOfPartition(2),
createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
};
- buffer = createBarrierBuffer(3, sequence);
-
ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
+ buffer = createBarrierBuffer(3, sequence, handler);
+
handler.setNextExpectedCheckpointId(1L);
// checkpoint 1
@@ -694,10 +700,9 @@ public abstract class BarrierBufferTestBase {
createBarrier(2, 2),
createBuffer(2, PAGE_SIZE), createEndOfPartition(2), createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
};
- buffer = createBarrierBuffer(3, sequence);
-
ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
+ buffer = createBarrierBuffer(3, sequence, handler);
+
handler.setNextExpectedCheckpointId(1L);
// pre-checkpoint 1
@@ -841,10 +846,8 @@ public abstract class BarrierBufferTestBase {
createCancellationBarrier(6, 0),
createBuffer(0, PAGE_SIZE)
};
- buffer = createBarrierBuffer(1, sequence);
-
AbstractInvokable toNotify = mock(AbstractInvokable.class);
- buffer.registerCheckpointEventHandler(toNotify);
+ buffer = createBarrierBuffer(1, sequence, toNotify);
check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
@@ -902,10 +905,8 @@ public abstract class BarrierBufferTestBase {
/* 37 */ createBuffer(0, PAGE_SIZE)
};
- buffer = createBarrierBuffer(3, sequence);
-
AbstractInvokable toNotify = mock(AbstractInvokable.class);
- buffer.registerCheckpointEventHandler(toNotify);
+ buffer = createBarrierBuffer(3, sequence, toNotify);
long startTs;
@@ -990,10 +991,8 @@ public abstract class BarrierBufferTestBase {
// some more buffers
/* 16 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE)
};
- buffer = createBarrierBuffer(3, sequence);
-
AbstractInvokable toNotify = mock(AbstractInvokable.class);
- buffer.registerCheckpointEventHandler(toNotify);
+ buffer = createBarrierBuffer(3, sequence, toNotify);
long startTs;
@@ -1074,10 +1073,8 @@ public abstract class BarrierBufferTestBase {
// some more buffers
/* 18 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE)
};
- buffer = createBarrierBuffer(3, sequence);
-
AbstractInvokable toNotify = mock(AbstractInvokable.class);
- buffer.registerCheckpointEventHandler(toNotify);
+ buffer = createBarrierBuffer(3, sequence, toNotify);
long startTs;
@@ -1151,10 +1148,8 @@ public abstract class BarrierBufferTestBase {
// some more buffers
/* 16 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE)
};
- buffer = createBarrierBuffer(3, sequence);
-
AbstractInvokable toNotify = mock(AbstractInvokable.class);
- buffer.registerCheckpointEventHandler(toNotify);
+ buffer = createBarrierBuffer(3, sequence, toNotify);
long startTs;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 398a95a..cb58837 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.junit.After;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
@@ -95,11 +97,9 @@ public class BarrierTrackerTest {
createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0),
createBuffer(0)
};
- tracker = createBarrierTracker(1, sequence);
-
CheckpointSequenceValidator validator =
- new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6);
- tracker.registerCheckpointEventHandler(validator);
+ new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6);
+ tracker = createBarrierTracker(1, sequence, validator);
for (BufferOrEvent boe : sequence) {
if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
@@ -119,11 +119,9 @@ public class BarrierTrackerTest {
createBarrier(7, 0), createBuffer(0), createBarrier(10, 0),
createBuffer(0)
};
- tracker = createBarrierTracker(1, sequence);
-
CheckpointSequenceValidator validator =
- new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10);
- tracker.registerCheckpointEventHandler(validator);
+ new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10);
+ tracker = createBarrierTracker(1, sequence, validator);
for (BufferOrEvent boe : sequence) {
if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
@@ -152,11 +150,9 @@ public class BarrierTrackerTest {
createBuffer(0)
};
- tracker = createBarrierTracker(3, sequence);
-
CheckpointSequenceValidator validator =
- new CheckpointSequenceValidator(1, 2, 3, 4);
- tracker.registerCheckpointEventHandler(validator);
+ new CheckpointSequenceValidator(1, 2, 3, 4);
+ tracker = createBarrierTracker(3, sequence, validator);
for (BufferOrEvent boe : sequence) {
if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
@@ -189,11 +185,9 @@ public class BarrierTrackerTest {
createBuffer(0)
};
- tracker = createBarrierTracker(3, sequence);
-
CheckpointSequenceValidator validator =
- new CheckpointSequenceValidator(1, 2, 4);
- tracker.registerCheckpointEventHandler(validator);
+ new CheckpointSequenceValidator(1, 2, 4);
+ tracker = createBarrierTracker(3, sequence, validator);
for (BufferOrEvent boe : sequence) {
if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
@@ -265,11 +259,9 @@ public class BarrierTrackerTest {
// complete checkpoint 10
createBarrier(10, 0), createBarrier(10, 1),
};
- tracker = createBarrierTracker(3, sequence);
-
CheckpointSequenceValidator validator =
- new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9, 10);
- tracker.registerCheckpointEventHandler(validator);
+ new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9, 10);
+ tracker = createBarrierTracker(3, sequence, validator);
for (BufferOrEvent boe : sequence) {
if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
@@ -291,12 +283,10 @@ public class BarrierTrackerTest {
createCancellationBarrier(6, 0),
createBuffer(0)
};
- tracker = createBarrierTracker(1, sequence);
-
// negative values mean an expected cancellation call!
CheckpointSequenceValidator validator =
- new CheckpointSequenceValidator(1, 2, -4, 5, -6);
- tracker.registerCheckpointEventHandler(validator);
+ new CheckpointSequenceValidator(1, 2, -4, 5, -6);
+ tracker = createBarrierTracker(1, sequence, validator);
for (BufferOrEvent boe : sequence) {
if (boe.isBuffer()) {
@@ -342,12 +332,10 @@ public class BarrierTrackerTest {
createBuffer(0)
};
- tracker = createBarrierTracker(3, sequence);
-
// negative values mean an expected cancellation call!
CheckpointSequenceValidator validator =
- new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6);
- tracker.registerCheckpointEventHandler(validator);
+ new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6);
+ tracker = createBarrierTracker(3, sequence, validator);
for (BufferOrEvent boe : sequence) {
if (boe.isBuffer()) {
@@ -371,11 +359,8 @@ public class BarrierTrackerTest {
createCancellationBarrier(2L, 2),
createBuffer(0)
};
- tracker = createBarrierTracker(3, sequence);
-
AbstractInvokable statefulTask = mock(AbstractInvokable.class);
-
- tracker.registerCheckpointEventHandler(statefulTask);
+ tracker = createBarrierTracker(3, sequence, statefulTask);
for (BufferOrEvent boe : sequence) {
if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) {
@@ -390,10 +375,16 @@ public class BarrierTrackerTest {
// ------------------------------------------------------------------------
// Utils
// ------------------------------------------------------------------------
-
private static BarrierTracker createBarrierTracker(int numberOfChannels, BufferOrEvent[] sequence) {
+ return createBarrierTracker(numberOfChannels, sequence, null);
+ }
+
+ private static BarrierTracker createBarrierTracker(
+ int numberOfChannels,
+ BufferOrEvent[] sequence,
+ @Nullable AbstractInvokable toNotifyOnCheckpoint) {
MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence));
- return new BarrierTracker(gate);
+ return new BarrierTracker(gate, toNotifyOnCheckpoint);
}
private static BufferOrEvent createBarrier(long id, int channel) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
index da88ffb..bbfe8b6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
@@ -20,8 +20,9 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import java.io.IOException;
+import javax.annotation.Nullable;
import static org.junit.Assert.assertEquals;
@@ -31,8 +32,8 @@ import static org.junit.Assert.assertEquals;
public class CreditBasedBarrierBufferTest extends BarrierBufferTestBase {
@Override
- public BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException {
- return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE));
+ BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) {
+ return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE), -1, "Testing", toNotify);
}
@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
index 546fb62..2101f40 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
@@ -22,10 +22,13 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.IOException;
@@ -64,8 +67,8 @@ public class SpillingBarrierBufferTest extends BarrierBufferTestBase {
}
@Override
- public BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException{
- return new BarrierBuffer(gate, new BufferSpiller(ioManager, PAGE_SIZE));
+ BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) throws IOException {
+ return new BarrierBuffer(gate, new BufferSpiller(ioManager, PAGE_SIZE), -1, "Testing", toNotify);
}
@Override