You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/27 15:18:24 UTC
[2/3] flink git commit: [FLINK-4685] [checkpoints] Gather sync/async
duration and alignment information for task checkpoints
[FLINK-4685] [checkpoints] Gather sync/async duration and alignment information for task checkpoints
This adds the following information to each 'acknowledge checkpoint' message:
- number of bytes buffered during alignment
- duration of alignment phase
- duration of synchronous part of the operator checkpoint
- duration of asynchronous part of the operator checkpoint
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1642e32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1642e32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1642e32
Branch: refs/heads/master
Commit: b1642e32c2f69c60c2b212260c3479feb66a9165
Parents: 6ea9284
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 26 14:10:21 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 27 14:58:41 2016 +0200
----------------------------------------------------------------------
.../state/RocksDBAsyncSnapshotTest.java | 19 ++--
.../flink/runtime/execution/Environment.java | 43 +++++++--
.../runtime/jobgraph/tasks/StatefulTask.java | 27 ++++--
.../checkpoint/AcknowledgeCheckpoint.java | 93 +++++++++++++++++---
.../ActorGatewayCheckpointResponder.java | 23 ++---
.../taskmanager/CheckpointResponder.java | 29 ++++--
.../runtime/taskmanager/RuntimeEnvironment.java | 27 ++++--
.../jobmanager/JobManagerHARecoveryTest.java | 11 ++-
.../operators/testutils/DummyEnvironment.java | 14 +--
.../operators/testutils/MockEnvironment.java | 15 ++--
.../runtime/taskmanager/TaskAsyncCallTest.java | 6 ++
.../streaming/runtime/io/BarrierBuffer.java | 83 +++++++++++------
.../streaming/runtime/io/BarrierTracker.java | 44 +++++----
.../streaming/runtime/io/BufferSpiller.java | 36 +++++---
.../runtime/io/CheckpointBarrierHandler.java | 30 +++++--
.../runtime/io/StreamInputProcessor.java | 24 +++--
.../runtime/io/StreamTwoInputProcessor.java | 16 ++--
.../runtime/tasks/OneInputStreamTask.java | 2 +-
.../streaming/runtime/tasks/StreamTask.java | 87 ++++++++++++------
.../runtime/tasks/TwoInputStreamTask.java | 2 +-
.../streaming/runtime/io/BarrierBufferTest.java | 88 ++++++++++++++----
.../runtime/io/BarrierTrackerTest.java | 40 +++++++--
.../runtime/tasks/OneInputStreamTaskTest.java | 21 +++--
.../runtime/tasks/StreamMockEnvironment.java | 14 +--
24 files changed, 576 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index d5b9b46..c0c9ca1 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -136,15 +136,16 @@ public class RocksDBAsyncSnapshotTest {
testHarness.bufferSize) {
@Override
- public void acknowledgeCheckpoint(long checkpointId) {
- super.acknowledgeCheckpoint(checkpointId);
- }
-
- @Override
- public void acknowledgeCheckpoint(long checkpointId,
- ChainedStateHandle<StreamStateHandle> chainedStateHandle,
- List<KeyGroupsStateHandle> keyGroupStateHandles) {
- super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles);
+ public void acknowledgeCheckpoint(
+ long checkpointId,
+ ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+ List<KeyGroupsStateHandle> keyGroupStateHandles,
+ long synchronousDurationMillis, long asynchronousDurationMillis,
+ long bytesBufferedInAlignment, long alignmentDurationNanos) {
+
+ super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles,
+ synchronousDurationMillis, asynchronousDurationMillis,
+ bytesBufferedInAlignment, alignmentDurationNanos);
// block on the latch, to verify that triggerCheckpoint returns below,
// even though the async checkpoint would not finish
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 1eee9d4..273c0d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -164,23 +164,52 @@ public interface Environment {
* to for the checkpoint with the give checkpoint-ID. This method does not include
* any state in the checkpoint.
*
- * @param checkpointId The ID of the checkpoint.
+ * @param checkpointId
+ * The ID of the checkpoint.
+ * @param synchronousDurationMillis
+ * The duration (in milliseconds) of the synchronous part of the operator checkpoint
+ * @param asynchronousDurationMillis
+ * The duration (in milliseconds) of the asynchronous part of the operator checkpoint
+ * @param bytesBufferedInAlignment
+ * The number of bytes that were buffered during the checkpoint alignment phase
+ * @param alignmentDurationNanos
+ * The duration (in nanoseconds) that the stream alignment for the checkpoint took
*/
- void acknowledgeCheckpoint(long checkpointId);
+ void acknowledgeCheckpoint(
+ long checkpointId,
+ long synchronousDurationMillis,
+ long asynchronousDurationMillis,
+ long bytesBufferedInAlignment,
+ long alignmentDurationNanos);
/**
* Confirms that the invokable has successfully completed all required steps for
* the checkpoint with the give checkpoint-ID. This method does include
* the given state in the checkpoint.
*
- * @param checkpointId The ID of the checkpoint.
- * @param chainedStateHandle Handle for the chained operator state
- * @param keyGroupStateHandles Handles for key group state
+ * @param checkpointId
+ * The ID of the checkpoint.
+ * @param chainedStateHandle
+ * Handle for the chained operator state
+ * @param keyGroupStateHandles
+ * Handles for key group state
+ * @param synchronousDurationMillis
+ * The duration (in milliseconds) of the synchronous part of the operator checkpoint
+ * @param asynchronousDurationMillis
+ * The duration (in milliseconds) of the asynchronous part of the operator checkpoint
+ * @param bytesBufferedInAlignment
+ * The number of bytes that were buffered during the checkpoint alignment phase
+ * @param alignmentDurationNanos
+ * The duration (in nanoseconds) that the stream alignment for the checkpoint took
*/
void acknowledgeCheckpoint(
long checkpointId,
ChainedStateHandle<StreamStateHandle> chainedStateHandle,
- List<KeyGroupsStateHandle> keyGroupStateHandles);
+ List<KeyGroupsStateHandle> keyGroupStateHandles,
+ long synchronousDurationMillis,
+ long asynchronousDurationMillis,
+ long bytesBufferedInAlignment,
+ long alignmentDurationNanos);
/**
* Marks task execution failed for an external reason (a reason other than the task code itself
@@ -189,7 +218,7 @@ public interface Environment {
* Otherwise it sets the state to FAILED, and, if the invokable code is running,
* starts an asynchronous thread that aborts that code.
*
- * <p>This method never blocks.</p>
+ * <p>This method never blocks.
*/
void failExternally(Throwable cause);
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index cab7ed6..9ddfdf7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -22,7 +22,6 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
-
import java.util.List;
/**
@@ -41,18 +40,34 @@ public interface StatefulTask {
void setInitialState(ChainedStateHandle<StreamStateHandle> chainedState, List<KeyGroupsStateHandle> keyGroupsState) throws Exception;
/**
- * This method is either called directly and asynchronously by the checkpoint
- * coordinator (in the case of functions that are directly notified - usually
- * the data sources), or called synchronously when all incoming channels have
- * reported a checkpoint barrier.
+ * This method is called to trigger a checkpoint, asynchronously by the checkpoint
+ * coordinator.
+ *
+ * <p>This method is called for tasks that start the checkpoints by injecting the initial barriers,
+ * i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of
+ * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(long, long, long, long)}
+ * method.
*
- * @param checkpointId The ID of the checkpoint, incrementing.
+ * @param checkpointId The ID of the checkpoint, strictly incrementing.
* @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
*
* @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
*/
boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
+ /**
+ * This method is called when a checkpoint is triggered as a result of receiving checkpoint
+ * barriers on all input streams.
+ *
+ * @param checkpointId The ID of the checkpoint, strictly incrementing.
+ * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
+ * @param bytesAligned The number of bytes that were buffered during the alignment of the streams.
+ * @param alignmentTimeNanos The time that the stream alignment took, in nanoseconds.
+ *
+ * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
+ */
+ void triggerCheckpointOnBarrier(
+ long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception;
/**
* Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index 0c56603..72396eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
* {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the checkpoint of an
@@ -37,27 +39,74 @@ import java.util.List;
public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
private static final long serialVersionUID = -7606214777192401493L;
-
+
private final ChainedStateHandle<StreamStateHandle> stateHandle;
private final List<KeyGroupsStateHandle> keyGroupsStateHandle;
- public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+ /** The duration (in milliseconds) that the synchronous part of the checkpoint took */
+ private final long synchronousDurationMillis;
+
+ /** The duration (in milliseconds) that the asynchronous part of the checkpoint took */
+ private final long asynchronousDurationMillis;
+
+ /** The number of bytes that were buffered during the checkpoint alignment phase */
+ private final long bytesBufferedInAlignment;
+
+ /** The duration (in nanoseconds) that the alignment phase of the task's checkpoint took */
+ private final long alignmentDurationNanos;
+
+ // ------------------------------------------------------------------------
+
+ public AcknowledgeCheckpoint(
+ JobID job,
+ ExecutionAttemptID taskExecutionId,
+ long checkpointId) {
this(job, taskExecutionId, checkpointId, null, null);
}
public AcknowledgeCheckpoint(
- JobID job,
- ExecutionAttemptID taskExecutionId,
- long checkpointId,
- ChainedStateHandle<StreamStateHandle> state,
- List<KeyGroupsStateHandle> keyGroupStateAndSizes) {
+ JobID job,
+ ExecutionAttemptID taskExecutionId,
+ long checkpointId,
+ ChainedStateHandle<StreamStateHandle> state,
+ List<KeyGroupsStateHandle> keyGroupStateAndSizes) {
+ this(job, taskExecutionId, checkpointId, state, keyGroupStateAndSizes, -1L, -1L, -1L, -1L);
+ }
+
+ public AcknowledgeCheckpoint(
+ JobID job,
+ ExecutionAttemptID taskExecutionId,
+ long checkpointId,
+ ChainedStateHandle<StreamStateHandle> state,
+ List<KeyGroupsStateHandle> keyGroupStateAndSizes,
+ long synchronousDurationMillis,
+ long asynchronousDurationMillis,
+ long bytesBufferedInAlignment,
+ long alignmentDurationNanos) {
super(job, taskExecutionId, checkpointId);
+
+ // these may be null in cases where the operator has no state
this.stateHandle = state;
this.keyGroupsStateHandle = keyGroupStateAndSizes;
+
+ // these may be "-1", in case the values are unknown or not set
+ checkArgument(synchronousDurationMillis >= -1);
+ checkArgument(asynchronousDurationMillis >= -1);
+ checkArgument(bytesBufferedInAlignment >= -1);
+ checkArgument(alignmentDurationNanos >= -1);
+
+ this.synchronousDurationMillis = synchronousDurationMillis;
+ this.asynchronousDurationMillis = asynchronousDurationMillis;
+ this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+ this.alignmentDurationNanos = alignmentDurationNanos;
}
+ // ------------------------------------------------------------------------
+ // properties
+ // ------------------------------------------------------------------------
+
public ChainedStateHandle<StreamStateHandle> getStateHandle() {
return stateHandle;
}
@@ -66,8 +115,29 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
return keyGroupsStateHandle;
}
+ public long getSynchronousDurationMillis() {
+ return synchronousDurationMillis;
+ }
+
+ public long getAsynchronousDurationMillis() {
+ return asynchronousDurationMillis;
+ }
+
+ public long getBytesBufferedInAlignment() {
+ return bytesBufferedInAlignment;
+ }
+
+ public long getAlignmentDurationNanos() {
+ return alignmentDurationNanos;
+ }
+
// --------------------------------------------------------------------------------------------
-
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -76,9 +146,10 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
else if (o instanceof AcknowledgeCheckpoint) {
AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o;
return super.equals(o) &&
- (this.stateHandle == null ? that.stateHandle == null : (that.stateHandle != null && this.stateHandle.equals(that.stateHandle))) &&
- (this.keyGroupsStateHandle == null ? that.keyGroupsStateHandle == null : (that.keyGroupsStateHandle != null && this.keyGroupsStateHandle.equals(that.keyGroupsStateHandle)));
-
+ (this.stateHandle == null ? that.stateHandle == null :
+ (that.stateHandle != null && this.stateHandle.equals(that.stateHandle))) &&
+ (this.keyGroupsStateHandle == null ? that.keyGroupsStateHandle == null :
+ (that.keyGroupsStateHandle != null && this.keyGroupsStateHandle.equals(that.keyGroupsStateHandle)));
}
else {
return false;
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
index 56e5922..c317bed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
@@ -43,18 +43,21 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder {
@Override
public void acknowledgeCheckpoint(
- JobID jobID,
- ExecutionAttemptID executionAttemptID,
- long checkpointID,
- ChainedStateHandle<StreamStateHandle> chainedStateHandle,
- List<KeyGroupsStateHandle> keyGroupStateHandles) {
+ JobID jobID,
+ ExecutionAttemptID executionAttemptID,
+ long checkpointID,
+ ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+ List<KeyGroupsStateHandle> keyGroupStateHandles,
+ long synchronousDurationMillis,
+ long asynchronousDurationMillis,
+ long bytesBufferedInAlignment,
+ long alignmentDurationNanos) {
AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
- jobID,
- executionAttemptID,
- checkpointID,
- chainedStateHandle,
- keyGroupStateHandles);
+ jobID, executionAttemptID, checkpointID,
+ chainedStateHandle, keyGroupStateHandles,
+ synchronousDurationMillis, asynchronousDurationMillis,
+ bytesBufferedInAlignment, alignmentDurationNanos);
actorGateway.tell(message);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
index 9d5c4e1..b3f9827 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
@@ -34,18 +34,35 @@ public interface CheckpointResponder {
/**
* Acknowledges the given checkpoint.
*
- * @param jobID Job ID of the running job
- * @param executionAttemptID Execution attempt ID of the running task
- * @param checkpointID Checkpoint ID of the checkpoint
- * @param chainedStateHandle Chained state handle
- * @param keyGroupStateHandles State handles for key groups
+ * @param jobID
+ * Job ID of the running job
+ * @param executionAttemptID
+ * Execution attempt ID of the running task
+ * @param checkpointID
+ * Checkpoint ID of the checkpoint
+ * @param chainedStateHandle
+ * Chained state handle
+ * @param keyGroupStateHandles
+ * State handles for key groups
+ * @param synchronousDurationMillis
+ * The duration (in milliseconds) of the synchronous part of the operator checkpoint
+ * @param asynchronousDurationMillis
+ * The duration (in milliseconds) of the asynchronous part of the operator checkpoint
+ * @param bytesBufferedInAlignment
+ * The number of bytes that were buffered during the checkpoint alignment phase
+ * @param alignmentDurationNanos
+ * The duration (in nanoseconds) that the stream alignment for the checkpoint took
*/
void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointID,
ChainedStateHandle<StreamStateHandle> chainedStateHandle,
- List<KeyGroupsStateHandle> keyGroupStateHandles);
+ List<KeyGroupsStateHandle> keyGroupStateHandles,
+ long synchronousDurationMillis,
+ long asynchronousDurationMillis,
+ long bytesBufferedInAlignment,
+ long alignmentDurationNanos);
/**
* Declines the given checkpoint.
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 3e4ba4d..23b6f82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -239,23 +239,34 @@ public class RuntimeEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(long checkpointId) {
- acknowledgeCheckpoint(checkpointId, null, null);
+ public void acknowledgeCheckpoint(
+ long checkpointId,
+ long synchronousDurationMillis,
+ long asynchronousDurationMillis,
+ long bytesBufferedInAlignment,
+ long alignmentDurationNanos) {
+
+ acknowledgeCheckpoint(checkpointId, null, null,
+ synchronousDurationMillis, asynchronousDurationMillis,
+ bytesBufferedInAlignment, alignmentDurationNanos);
}
@Override
public void acknowledgeCheckpoint(
long checkpointId,
ChainedStateHandle<StreamStateHandle> chainedStateHandle,
- List<KeyGroupsStateHandle> keyGroupStateHandles) {
+ List<KeyGroupsStateHandle> keyGroupStateHandles,
+ long synchronousDurationMillis,
+ long asynchronousDurationMillis,
+ long bytesBufferedInAlignment,
+ long alignmentDurationNanos) {
checkpointResponder.acknowledgeCheckpoint(
- jobId,
- executionId,
- checkpointId,
- chainedStateHandle,
- keyGroupStateHandles);
+ jobId, executionId, checkpointId,
+ chainedStateHandle, keyGroupStateHandles,
+ synchronousDurationMillis, asynchronousDurationMillis,
+ bytesBufferedInAlignment, alignmentDurationNanos);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 6b80c3d..ef8e3bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -453,12 +453,15 @@ public class JobManagerHARecoveryTest {
try {
ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
InstantiationUtil.serializeObject(checkpointId));
+
RetrievableStreamStateHandle<Long> state = new RetrievableStreamStateHandle<Long>(byteStreamStateHandle);
ChainedStateHandle<StreamStateHandle> chainedStateHandle = new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(state));
+
getEnvironment().acknowledgeCheckpoint(
checkpointId,
chainedStateHandle,
- Collections.<KeyGroupsStateHandle>emptyList());
+ Collections.<KeyGroupsStateHandle>emptyList(),
+ 0L, 0L, 0L, 0L);
return true;
} catch (Exception ex) {
throw new RuntimeException(ex);
@@ -466,6 +469,12 @@ public class JobManagerHARecoveryTest {
}
@Override
+ public void triggerCheckpointOnBarrier(
+ long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception {
+ throw new UnsupportedOperationException("should not be called!");
+ }
+
+ @Override
public void notifyCheckpointComplete(long checkpointId) {
if (completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE) {
completedCheckpointsLatch.countDown();
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 4654507..a857d1b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -153,14 +153,18 @@ public class DummyEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(long checkpointId) {
-
+ public void acknowledgeCheckpoint(
+ long checkpointId,
+ long synchronousDurationMillis, long asynchronousDurationMillis,
+ long bytesBufferedInAlignment, long alignmentDurationNanos) {
}
@Override
- public void acknowledgeCheckpoint(long checkpointId,
- ChainedStateHandle<StreamStateHandle> chainedStateHandle,
- List<KeyGroupsStateHandle> keyGroupStateHandles) {
+ public void acknowledgeCheckpoint(
+ long checkpointId,
+ ChainedStateHandle<StreamStateHandle> chainedStateHandle, List<KeyGroupsStateHandle> keyGroupStateHandles,
+ long synchronousDurationMillis, long asynchronousDurationMillis,
+ long bytesBufferedInAlignment, long alignmentDurationNanos) {
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index e7bf6e1..75e88eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -314,15 +314,18 @@ public class MockEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(long checkpointId) {
- throw new UnsupportedOperationException();
+ public void acknowledgeCheckpoint(
+ long checkpointId,
+ long synchronousDurationMillis, long asynchronousDurationMillis,
+ long bytesBufferedInAlignment, long alignmentDurationNanos) {
}
@Override
- public void acknowledgeCheckpoint(long checkpointId,
- ChainedStateHandle<StreamStateHandle> chainedStateHandle,
- List<KeyGroupsStateHandle> keyGroupStateHandles) {
- throw new UnsupportedOperationException();
+ public void acknowledgeCheckpoint(
+ long checkpointId,
+ ChainedStateHandle<StreamStateHandle> chainedStateHandle, List<KeyGroupsStateHandle> keyGroupStateHandles,
+ long synchronousDurationMillis, long asynchronousDurationMillis,
+ long bytesBufferedInAlignment, long alignmentDurationNanos) {
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index a5f4019..ed30fd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -228,6 +228,12 @@ public class TaskAsyncCallTest {
}
@Override
+ public void triggerCheckpointOnBarrier(
+ long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception {
+ throw new UnsupportedOperationException("Should not be called");
+ }
+
+ @Override
public void notifyCheckpointComplete(long checkpointId) {
if (checkpointId != lastCheckpointId && this.error == null) {
this.error = new Exception("calls out of order");
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index dcd76c6..d60c999 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.slf4j.Logger;
@@ -37,22 +37,22 @@ import org.slf4j.LoggerFactory;
*
* <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
* BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until
- * the blocks are released.</p>
+ * the blocks are released.
*/
@Internal
public class BarrierBuffer implements CheckpointBarrierHandler {
private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-
+
/** The gate that the buffer draws its input from */
private final InputGate inputGate;
/** Flags that indicate whether a channel is currently blocked/buffered */
private final boolean[] blockedChannels;
-
+
/** The total number of channels that this buffer handles data from */
private final int totalNumberOfInputChannels;
-
+
/** To utility to write blocked data to a file channel */
private final BufferSpiller bufferSpiller;
@@ -65,17 +65,23 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
/** Handler that receives the checkpoint notifications */
- private EventListener<CheckpointBarrier> checkpointHandler;
+ private StatefulTask toNotifyOnCheckpoint;
/** The ID of the checkpoint for which we expect barriers */
private long currentCheckpointId = -1L;
/** The number of received barriers (= number of blocked/buffered channels) */
private int numBarriersReceived;
-
+
/** The number of already closed channels */
private int numClosedChannels;
-
+
+ /** The timestamp as in {@link System#nanoTime()} at which the last alignment started */
+ private long startOfAlignmentTimestamp;
+
+ /** The time (in nanoseconds) that the latest alignment took */
+ private long latestAlignmentDurationNanos;
+
/** Flag to indicate whether we have drawn all available input */
private boolean endOfStream;
@@ -100,7 +106,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
// ------------------------------------------------------------------------
@Override
- public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+ public BufferOrEvent getNextNonBlocked() throws Exception {
while (true) {
// process buffered BufferOrEvents before grabbing new ones
BufferOrEvent next;
@@ -114,7 +120,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
return getNextNonBlocked();
}
}
-
+
if (next != null) {
if (isBlocked(next.getChannelIndex())) {
// if the channel is blocked, we just store the BufferOrEvent
@@ -139,12 +145,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
}
else if (!endOfStream) {
- // end of stream. we feed the data that is still buffered
+ // end of input stream. stream continues with the buffered data
endOfStream = true;
releaseBlocks();
return getNextNonBlocked();
}
else {
+ // final end of both input and buffered data
return null;
}
}
@@ -158,7 +165,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
}
- private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
+ private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
final long barrierId = receivedBarrier.getId();
if (numBarriersReceived > 0) {
@@ -175,6 +182,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
releaseBlocks();
currentCheckpointId = barrierId;
onBarrier(channelIndex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting stream alignment for checkpoint {}", barrierId);
+ }
+ startOfAlignmentTimestamp = System.nanoTime();
}
else {
// ignore trailing barrier from aborted checkpoint
@@ -186,6 +198,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
// first barrier of a new checkpoint
currentCheckpointId = barrierId;
onBarrier(channelIndex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting stream alignment for checkpoint {}", barrierId);
+ }
+ startOfAlignmentTimestamp = System.nanoTime();
}
else {
// trailing barrier from previous (skipped) checkpoint
@@ -199,21 +216,23 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
receivedBarrier.getId(), receivedBarrier.getTimestamp());
}
- if (checkpointHandler != null) {
- checkpointHandler.onEvent(receivedBarrier);
- }
-
releaseBlocks();
+
+ if (toNotifyOnCheckpoint != null) {
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+ receivedBarrier.getId(), receivedBarrier.getTimestamp(),
+ bufferSpiller.getBytesWritten(), latestAlignmentDurationNanos);
+ }
}
}
@Override
- public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
- if (this.checkpointHandler == null) {
- this.checkpointHandler = checkpointHandler;
+ public void registerCheckpointEventHandler(StatefulTask toNotifyOnCheckpoint) {
+ if (this.toNotifyOnCheckpoint == null) {
+ this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
}
else {
- throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler");
+ throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee");
}
}
@@ -267,9 +286,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
* is the next to be consumed.
*/
private void releaseBlocks() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Releasing blocks");
- }
+ LOG.debug("End of stream alignment, feeding buffered data back");
for (int i = 0; i < blockedChannels.length; i++) {
blockedChannels[i] = false;
@@ -295,10 +312,16 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
currentBuffered = bufferedNow;
}
}
+
+ final long now = System.nanoTime();
+ if (startOfAlignmentTimestamp > 0) {
+ latestAlignmentDurationNanos = now - startOfAlignmentTimestamp;
+ startOfAlignmentTimestamp = 0;
+ }
}
// ------------------------------------------------------------------------
- // For Testing
+ // Properties
// ------------------------------------------------------------------------
/**
@@ -309,7 +332,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
public long getCurrentCheckpointId() {
return this.currentCheckpointId;
}
-
+
+ @Override
+ public long getAlignmentDurationNanos() {
+ long start = this.startOfAlignmentTimestamp;
+ if (start <= 0) {
+ return latestAlignmentDurationNanos;
+ } else {
+ return System.nanoTime() - start;
+ }
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 9c9ec4f..1db5845 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -21,10 +21,9 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import java.io.IOException;
import java.util.ArrayDeque;
/**
@@ -57,11 +56,12 @@ public class BarrierTracker implements CheckpointBarrierHandler {
private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
/** The listener to be notified on complete checkpoints */
- private EventListener<CheckpointBarrier> checkpointHandler;
+ private StatefulTask toNotifyOnCheckpoint;
/** The highest checkpoint ID encountered so far */
private long latestPendingCheckpointID = -1;
-
+
+ // ------------------------------------------------------------------------
public BarrierTracker(InputGate inputGate) {
this.inputGate = inputGate;
@@ -70,7 +70,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
@Override
- public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+ public BufferOrEvent getNextNonBlocked() throws Exception {
while (true) {
BufferOrEvent next = inputGate.getNextBufferOrEvent();
if (next == null) {
@@ -86,12 +86,12 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
@Override
- public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
- if (this.checkpointHandler == null) {
- this.checkpointHandler = checkpointHandler;
+ public void registerCheckpointEventHandler(StatefulTask toNotifyOnCheckpoint) {
+ if (this.toNotifyOnCheckpoint == null) {
+ this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
}
else {
- throw new IllegalStateException("BarrierTracker already has a registered checkpoint handler");
+ throw new IllegalStateException("BarrierTracker already has a registered checkpoint notifyee");
}
}
@@ -105,22 +105,29 @@ public class BarrierTracker implements CheckpointBarrierHandler {
return pendingCheckpoints.isEmpty();
}
- private void processBarrier(CheckpointBarrier receivedBarrier) {
+ @Override
+ public long getAlignmentDurationNanos() {
+ // this one does not do alignment at all
+ return 0L;
+ }
+
+ private void processBarrier(CheckpointBarrier receivedBarrier) throws Exception {
// fast path for single channel trackers
if (totalNumberOfInputChannels == 1) {
- if (checkpointHandler != null) {
- checkpointHandler.onEvent(receivedBarrier);
+ if (toNotifyOnCheckpoint != null) {
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+ receivedBarrier.getId(), receivedBarrier.getTimestamp(), 0L, 0L);
}
return;
}
-
+
// general path for multiple input channels
final long barrierId = receivedBarrier.getId();
// find the checkpoint barrier in the queue of pending barriers
CheckpointBarrierCount cbc = null;
int pos = 0;
-
+
for (CheckpointBarrierCount next : pendingCheckpoints) {
if (next.checkpointId == barrierId) {
cbc = next;
@@ -128,7 +135,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
pos++;
}
-
+
if (cbc != null) {
// add one to the count to that barrier and check for completion
int numBarriersNew = cbc.incrementBarrierCount();
@@ -141,8 +148,9 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
// notify the listener
- if (checkpointHandler != null) {
- checkpointHandler.onEvent(receivedBarrier);
+ if (toNotifyOnCheckpoint != null) {
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+ receivedBarrier.getId(), receivedBarrier.getTimestamp(), 0L, 0L);
}
}
}
@@ -182,7 +190,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
public int incrementBarrierCount() {
return ++barrierCount;
}
-
+
@Override
public int hashCode() {
return (int) ((checkpointId >>> 32) ^ checkpointId) + 17 * barrierCount;
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 1b38a56..5a8a4cd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -85,8 +85,8 @@ public class BufferSpiller {
/** A counter, to create numbered spill files */
private int fileCounter;
- /** A flag to check whether the spiller has written since the last roll over */
- private boolean hasWritten;
+ /** The number of bytes written since the last roll over */
+ private long bytesWritten;
/**
* Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
@@ -124,7 +124,6 @@ public class BufferSpiller {
* @throws IOException Thrown, if the buffer or event could not be spilled.
*/
public void add(BufferOrEvent boe) throws IOException {
- hasWritten = true;
try {
ByteBuffer contents;
if (boe.isBuffer()) {
@@ -133,6 +132,7 @@ public class BufferSpiller {
}
else {
contents = EventSerializer.toSerializedEvent(boe.getEvent());
+
}
headBuffer.clear();
@@ -140,7 +140,9 @@ public class BufferSpiller {
headBuffer.putInt(contents.remaining());
headBuffer.put((byte) (boe.isBuffer() ? 0 : 1));
headBuffer.flip();
-
+
+ bytesWritten += (headBuffer.remaining() + contents.remaining());
+
sources[1] = contents;
currentChannel.write(sources);
}
@@ -186,10 +188,10 @@ public class BufferSpiller {
}
private SpilledBufferOrEventSequence rollOverInternal(boolean newBuffer) throws IOException {
- if (!hasWritten) {
+ if (bytesWritten == 0) {
return null;
}
-
+
ByteBuffer buf;
if (newBuffer) {
buf = ByteBuffer.allocateDirect(READ_BUFFER_SIZE);
@@ -197,16 +199,16 @@ public class BufferSpiller {
} else {
buf = readBuffer;
}
-
+
// create a reader for the spilled data
currentChannel.position(0L);
SpilledBufferOrEventSequence seq =
new SpilledBufferOrEventSequence(currentSpillFile, currentChannel, buf, pageSize);
-
+
// create ourselves a new spill file
createSpillingChannel();
-
- hasWritten = false;
+
+ bytesWritten = 0L;
return seq;
}
@@ -225,6 +227,14 @@ public class BufferSpiller {
}
}
+ /**
+ * Gets the number of bytes written in the current spill file.
+ * @return the number of bytes written in the current spill file
+ */
+ public long getBytesWritten() {
+ return bytesWritten;
+ }
+
// ------------------------------------------------------------------------
// For testing
// ------------------------------------------------------------------------
@@ -255,16 +265,16 @@ public class BufferSpiller {
* method {@link #getNext()}.
*/
public static class SpilledBufferOrEventSequence {
-
+
/** Header is "channel index" (4 bytes) + length (4 bytes) + buffer/event (1 byte) */
private static final int HEADER_LENGTH = 9;
/** The file containing the data */
private final File file;
-
+
/** The file channel to draw the data from */
private final FileChannel fileChannel;
-
+
/** The byte buffer for bulk reading */
private final ByteBuffer buffer;
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 5aa2030..56859fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -20,8 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import java.io.IOException;
@@ -39,18 +38,22 @@ public interface CheckpointBarrierHandler {
* has been determined to be finished.
*
* @return The next BufferOrEvent, or {@code null}, if the stream is finished.
- * @throws java.io.IOException Thrown if the network or local disk I/O fails.
- * @throws java.lang.InterruptedException Thrown if the thread is interrupted while blocking during
- * waiting for the next BufferOrEvent to become available.
+ *
+ * @throws IOException Thrown if the network or local disk I/O fails.
+ *
+ * @throws InterruptedException Thrown if the thread is interrupted while blocking during
+ * waiting for the next BufferOrEvent to become available.
+ * @throws Exception Thrown in case that a checkpoint fails that is started as the result of receiving
+ * the last checkpoint barrier
*/
- BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
+ BufferOrEvent getNextNonBlocked() throws Exception;
/**
- * Registers the given event handler to be notified on successful checkpoints.
+ * Registers the task to be notified once all checkpoint barriers have been received for a checkpoint.
*
- * @param checkpointHandler The handler to register.
+ * @param task The task to notify
*/
- void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
+ void registerCheckpointEventHandler(StatefulTask task);
/**
* Cleans up all internally held resources.
@@ -64,4 +67,13 @@ public interface CheckpointBarrierHandler {
* @return {@code True}, if no data is buffered internally, {@code false} otherwise.
*/
boolean isEmpty();
+
+ /**
+ * Gets the time that the latest alignment took, in nanoseconds.
+ * If there is currently an alignment in progress, it will return the time spent in the
+ * current alignment so far.
+ *
+ * @return The duration in nanoseconds
+ */
+ long getAlignmentDurationNanos();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index d11990e..85e9297 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
/**
* Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
@@ -85,10 +84,10 @@ public class StreamInputProcessor<IN> {
@SuppressWarnings("unchecked")
public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
- EventListener<CheckpointBarrier> checkpointListener,
- CheckpointingMode checkpointMode,
- IOManager ioManager,
- boolean enableWatermarkMultiplexing) throws IOException {
+ StatefulTask checkpointedTask,
+ CheckpointingMode checkpointMode,
+ IOManager ioManager,
+ boolean enableWatermarkMultiplexing) throws IOException {
InputGate inputGate = InputGateUtil.createInputGate(inputGates);
@@ -102,8 +101,8 @@ public class StreamInputProcessor<IN> {
throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
}
- if (checkpointListener != null) {
- this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+ if (checkpointedTask != null) {
+ this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
}
if (enableWatermarkMultiplexing) {
@@ -215,7 +214,7 @@ public class StreamInputProcessor<IN> {
* Sets the metric group for this StreamInputProcessor.
*
* @param metrics metric group
- */
+ */
public void setMetricGroup(IOMetricGroup metrics) {
metrics.gauge("currentLowWatermark", new Gauge<Long>() {
@Override
@@ -223,6 +222,13 @@ public class StreamInputProcessor<IN> {
return lastEmittedWatermark;
}
});
+
+ metrics.gauge("checkpointAlignmentTime", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return barrierHandler.getAlignmentDurationNanos();
+ }
+ });
}
public void cleanup() throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ce764b7..70ce783 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import java.io.IOException;
import java.util.Arrays;
@@ -95,7 +94,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
Collection<InputGate> inputGates2,
TypeSerializer<IN1> inputSerializer1,
TypeSerializer<IN2> inputSerializer2,
- EventListener<CheckpointBarrier> checkpointListener,
+ StatefulTask checkpointedTask,
CheckpointingMode checkpointMode,
IOManager ioManager,
boolean enableWatermarkMultiplexing) throws IOException {
@@ -112,8 +111,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode);
}
- if (checkpointListener != null) {
- this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+ if (checkpointedTask != null) {
+ this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
}
if (enableWatermarkMultiplexing) {
@@ -294,6 +293,13 @@ public class StreamTwoInputProcessor<IN1, IN2> {
return Math.min(lastEmittedWatermark1, lastEmittedWatermark2);
}
});
+
+ metrics.gauge("checkpointAlignmentTime", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return barrierHandler.getAlignmentDurationNanos();
+ }
+ });
}
public void cleanup() throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 938d8c1..d6d2fb5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -43,7 +43,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
if (numberOfInputs > 0) {
InputGate[] inputGates = getEnvironment().getAllInputGates();
inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
- getCheckpointBarrierListener(),
+ this,
configuration.getCheckpointMode(),
getEnvironment().getIOManager(),
isSerializingTimestamps());
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index ff074b7..7976f01 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -41,7 +40,6 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
@@ -589,7 +587,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
@Override
public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
try {
- return performCheckpoint(checkpointId, timestamp);
+ return performCheckpoint(checkpointId, timestamp, 0L, 0L);
}
catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
@@ -601,11 +599,31 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
}
- private boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {
+ @Override
+ public void triggerCheckpointOnBarrier(
+ long checkpointId, long timestamp, long bytesAligned, long alignmentDurationNanos) throws Exception {
+
+ try {
+ performCheckpoint(checkpointId, timestamp, bytesAligned, alignmentDurationNanos);
+ }
+ catch (CancelTaskException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new Exception("Error while performing a checkpoint", e);
+ }
+ }
+
+ private boolean performCheckpoint(
+ long checkpointId, long timestamp, long bytesBufferedAlignment, long alignmentDurationNanos) throws Exception {
+
LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
+
synchronized (lock) {
if (isRunning) {
+ final long startOfSyncPart = System.nanoTime();
+
// Since both state checkpointing and downstream barrier emission occurs in this
// lock scope, they are an atomic operation regardless of the order in which they occur.
// Given this, we immediately emit the checkpoint barriers, so the downstream operators
@@ -654,13 +672,20 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", checkpointId, getName());
+ final long endOfSyncPart = System.nanoTime();
+ final long syncDurationMillis = (endOfSyncPart - startOfSyncPart) / 1_000_000;
+
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
"checkpoint-" + checkpointId + "-" + timestamp,
this,
cancelables,
chainedStateHandles,
keyGroupsStateHandleFuture,
- checkpointId);
+ checkpointId,
+ bytesBufferedAlignment,
+ alignmentDurationNanos,
+ syncDurationMillis,
+ endOfSyncPart);
synchronized (cancelables) {
cancelables.add(asyncCheckpointRunnable);
@@ -851,29 +876,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
-
+
@Override
public String toString() {
return getName();
}
- final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
- return new EventListener<CheckpointBarrier>() {
- @Override
- public void onEvent(CheckpointBarrier barrier) {
- try {
- performCheckpoint(barrier.getId(), barrier.getTimestamp());
- }
- catch (CancelTaskException e) {
- throw e;
- }
- catch (Exception e) {
- throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
- }
- }
- };
- }
-
// ------------------------------------------------------------------------
private static class AsyncCheckpointRunnable implements Runnable, Closeable {
@@ -890,13 +898,25 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
private final String name;
+ private final long bytesBufferedInAlignment;
+
+ private final long alignmentDurationNanos;
+
+ private final long syncDurationMillies;
+
+ private final long asyncStartNanos;
+
AsyncCheckpointRunnable(
String name,
StreamTask<?, ?> owner,
Set<Closeable> cancelables,
ChainedStateHandle<StreamStateHandle> chainedStateHandles,
RunnableFuture<KeyGroupsStateHandle> keyGroupsStateHandleFuture,
- long checkpointId) {
+ long checkpointId,
+ long bytesBufferedInAlignment,
+ long alignmentDurationNanos,
+ long syncDurationMillies,
+ long asyncStartNanos) {
this.name = name;
this.owner = owner;
@@ -904,6 +924,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
this.chainedStateHandles = chainedStateHandles;
this.keyGroupsStateHandleFuture = keyGroupsStateHandleFuture;
this.checkpointId = checkpointId;
+ this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+ this.alignmentDurationNanos = alignmentDurationNanos;
+ this.syncDurationMillies = syncDurationMillies;
+ this.asyncStartNanos = asyncStartNanos;
}
@Override
@@ -925,19 +949,26 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
}
+ final long asyncEndNanos = System.nanoTime();
+ final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
+
if (chainedStateHandles.isEmpty() && keyedStates.isEmpty()) {
- owner.getEnvironment().acknowledgeCheckpoint(checkpointId);
+ owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
+ syncDurationMillies, asyncDurationMillis,
+ bytesBufferedInAlignment, alignmentDurationNanos);
} else {
- owner.getEnvironment().acknowledgeCheckpoint(checkpointId, chainedStateHandles, keyedStates);
+ owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
+ chainedStateHandles, keyedStates,
+ syncDurationMillies, asyncDurationMillis,
+ bytesBufferedInAlignment, alignmentDurationNanos);
}
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}. Returning handles on " +
"keyed states {}.", checkpointId, name, keyedStates);
}
}
catch (Exception e) {
-
// registers the exception and tries to fail the whole task
AsynchronousException asyncException = new AsynchronousException(e);
owner.registerAsyncException(asyncException);
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index c3305eb..9252063 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -68,7 +68,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
inputDeserializer1, inputDeserializer2,
- getCheckpointBarrierListener(),
+ this,
configuration.getCheckpointMode(),
getEnvironment().getIOManager(),
isSerializingTimestamps());
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index d4fdc59..b549ef8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.runtime.io;
-import org.apache.flink.core.memory.HeapMemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -26,7 +25,10 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.junit.AfterClass;
@@ -35,6 +37,7 @@ import org.junit.Test;
import java.io.File;
import java.util.Arrays;
+import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -86,7 +89,9 @@ public class BarrierBufferTest {
for (BufferOrEvent boe : sequence) {
assertEquals(boe, buffer.getNextNonBlocked());
}
-
+
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+
assertNull(buffer.getNextNonBlocked());
assertNull(buffer.getNextNonBlocked());
@@ -120,6 +125,8 @@ public class BarrierBufferTest {
assertEquals(boe, buffer.getNextNonBlocked());
}
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+
assertNull(buffer.getNextNonBlocked());
assertNull(buffer.getNextNonBlocked());
@@ -222,13 +229,15 @@ public class BarrierBufferTest {
ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
buffer.registerCheckpointEventHandler(handler);
handler.setNextExpectedCheckpointId(1L);
-
+
// pre checkpoint 1
check(sequence[0], buffer.getNextNonBlocked());
check(sequence[1], buffer.getNextNonBlocked());
check(sequence[2], buffer.getNextNonBlocked());
assertEquals(1L, handler.getNextExpectedCheckpointId());
+ long startTs = System.nanoTime();
+
// blocking while aligning for checkpoint 1
check(sequence[7], buffer.getNextNonBlocked());
assertEquals(1L, handler.getNextExpectedCheckpointId());
@@ -236,6 +245,8 @@ public class BarrierBufferTest {
// checkpoint 1 done, returning buffered data
check(sequence[5], buffer.getNextNonBlocked());
assertEquals(2L, handler.getNextExpectedCheckpointId());
+ validateAlignmentTime(startTs, buffer);
+
check(sequence[6], buffer.getNextNonBlocked());
// pre checkpoint 2
@@ -245,10 +256,13 @@ public class BarrierBufferTest {
check(sequence[12], buffer.getNextNonBlocked());
check(sequence[13], buffer.getNextNonBlocked());
assertEquals(2L, handler.getNextExpectedCheckpointId());
-
+
// checkpoint 2 barriers come together
+ startTs = System.nanoTime();
check(sequence[17], buffer.getNextNonBlocked());
assertEquals(3L, handler.getNextExpectedCheckpointId());
+ validateAlignmentTime(startTs, buffer);
+
check(sequence[18], buffer.getNextNonBlocked());
// checkpoint 3 starts, data buffered
@@ -257,7 +271,7 @@ public class BarrierBufferTest {
check(sequence[21], buffer.getNextNonBlocked());
// checkpoint 4 happens without extra data
-
+
// pre checkpoint 5
check(sequence[27], buffer.getNextNonBlocked());
assertEquals(5L, handler.getNextExpectedCheckpointId());
@@ -301,7 +315,7 @@ public class BarrierBufferTest {
BufferOrEvent[] sequence = {
createBuffer(0), createBuffer(1), createBuffer(2),
createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
-
+
createBuffer(2), createBuffer(1), createBuffer(0),
createBarrier(2, 1),
createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
@@ -327,12 +341,14 @@ public class BarrierBufferTest {
assertEquals(2L, handler.getNextExpectedCheckpointId());
check(sequence[7], buffer.getNextNonBlocked());
check(sequence[8], buffer.getNextNonBlocked());
-
+
// checkpoint 2 alignment
+ long startTs = System.nanoTime();
check(sequence[13], buffer.getNextNonBlocked());
check(sequence[14], buffer.getNextNonBlocked());
check(sequence[18], buffer.getNextNonBlocked());
check(sequence[19], buffer.getNextNonBlocked());
+ validateAlignmentTime(startTs, buffer);
// end of stream: remaining buffered contents
check(sequence[10], buffer.getNextNonBlocked());
@@ -343,7 +359,7 @@ public class BarrierBufferTest {
assertNull(buffer.getNextNonBlocked());
assertNull(buffer.getNextNonBlocked());
-
+
buffer.cleanup();
checkNoTempFilesRemain();
@@ -389,7 +405,7 @@ public class BarrierBufferTest {
createBarrier(3, 2),
createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
createBarrier(6, 1),
-
+
// complete checkpoint 4, checkpoint 5 remains not fully triggered
createBarrier(4, 2),
createBuffer(2),
@@ -419,10 +435,13 @@ public class BarrierBufferTest {
// alignment of checkpoint 2 - buffering also some barriers for
// checkpoints 3 and 4
+ long startTs = System.nanoTime();
check(sequence[13], buffer.getNextNonBlocked());
check(sequence[20], buffer.getNextNonBlocked());
check(sequence[23], buffer.getNextNonBlocked());
-
+
+ validateAlignmentTime(startTs, buffer);
+
// checkpoint 2 completed
check(sequence[12], buffer.getNextNonBlocked());
check(sequence[25], buffer.getNextNonBlocked());
@@ -613,17 +632,21 @@ public class BarrierBufferTest {
check(sequence[19], buffer.getNextNonBlocked());
check(sequence[21], buffer.getNextNonBlocked());
+ long startTs = System.nanoTime();
+
// checkpoint 2 aborted, checkpoint 4 started. replay buffered
check(sequence[12], buffer.getNextNonBlocked());
assertEquals(4L, buffer.getCurrentCheckpointId());
check(sequence[16], buffer.getNextNonBlocked());
check(sequence[18], buffer.getNextNonBlocked());
check(sequence[22], buffer.getNextNonBlocked());
-
+
// align checkpoint 4 remainder
check(sequence[25], buffer.getNextNonBlocked());
check(sequence[26], buffer.getNextNonBlocked());
-
+
+ validateAlignmentTime(startTs, buffer);
+
// checkpoint 4 aborted (due to end of partition)
check(sequence[24], buffer.getNextNonBlocked());
check(sequence[27], buffer.getNextNonBlocked());
@@ -926,12 +949,17 @@ public class BarrierBufferTest {
}
}
}
-
+
+ private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) {
+ final long elapsed = System.nanoTime() - startTimestamp;
+ assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed);
+ }
+
// ------------------------------------------------------------------------
// Testing Mocks
// ------------------------------------------------------------------------
- private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {
+ private static class ValidatingCheckpointHandler implements StatefulTask {
private long nextExpectedCheckpointId = -1L;
@@ -944,11 +972,33 @@ public class BarrierBufferTest {
}
@Override
- public void onEvent(CheckpointBarrier barrier) {
- assertNotNull(barrier);
- assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == barrier.getId());
- assertTrue(barrier.getTimestamp() > 0);
+ public void setInitialState(
+ ChainedStateHandle<StreamStateHandle> chainedState,
+ List<KeyGroupsStateHandle> keyGroupsState) throws Exception {
+ throw new UnsupportedOperationException("should never be called");
+ }
+
+ @Override
+ public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+ throw new UnsupportedOperationException("should never be called");
+ }
+
+ @Override
+ public void triggerCheckpointOnBarrier(
+ long checkpointId, long timestamp,
+ long bytesAligned, long alignmentTimeNanos) throws Exception {
+
+ assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointId);
+ assertTrue(timestamp > 0);
+ assertTrue(bytesAligned >= 0);
+ assertTrue(alignmentTimeNanos >= 0);
+
nextExpectedCheckpointId++;
}
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ throw new UnsupportedOperationException("should never be called");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index b9b6e5f..314dcc4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -22,12 +22,16 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.junit.Test;
import java.util.Arrays;
+import java.util.List;
import static org.junit.Assert.*;
@@ -346,22 +350,42 @@ public class BarrierTrackerTest {
// Testing Mocks
// ------------------------------------------------------------------------
- private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> {
+ private static class CheckpointSequenceValidator implements StatefulTask {
private final long[] checkpointIDs;
-
+
private int i = 0;
private CheckpointSequenceValidator(long... checkpointIDs) {
this.checkpointIDs = checkpointIDs;
}
-
+
@Override
- public void onEvent(CheckpointBarrier barrier) {
+ public void setInitialState(
+ ChainedStateHandle<StreamStateHandle> chainedState,
+ List<KeyGroupsStateHandle> keyGroupsState) throws Exception {
+
+ throw new UnsupportedOperationException("should never be called");
+ }
+
+ @Override
+ public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+ throw new UnsupportedOperationException("should never be called");
+ }
+
+ @Override
+ public void triggerCheckpointOnBarrier(
+ long checkpointId, long timestamp,
+ long bytesAligned, long alignmentTimeNanos) throws Exception {
+
assertTrue("More checkpoints than expected", i < checkpointIDs.length);
- assertNotNull(barrier);
- assertEquals("wrong checkpoint id", checkpointIDs[i++], barrier.getId());
- assertTrue(barrier.getTimestamp() > 0);
+ assertEquals("wrong checkpoint id", checkpointIDs[i++], checkpointId);
+ assertTrue(timestamp > 0);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ throw new UnsupportedOperationException("should never be called");
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 7ef0080..88fb383 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -478,27 +478,32 @@ public class OneInputStreamTaskTest extends TestLogger {
List<KeyGroupsStateHandle> getKeyGroupStates() {
List<KeyGroupsStateHandle> result = new ArrayList<>();
- for (int i = 0; i < keyGroupStates.size(); i++) {
- if (keyGroupStates.get(i) != null) {
- result.add(keyGroupStates.get(i));
+ for (KeyGroupsStateHandle keyGroupState : keyGroupStates) {
+ if (keyGroupState != null) {
+ result.add(keyGroupState);
}
}
return result;
}
- AcknowledgeStreamMockEnvironment(Configuration jobConfig, Configuration taskConfig,
- ExecutionConfig executionConfig, long memorySize,
- MockInputSplitProvider inputSplitProvider, int bufferSize) {
+ AcknowledgeStreamMockEnvironment(
+ Configuration jobConfig, Configuration taskConfig,
+ ExecutionConfig executionConfig, long memorySize,
+ MockInputSplitProvider inputSplitProvider, int bufferSize) {
super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize);
}
@Override
- public void acknowledgeCheckpoint(long checkpointId, ChainedStateHandle<StreamStateHandle> state,
- List<KeyGroupsStateHandle> keyGroupStates) {
+ public void acknowledgeCheckpoint(
+ long checkpointId,
+ ChainedStateHandle<StreamStateHandle> state, List<KeyGroupsStateHandle> keyGroupStates,
+ long syncDuration, long asyncDuration, long alignmentByte, long alignmentDuration) {
+
this.checkpointId = checkpointId;
this.state = state;
this.keyGroupStates = keyGroupStates;
+
checkpointLatch.trigger();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 0901b32..2036f69 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -308,14 +308,18 @@ public class StreamMockEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(long checkpointId) {
+ public void acknowledgeCheckpoint(
+ long checkpointId,
+ long synchronousDurationMillis, long asynchronousDurationMillis,
+ long bytesBufferedInAlignment, long alignmentDurationNanos) {
}
@Override
- public void acknowledgeCheckpoint(long checkpointId,
- ChainedStateHandle<StreamStateHandle> chainedStateHandle,
- List<KeyGroupsStateHandle> keyGroupStateHandles) {
-
+ public void acknowledgeCheckpoint(
+ long checkpointId,
+ ChainedStateHandle<StreamStateHandle> chainedStateHandle, List<KeyGroupsStateHandle> keyGroupStateHandles,
+ long synchronousDurationMillis, long asynchronousDurationMillis,
+ long bytesBufferedInAlignment, long alignmentDurationNanos) {
}
@Override