You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/08 20:25:08 UTC
[2/4] flink git commit: [FLINK-4984] [checkpointing] Add Cancellation
Barriers as a way to signal aborted checkpoints
[FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1f028de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1f028de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1f028de
Branch: refs/heads/release-1.1
Commit: a1f028dee49928ada014632bb27216b36e30250e
Parents: 4dd3efe
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Oct 23 18:41:32 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 18:27:47 2016 +0100
----------------------------------------------------------------------
.../io/network/api/CancelCheckpointMarker.java | 77 +++
.../api/serialization/EventSerializer.java | 57 +-
.../runtime/io/network/netty/NettyMessage.java | 2 +-
.../partition/PipelinedSubpartition.java | 3 +-
.../runtime/jobgraph/tasks/StatefulTask.java | 21 +
.../api/serialization/EventSerializerTest.java | 45 +-
.../jobmanager/JobManagerHARecoveryTest.java | 10 +
.../runtime/taskmanager/TaskAsyncCallTest.java | 10 +
.../streaming/runtime/io/BarrierBuffer.java | 265 ++++++--
.../streaming/runtime/io/BarrierTracker.java | 164 +++--
.../streaming/runtime/io/BufferSpiller.java | 2 +-
.../runtime/io/CheckpointBarrierHandler.java | 22 +-
.../runtime/io/StreamInputProcessor.java | 5 +-
.../runtime/io/StreamTwoInputProcessor.java | 5 +-
.../runtime/tasks/OneInputStreamTask.java | 2 +-
.../streaming/runtime/tasks/OperatorChain.java | 32 +-
.../streaming/runtime/tasks/StreamTask.java | 48 +-
.../runtime/tasks/TwoInputStreamTask.java | 2 +-
.../streaming/runtime/io/BarrierBufferTest.java | 617 +++++++++++++++++--
.../runtime/io/BarrierTrackerTest.java | 157 ++++-
20 files changed, 1291 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
new file mode 100644
index 0000000..52a2517
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * The CancelCheckpointMarker travels through the data streams, similar to the {@link CheckpointBarrier},
+ * but signals that a certain checkpoint should be canceled. Any in-progress alignment for that
+ * checkpoint needs to be canceled and regular processing should be resumed.
+ */
+public class CancelCheckpointMarker extends RuntimeEvent {
+
+ /** The id of the checkpoint to be canceled */
+ private final long checkpointId;
+
+ public CancelCheckpointMarker(long checkpointId) {
+ this.checkpointId = checkpointId;
+ }
+
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ // ------------------------------------------------------------------------
+ // These known and common events go through special code paths, rather than
+ // through generic serialization
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ throw new UnsupportedOperationException("this method should never be called");
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ throw new UnsupportedOperationException("this method should never be called");
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return (int) (checkpointId ^ (checkpointId >>> 32));
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return other != null &&
+ other.getClass() == CancelCheckpointMarker.class &&
+ this.checkpointId == ((CancelCheckpointMarker) other).checkpointId;
+ }
+
+ @Override
+ public String toString() {
+ return "CancelCheckpointMarker " + checkpointId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index a34f8cf..0bc3b28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -38,7 +39,7 @@ import java.nio.ByteOrder;
* Utility class to serialize and deserialize task events.
*/
public class EventSerializer {
-
+
private static final int END_OF_PARTITION_EVENT = 0;
private static final int CHECKPOINT_BARRIER_EVENT = 1;
@@ -46,17 +47,19 @@ public class EventSerializer {
private static final int END_OF_SUPERSTEP_EVENT = 2;
private static final int OTHER_EVENT = 3;
-
+
+ private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;
+
// ------------------------------------------------------------------------
-
- public static ByteBuffer toSerializedEvent(AbstractEvent event) {
+
+ public static ByteBuffer toSerializedEvent(AbstractEvent event) throws IOException {
final Class<?> eventClass = event.getClass();
if (eventClass == EndOfPartitionEvent.class) {
return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_PARTITION_EVENT });
}
else if (eventClass == CheckpointBarrier.class) {
CheckpointBarrier barrier = (CheckpointBarrier) event;
-
+
ByteBuffer buf = ByteBuffer.allocate(20);
buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
buf.putLong(4, barrier.getId());
@@ -66,32 +69,39 @@ public class EventSerializer {
else if (eventClass == EndOfSuperstepEvent.class) {
return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_SUPERSTEP_EVENT });
}
+ else if (eventClass == CancelCheckpointMarker.class) {
+ CancelCheckpointMarker marker = (CancelCheckpointMarker) event;
+
+ ByteBuffer buf = ByteBuffer.allocate(12);
+ buf.putInt(0, CANCEL_CHECKPOINT_MARKER_EVENT);
+ buf.putLong(4, marker.getCheckpointId());
+ return buf;
+ }
else {
try {
final DataOutputSerializer serializer = new DataOutputSerializer(128);
serializer.writeInt(OTHER_EVENT);
serializer.writeUTF(event.getClass().getName());
event.write(serializer);
-
return serializer.wrapAsByteBuffer();
}
catch (IOException e) {
- throw new RuntimeException("Error while serializing event.", e);
+ throw new IOException("Error while serializing event.", e);
}
}
}
- public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) {
+ public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) throws IOException {
if (buffer.remaining() < 4) {
- throw new RuntimeException("Incomplete event");
+ throw new IOException("Incomplete event");
}
-
+
final ByteOrder bufferOrder = buffer.order();
buffer.order(ByteOrder.BIG_ENDIAN);
-
+
try {
int type = buffer.getInt();
-
+
if (type == END_OF_PARTITION_EVENT) {
return EndOfPartitionEvent.INSTANCE;
}
@@ -103,35 +113,38 @@ public class EventSerializer {
else if (type == END_OF_SUPERSTEP_EVENT) {
return EndOfSuperstepEvent.INSTANCE;
}
+ else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {
+ long id = buffer.getLong();
+ return new CancelCheckpointMarker(id);
+ }
else if (type == OTHER_EVENT) {
try {
final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
-
final String className = deserializer.readUTF();
-
+
final Class<? extends AbstractEvent> clazz;
try {
clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
}
catch (ClassNotFoundException e) {
- throw new RuntimeException("Could not load event class '" + className + "'.", e);
+ throw new IOException("Could not load event class '" + className + "'.", e);
}
catch (ClassCastException e) {
- throw new RuntimeException("The class '" + className + "' is not a valid subclass of '"
+ throw new IOException("The class '" + className + "' is not a valid subclass of '"
+ AbstractEvent.class.getName() + "'.", e);
}
-
+
final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
event.read(deserializer);
-
+
return event;
}
catch (Exception e) {
- throw new RuntimeException("Error while deserializing or instantiating event.", e);
+ throw new IOException("Error while deserializing or instantiating event.", e);
}
}
else {
- throw new RuntimeException("Corrupt byte stream for event");
+ throw new IOException("Corrupt byte stream for event");
}
}
finally {
@@ -143,7 +156,7 @@ public class EventSerializer {
// Buffer helpers
// ------------------------------------------------------------------------
- public static Buffer toBuffer(AbstractEvent event) {
+ public static Buffer toBuffer(AbstractEvent event) throws IOException {
final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
MemorySegment data = MemorySegmentFactory.wrap(serializedEvent.array());
@@ -154,7 +167,7 @@ public class EventSerializer {
return buffer;
}
- public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) {
+ public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) throws IOException {
return fromSerializedEvent(buffer.getNioBuffer(), classLoader);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 3a24181..6f6001b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -463,7 +463,7 @@ abstract class NettyMessage {
}
@Override
- public void readFrom(ByteBuf buffer) {
+ public void readFrom(ByteBuf buffer) throws IOException {
// TODO Directly deserialize fromNetty's buffer
int length = buffer.readInt();
ByteBuffer serializedEvent = ByteBuffer.allocate(length);
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 2d7097d..b703acb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.util.event.NotificationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayDeque;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,7 +89,7 @@ class PipelinedSubpartition extends ResultSubpartition {
}
@Override
- public void finish() {
+ public void finish() throws IOException {
final NotificationListener listener;
synchronized (buffers) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index f8bba1a..7c581df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -47,6 +47,27 @@ public interface StatefulTask<T extends StateHandle<?>> {
*/
boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
+ /**
+ * This method is called when a checkpoint is triggered as a result of receiving checkpoint
+ * barriers on all input streams.
+ *
+ * @param checkpointId The ID of the checkpoint, incrementing.
+ * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
+ *
+ * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
+ */
+ void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception;
+
+ /**
+ * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,
+ * but at least one {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker}.
+ *
+ * <p>This requires implementing tasks to forward a
+ * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs.
+ *
+ * @param checkpointId The ID of the checkpoint to be aborted.
+ */
+ void abortCheckpointOnBarrier(long checkpointId) throws Exception;
/**
* Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index ddfd758..d47a0b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.api.serialization;
import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -28,34 +29,30 @@ import org.junit.Test;
import java.nio.ByteBuffer;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class EventSerializerTest {
@Test
- public void testSerializeDeserializeEvent() {
- try {
- AbstractEvent[] events = {
- EndOfPartitionEvent.INSTANCE,
- EndOfSuperstepEvent.INSTANCE,
- new CheckpointBarrier(1678L, 4623784L),
- new TestTaskEvent(Math.random(), 12361231273L)
- };
-
- for (AbstractEvent evt : events) {
- ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
- assertTrue(serializedEvent.hasRemaining());
-
- AbstractEvent deserialized =
- EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
- assertNotNull(deserialized);
- assertEquals(evt, deserialized);
- }
-
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ public void testSerializeDeserializeEvent() throws Exception {
+ AbstractEvent[] events = {
+ EndOfPartitionEvent.INSTANCE,
+ EndOfSuperstepEvent.INSTANCE,
+ new CheckpointBarrier(1678L, 4623784L),
+ new TestTaskEvent(Math.random(), 12361231273L),
+ new CancelCheckpointMarker(287087987329842L)
+ };
+
+ for (AbstractEvent evt : events) {
+ ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
+ assertTrue(serializedEvent.hasRemaining());
+
+ AbstractEvent deserialized =
+ EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
+ assertNotNull(deserialized);
+ assertEquals(evt, deserialized);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index f050e29..4dfaf95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -450,6 +450,16 @@ public class JobManagerHARecoveryTest {
}
@Override
+ public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+ throw new UnsupportedOperationException("should not be called!");
+ }
+
+ @Override
+ public void abortCheckpointOnBarrier(long checkpointId) {
+ throw new UnsupportedOperationException("should not be called!");
+ }
+
+ @Override
public void notifyCheckpointComplete(long checkpointId) {
if (completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE) {
completedCheckpointsLatch.countDown();
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 0c0d064..5b344eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -221,6 +221,16 @@ public class TaskAsyncCallTest {
}
@Override
+ public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+ throw new UnsupportedOperationException("Should not be called");
+ }
+
+ @Override
+ public void abortCheckpointOnBarrier(long checkpointId) {
+ throw new UnsupportedOperationException("Should not be called");
+ }
+
+ @Override
public void notifyCheckpointComplete(long checkpointId) {
if (checkpointId != lastCheckpointId && this.error == null) {
this.error = new Exception("calls out of order");
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index dcd76c6..36de717 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -17,42 +17,43 @@
package org.apache.flink.streaming.runtime.io;
-import java.io.IOException;
-import java.util.ArrayDeque;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayDeque;
+
/**
* The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
* all inputs have received the barrier for a given checkpoint.
*
* <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
* BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until
- * the blocks are released.</p>
+ * the blocks are released.
*/
@Internal
public class BarrierBuffer implements CheckpointBarrierHandler {
private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-
+
/** The gate that the buffer draws its input from */
private final InputGate inputGate;
/** Flags that indicate whether a channel is currently blocked/buffered */
private final boolean[] blockedChannels;
-
+
/** The total number of channels that this buffer handles data from */
private final int totalNumberOfInputChannels;
-
+
/** To utility to write blocked data to a file channel */
private final BufferSpiller bufferSpiller;
@@ -65,17 +66,24 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
/** Handler that receives the checkpoint notifications */
- private EventListener<CheckpointBarrier> checkpointHandler;
+ private StatefulTask<?> toNotifyOnCheckpoint;
/** The ID of the checkpoint for which we expect barriers */
private long currentCheckpointId = -1L;
- /** The number of received barriers (= number of blocked/buffered channels) */
+ /** The number of received barriers (= number of blocked/buffered channels)
+ * IMPORTANT: A canceled checkpoint must always have 0 barriers */
private int numBarriersReceived;
-
+
/** The number of already closed channels */
private int numClosedChannels;
-
+
+ /** The timestamp as in {@link System#nanoTime()} at which the last alignment started */
+ private long startOfAlignmentTimestamp;
+
+ /** The time (in nanoseconds) that the latest alignment took */
+ private long latestAlignmentDurationNanos;
+
/** Flag to indicate whether we have drawn all available input */
private boolean endOfStream;
@@ -90,7 +98,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
this.inputGate = inputGate;
this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
-
+
this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
}
@@ -100,7 +108,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
// ------------------------------------------------------------------------
@Override
- public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+ public BufferOrEvent getNextNonBlocked() throws Exception {
while (true) {
// process buffered BufferOrEvents before grabbing new ones
BufferOrEvent next;
@@ -114,7 +122,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
return getNextNonBlocked();
}
}
-
+
if (next != null) {
if (isBlocked(next.getChannelIndex())) {
// if the channel is blocked we, we just store the BufferOrEvent
@@ -129,27 +137,29 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
}
}
+ else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
+ processCancellationBarrier((CancelCheckpointMarker) next.getEvent());
+ }
else {
if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
- numClosedChannels++;
- // no chance to complete this checkpoint
- releaseBlocks();
+ processEndOfPartition(next.getChannelIndex());
}
return next;
}
}
else if (!endOfStream) {
- // end of stream. we feed the data that is still buffered
+ // end of input stream. stream continues with the buffered data
endOfStream = true;
- releaseBlocks();
+ releaseBlocksAndResetBarriers();
return getNextNonBlocked();
}
else {
+ // final end of both input and buffered data
return null;
}
}
}
-
+
private void completeBufferedSequence() throws IOException {
currentBuffered.cleanup();
currentBuffered = queuedBuffered.pollFirst();
@@ -157,66 +167,175 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
currentBuffered.open();
}
}
-
- private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
+
+ private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
final long barrierId = receivedBarrier.getId();
+ // fast path for single channel cases
+ if (totalNumberOfInputChannels == 1) {
+ if (barrierId > currentCheckpointId) {
+ // new checkpoint
+ currentCheckpointId = barrierId;
+ notifyCheckpoint(receivedBarrier);
+ }
+ return;
+ }
+
+ // -- general code path for multiple input channels --
+
if (numBarriersReceived > 0) {
- // subsequent barrier of a checkpoint.
+ // this is only true if some alignment is already in progress and was not canceled
+
if (barrierId == currentCheckpointId) {
// regular case
onBarrier(channelIndex);
}
else if (barrierId > currentCheckpointId) {
- // we did not complete the current checkpoint
+ // we did not complete the current checkpoint, another started before
LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.", barrierId, currentCheckpointId);
- releaseBlocks();
- currentCheckpointId = barrierId;
- onBarrier(channelIndex);
+ // let the task know we are not completing this
+ notifyAbort(currentCheckpointId);
+
+ // abort the current checkpoint
+ releaseBlocksAndResetBarriers();
+
+ // begin the new checkpoint
+ beginNewAlignment(barrierId, channelIndex);
}
else {
- // ignore trailing barrier from aborted checkpoint
+ // ignore trailing barrier from an earlier checkpoint (obsolete now)
return;
}
-
}
else if (barrierId > currentCheckpointId) {
// first barrier of a new checkpoint
- currentCheckpointId = barrierId;
- onBarrier(channelIndex);
+ beginNewAlignment(barrierId, channelIndex);
}
else {
- // trailing barrier from previous (skipped) checkpoint
+ // either the current checkpoint was canceled (numBarriers == 0) or
+ // this barrier is from an old subsumed checkpoint
return;
}
- // check if we have all barriers
+ // check if we have all barriers - since canceled checkpoints always have zero barriers
+ // this can only happen on a non canceled checkpoint
if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
+ // actually trigger checkpoint
if (LOG.isDebugEnabled()) {
- LOG.debug("Received all barrier, triggering checkpoint {} at {}",
+ LOG.debug("Received all barriers, triggering checkpoint {} at {}",
receivedBarrier.getId(), receivedBarrier.getTimestamp());
}
- if (checkpointHandler != null) {
- checkpointHandler.onEvent(receivedBarrier);
+ releaseBlocksAndResetBarriers();
+ notifyCheckpoint(receivedBarrier);
+ }
+ }
+
+ private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
+ final long barrierId = cancelBarrier.getCheckpointId();
+
+ // fast path for single channel cases
+ if (totalNumberOfInputChannels == 1) {
+ if (barrierId > currentCheckpointId) {
+ // new checkpoint
+ currentCheckpointId = barrierId;
+ notifyAbort(barrierId);
}
-
- releaseBlocks();
+ return;
+ }
+
+ // -- general code path for multiple input channels --
+
+ if (numBarriersReceived > 0) {
+ // this is only true if some alignment is in progress and nothing was canceled
+
+ if (barrierId == currentCheckpointId) {
+ // cancel this alignment
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checkpoint {} canceled, aborting alignment", barrierId);
+ }
+
+ releaseBlocksAndResetBarriers();
+ notifyAbort(barrierId);
+ }
+ else if (barrierId > currentCheckpointId) {
+ // we canceled the next which also cancels the current
+ LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
+ "Skipping current checkpoint.", barrierId, currentCheckpointId);
+
+ // this stops the current alignment
+ releaseBlocksAndResetBarriers();
+
+ // the next checkpoint starts as canceled
+ currentCheckpointId = barrierId;
+ startOfAlignmentTimestamp = 0L;
+ latestAlignmentDurationNanos = 0L;
+ notifyAbort(barrierId);
+ }
+
+ // else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now)
+
+ }
+ else if (barrierId > currentCheckpointId) {
+ // first barrier of a new checkpoint is directly a cancellation
+
+ // setting the currentCheckpointId to this checkpoint while keeping the numBarriers
+ // at zero means that no checkpoint barrier can start a new alignment
+ currentCheckpointId = barrierId;
+
+ startOfAlignmentTimestamp = 0L;
+ latestAlignmentDurationNanos = 0L;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId);
+ }
+
+ notifyAbort(barrierId);
}
+
+ // else: trailing barrier from either
+ // - a previous (subsumed) checkpoint
+ // - the current checkpoint if it was already canceled
}
-
+
+ private void processEndOfPartition(int channel) throws Exception {
+ numClosedChannels++;
+
+ if (numBarriersReceived > 0) {
+ // let the task know we skip a checkpoint
+ notifyAbort(currentCheckpointId);
+
+ // no chance to complete this checkpoint
+ releaseBlocksAndResetBarriers();
+ }
+ }
+
+ private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
+ if (toNotifyOnCheckpoint != null) {
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+ checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+ }
+ }
+
+ private void notifyAbort(long checkpointId) throws Exception {
+ if (toNotifyOnCheckpoint != null) {
+ toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+ }
+ }
+
+
@Override
- public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
- if (this.checkpointHandler == null) {
- this.checkpointHandler = checkpointHandler;
+ public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) {
+ if (this.toNotifyOnCheckpoint == null) {
+ this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
}
else {
- throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler");
+ throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee");
}
}
-
+
@Override
public boolean isEmpty() {
return currentBuffered == null;
@@ -231,8 +350,20 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
for (BufferSpiller.SpilledBufferOrEventSequence seq : queuedBuffered) {
seq.cleanup();
}
+ queuedBuffered.clear();
}
-
+
+ private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
+ currentCheckpointId = checkpointId;
+ onBarrier(channelIndex);
+
+ startOfAlignmentTimestamp = System.nanoTime();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting stream alignment for checkpoint " + checkpointId);
+ }
+ }
+
/**
* Checks whether the channel with the given index is blocked.
*
@@ -242,7 +373,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
private boolean isBlocked(int channelIndex) {
return blockedChannels[channelIndex];
}
-
+
/**
* Blocks the given channel index, from which a barrier has been received.
*
@@ -251,30 +382,28 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
private void onBarrier(int channelIndex) throws IOException {
if (!blockedChannels[channelIndex]) {
blockedChannels[channelIndex] = true;
+
numBarriersReceived++;
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("Received barrier from channel " + channelIndex);
}
}
else {
- throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream");
+ throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + channelIndex);
}
}
/**
- * Releases the blocks on all channels. Makes sure the just written data
- * is the next to be consumed.
+ * Releases the blocks on all channels and resets the barrier count.
+ * Makes sure the just written data is the next to be consumed.
*/
- private void releaseBlocks() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Releasing blocks");
- }
+ private void releaseBlocksAndResetBarriers() throws IOException {
+ LOG.debug("End of stream alignment, feeding buffered data back");
for (int i = 0; i < blockedChannels.length; i++) {
blockedChannels[i] = false;
}
- numBarriersReceived = 0;
if (currentBuffered == null) {
// common case: no more buffered data
@@ -295,10 +424,18 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
currentBuffered = bufferedNow;
}
}
+
+ // the next barrier that comes must assume it is the first
+ numBarriersReceived = 0;
+
+ if (startOfAlignmentTimestamp > 0) {
+ latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp;
+ startOfAlignmentTimestamp = 0;
+ }
}
// ------------------------------------------------------------------------
- // For Testing
+ // Properties
// ------------------------------------------------------------------------
/**
@@ -309,7 +446,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
public long getCurrentCheckpointId() {
return this.currentCheckpointId;
}
-
+
+ @Override
+ public long getAlignmentDurationNanos() {
+ long start = this.startOfAlignmentTimestamp;
+ if (start <= 0) {
+ return latestAlignmentDurationNanos;
+ } else {
+ return System.nanoTime() - start;
+ }
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 9c9ec4f..5157336 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,12 +19,12 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import java.io.IOException;
import java.util.ArrayDeque;
/**
@@ -34,9 +34,9 @@ import java.util.ArrayDeque;
*
* <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
* channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
- * guarantees. It can, however, be used to gain "at least once" processing guarantees.</p>
+ * guarantees. It can, however, be used to gain "at least once" processing guarantees.
*
- * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.</p>
+ * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.
*/
@Internal
public class BarrierTracker implements CheckpointBarrierHandler {
@@ -57,11 +57,12 @@ public class BarrierTracker implements CheckpointBarrierHandler {
private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
/** The listener to be notified on complete checkpoints */
- private EventListener<CheckpointBarrier> checkpointHandler;
+ private StatefulTask<?> toNotifyOnCheckpoint;
/** The highest checkpoint ID encountered so far */
private long latestPendingCheckpointID = -1;
-
+
+ // ------------------------------------------------------------------------
public BarrierTracker(InputGate inputGate) {
this.inputGate = inputGate;
@@ -70,28 +71,33 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
@Override
- public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+ public BufferOrEvent getNextNonBlocked() throws Exception {
while (true) {
BufferOrEvent next = inputGate.getNextBufferOrEvent();
- if (next == null) {
- return null;
- }
- else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
+ if (next == null || next.isBuffer()) {
+ // buffer or input exhausted
return next;
}
- else {
+ else if (next.getEvent().getClass() == CheckpointBarrier.class) {
processBarrier((CheckpointBarrier) next.getEvent());
}
+ else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
+ processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent());
+ }
+ else {
+ // some other event
+ return next;
+ }
}
}
@Override
- public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
- if (this.checkpointHandler == null) {
- this.checkpointHandler = checkpointHandler;
+ public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) {
+ if (this.toNotifyOnCheckpoint == null) {
+ this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
}
else {
- throw new IllegalStateException("BarrierTracker already has a registered checkpoint handler");
+ throw new IllegalStateException("BarrierTracker already has a registered checkpoint notifyee");
}
}
@@ -105,22 +111,27 @@ public class BarrierTracker implements CheckpointBarrierHandler {
return pendingCheckpoints.isEmpty();
}
- private void processBarrier(CheckpointBarrier receivedBarrier) {
+ @Override
+ public long getAlignmentDurationNanos() {
+ // this one does not do alignment at all
+ return 0L;
+ }
+
+ private void processBarrier(CheckpointBarrier receivedBarrier) throws Exception {
+ final long barrierId = receivedBarrier.getId();
+
// fast path for single channel trackers
if (totalNumberOfInputChannels == 1) {
- if (checkpointHandler != null) {
- checkpointHandler.onEvent(receivedBarrier);
- }
+ notifyCheckpoint(barrierId, receivedBarrier.getTimestamp());
return;
}
-
+
// general path for multiple input channels
- final long barrierId = receivedBarrier.getId();
// find the checkpoint barrier in the queue of pending barriers
CheckpointBarrierCount cbc = null;
int pos = 0;
-
+
for (CheckpointBarrierCount next : pendingCheckpoints) {
if (next.checkpointId == barrierId) {
cbc = next;
@@ -128,21 +139,21 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
pos++;
}
-
+
if (cbc != null) {
// add one to the count to that barrier and check for completion
int numBarriersNew = cbc.incrementBarrierCount();
if (numBarriersNew == totalNumberOfInputChannels) {
- // checkpoint can be triggered
+ // checkpoint can be triggered (or is aborted and all barriers have been seen)
// first, remove this checkpoint and all prior pending
// checkpoints (which are now subsumed)
for (int i = 0; i <= pos; i++) {
pendingCheckpoints.pollFirst();
}
-
+
// notify the listener
- if (checkpointHandler != null) {
- checkpointHandler.onEvent(receivedBarrier);
+ if (!cbc.isAborted()) {
+ notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp());
}
}
}
@@ -163,45 +174,104 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
}
+ private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier) throws Exception {
+ final long checkpointId = barrier.getCheckpointId();
+
+ // fast path for single channel trackers
+ if (totalNumberOfInputChannels == 1) {
+ notifyAbort(checkpointId);
+ return;
+ }
+
+ // -- general path for multiple input channels --
+
+ // find the checkpoint barrier in the queue of pending barriers
+ // while doing this we "abort" all checkpoints before that one
+ CheckpointBarrierCount cbc;
+ while ((cbc = pendingCheckpoints.peekFirst()) != null && cbc.checkpointId() < checkpointId) {
+ pendingCheckpoints.removeFirst();
+ }
+
+ if (cbc != null && cbc.checkpointId() == checkpointId) {
+ // make sure the checkpoint is remembered as aborted
+ if (cbc.markAborted()) {
+ // this was the first time the checkpoint was aborted - notify
+ notifyAbort(checkpointId);
+ }
+
+ // we still count the barriers to be able to remove the entry once all barriers have been seen
+ if (cbc.incrementBarrierCount() == totalNumberOfInputChannels) {
+ // we can remove this entry
+ pendingCheckpoints.removeFirst();
+ }
+ }
+ else {
+ notifyAbort(checkpointId);
+
+ // first barrier for this checkpoint - remember it as aborted
+ // since we polled away all entries with lower checkpoint IDs
+ // this entry will become the new first entry
+ if (pendingCheckpoints.size() < MAX_CHECKPOINTS_TO_TRACK) {
+ CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
+ abortedMarker.markAborted();
+ pendingCheckpoints.addFirst(abortedMarker);
+ }
+ }
+ }
+
+ private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception {
+ if (toNotifyOnCheckpoint != null) {
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointId, timestamp);
+ }
+ }
+
+ private void notifyAbort(long checkpointId) throws Exception {
+ if (toNotifyOnCheckpoint != null) {
+ toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+ }
+ }
+
// ------------------------------------------------------------------------
/**
* Simple class for a checkpoint ID with a barrier counter.
*/
private static final class CheckpointBarrierCount {
-
+
private final long checkpointId;
-
+
private int barrierCount;
-
- private CheckpointBarrierCount(long checkpointId) {
+
+ private boolean aborted;
+
+ CheckpointBarrierCount(long checkpointId) {
this.checkpointId = checkpointId;
this.barrierCount = 1;
}
+ public long checkpointId() {
+ return checkpointId;
+ }
+
public int incrementBarrierCount() {
return ++barrierCount;
}
-
- @Override
- public int hashCode() {
- return (int) ((checkpointId >>> 32) ^ checkpointId) + 17 * barrierCount;
+
+ public boolean isAborted() {
+ return aborted;
}
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof CheckpointBarrierCount) {
- CheckpointBarrierCount that = (CheckpointBarrierCount) obj;
- return this.checkpointId == that.checkpointId && this.barrierCount == that.barrierCount;
- }
- else {
- return false;
- }
+ public boolean markAborted() {
+ boolean firstAbort = !this.aborted;
+ this.aborted = true;
+ return firstAbort;
}
@Override
public String toString() {
- return String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
+ return isAborted() ?
+ String.format("checkpointID=%d - ABORTED", checkpointId) :
+ String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 1b38a56..dc8d245 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -134,7 +134,7 @@ public class BufferSpiller {
else {
contents = EventSerializer.toSerializedEvent(boe.getEvent());
}
-
+
headBuffer.clear();
headBuffer.putInt(boe.getChannelIndex());
headBuffer.putInt(contents.remaining());
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 5aa2030..ca23491 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -20,8 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import java.io.IOException;
@@ -43,14 +42,14 @@ public interface CheckpointBarrierHandler {
* @throws java.lang.InterruptedException Thrown if the thread is interrupted while blocking during
* waiting for the next BufferOrEvent to become available.
*/
- BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
+ BufferOrEvent getNextNonBlocked() throws Exception;
/**
- * Registers the given event handler to be notified on successful checkpoints.
- *
- * @param checkpointHandler The handler to register.
+ * Registers the task to be notified once all checkpoint barriers have been received for a checkpoint.
+ *
+ * @param task The task to notify
*/
- void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
+ void registerCheckpointEventHandler(StatefulTask<?> task);
/**
* Cleans up all internally held resources.
@@ -64,4 +63,13 @@ public interface CheckpointBarrierHandler {
* @return {@code True}, if no data is buffered internally, {@code false} otherwise.
*/
boolean isEmpty();
+
+ /**
+ * Gets the time that the latest alignment took, in nanoseconds.
+ * If there is currently an alignment in progress, it will return the time spent in the
+ * current alignment so far.
+ *
+ * @return The duration in nanoseconds
+ */
+ long getAlignmentDurationNanos();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index d11990e..7d9e4d2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
/**
* Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
@@ -85,7 +84,7 @@ public class StreamInputProcessor<IN> {
@SuppressWarnings("unchecked")
public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
- EventListener<CheckpointBarrier> checkpointListener,
+ StatefulTask<?> checkpointListener,
CheckpointingMode checkpointMode,
IOManager ioManager,
boolean enableWatermarkMultiplexing) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ce764b7..a3ae077 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import java.io.IOException;
import java.util.Arrays;
@@ -95,7 +94,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
Collection<InputGate> inputGates2,
TypeSerializer<IN1> inputSerializer1,
TypeSerializer<IN2> inputSerializer2,
- EventListener<CheckpointBarrier> checkpointListener,
+ StatefulTask<?> checkpointListener,
CheckpointingMode checkpointMode,
IOManager ioManager,
boolean enableWatermarkMultiplexing) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 938d8c1..d18ca16 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -43,7 +43,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
if (numberOfInputs > 0) {
InputGate[] inputGates = getEnvironment().getAllInputGates();
inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
- getCheckpointBarrierListener(),
+ this,
configuration.getCheckpointMode(),
getEnvironment().getIOManager(),
isSerializingTimestamps());
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 0e24516..351acaa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -126,15 +127,32 @@ public class OperatorChain<OUT> {
}
}
-
-
- public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException {
- CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
- for (RecordWriterOutput<?> streamOutput : streamOutputs) {
- streamOutput.broadcastEvent(barrier);
+
+
+ public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException {
+ try {
+ CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
+ for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+ streamOutput.broadcastEvent(barrier);
+ }
+ }
+ catch (InterruptedException e) {
+ throw new IOException("Interrupted while broadcasting checkpoint barrier");
}
}
-
+
+ public void broadcastCheckpointCancelMarker(long id) throws IOException {
+ try {
+ CancelCheckpointMarker barrier = new CancelCheckpointMarker(id);
+ for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+ streamOutput.broadcastEvent(barrier);
+ }
+ }
+ catch (InterruptedException e) {
+ throw new IOException("Interrupted while broadcasting checkpoint cancellation");
+ }
+ }
+
public RecordWriterOutput<?>[] getStreamOutputs() {
return streamOutputs;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8f28cef..d55a9c5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -37,7 +36,6 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
@@ -580,9 +578,34 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
}
- protected boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {
+ @Override
+ public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+ try {
+ performCheckpoint(checkpointId, timestamp);
+ }
+ catch (CancelTaskException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new Exception("Error while performing a checkpoint", e);
+ }
+ }
+
+ @Override
+ public void abortCheckpointOnBarrier(long checkpointId) throws Exception {
+ LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName());
+
+ synchronized (lock) {
+ if (isRunning) {
+ operatorChain.broadcastCheckpointCancelMarker(checkpointId);
+ }
+ }
+ }
+
+ private boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {
+
LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-
+
synchronized (lock) {
if (isRunning) {
@@ -759,23 +782,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
return getName();
}
- protected final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
- return new EventListener<CheckpointBarrier>() {
- @Override
- public void onEvent(CheckpointBarrier barrier) {
- try {
- performCheckpoint(barrier.getId(), barrier.getTimestamp());
- }
- catch (CancelTaskException e) {
- throw e;
- }
- catch (Exception e) {
- throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
- }
- }
- };
- }
-
// ------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index c3305eb..9252063 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -68,7 +68,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
inputDeserializer1, inputDeserializer2,
- getCheckpointBarrierListener(),
+ this,
configuration.getCheckpointMode(),
getEnvironment().getIOManager(),
isSerializingTimestamps());