You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/08 20:25:09 UTC
[3/4] flink git commit: [FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the Checkpoint Coordinator
[FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the Checkpoint Coordinator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a4fdfff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a4fdfff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a4fdfff
Branch: refs/heads/release-1.1
Commit: 1a4fdfff5d364a35e935604c0a5058a1a9f242f7
Parents: a1f028d
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Nov 8 17:13:19 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 19:07:16 2016 +0100
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 18 +-
.../AlignmentLimitExceededException.java | 33 +++
.../decline/CheckpointDeclineException.java | 35 +++
...ntDeclineOnCancellationBarrierException.java | 32 +++
.../CheckpointDeclineSubsumedException.java | 32 +++
...intDeclineTaskNotCheckpointingException.java | 32 +++
.../CheckpointDeclineTaskNotReadyException.java | 32 +++
.../decline/InputEndOfStreamException.java | 32 +++
.../flink/runtime/execution/Environment.java | 9 +
.../runtime/jobgraph/tasks/StatefulTask.java | 3 +-
.../messages/checkpoint/DeclineCheckpoint.java | 65 +++---
.../runtime/taskmanager/RuntimeEnvironment.java | 7 +
.../apache/flink/runtime/taskmanager/Task.java | 22 +-
.../checkpoint/CheckpointCoordinatorTest.java | 41 +---
.../savepoint/SavepointCoordinatorTest.java | 2 +-
.../jobmanager/JobManagerHARecoveryTest.java | 2 +-
.../operators/testutils/DummyEnvironment.java | 5 +
.../operators/testutils/MockEnvironment.java | 5 +
.../runtime/taskmanager/TaskAsyncCallTest.java | 2 +-
.../streaming/runtime/io/BarrierBuffer.java | 28 ++-
.../streaming/runtime/io/BarrierTracker.java | 4 +-
.../streaming/runtime/tasks/StreamTask.java | 25 ++-
.../streaming/runtime/io/BarrierBufferTest.java | 31 +--
.../runtime/io/BarrierTrackerTest.java | 2 +-
.../runtime/tasks/OneInputStreamTaskTest.java | 2 +
.../runtime/tasks/StreamMockEnvironment.java | 3 +
.../StreamTaskCancellationBarrierTest.java | 221 +++++++++++++++++++
.../runtime/tasks/StreamTaskTestHarness.java | 31 ++-
.../runtime/tasks/TwoInputStreamTaskTest.java | 2 +
29 files changed, 632 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 409f05b..8661ddc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -575,6 +575,7 @@ public class CheckpointCoordinator {
}
final long checkpointId = message.getCheckpointId();
+ final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");
PendingCheckpoint checkpoint;
@@ -594,8 +595,8 @@ public class CheckpointCoordinator {
if (checkpoint != null && !checkpoint.isDiscarded()) {
isPendingCheckpoint = true;
- LOG.info("Discarding checkpoint " + checkpointId
- + " because of checkpoint decline from task " + message.getTaskExecutionId());
+ LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}",
+ checkpointId, message.getTaskExecutionId(), reason);
pendingCheckpoints.remove(checkpointId);
checkpoint.discard(userClassLoader);
@@ -604,19 +605,14 @@ public class CheckpointCoordinator {
onCancelCheckpoint(checkpointId);
boolean haveMoreRecentPending = false;
- Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
- while (entries.hasNext()) {
- PendingCheckpoint p = entries.next().getValue();
- if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
+ for (PendingCheckpoint p : pendingCheckpoints.values()) {
+ if (!p.isDiscarded() && p.getCheckpointId() >= checkpoint.getCheckpointId()) {
haveMoreRecentPending = true;
break;
}
}
- if (!haveMoreRecentPending && !triggerRequestQueued) {
- LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
- triggerCheckpoint(System.currentTimeMillis());
- } else if (!haveMoreRecentPending) {
- LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
+
+ if (!haveMoreRecentPending) {
triggerQueuedRequests();
}
} else if (checkpoint != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
new file mode 100644
index 0000000..64d57bc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because too many bytes were
+ * buffered in the alignment phase.
+ */
+public final class AlignmentLimitExceededException extends CheckpointDeclineException {
+
+ private static final long serialVersionUID = 1L;
+
+ public AlignmentLimitExceededException(long numBytes) {
+ super("The checkpoint alignment phase needed to buffer more than the configured maximum ("
+ + numBytes + " bytes).");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
new file mode 100644
index 0000000..8a2802c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Base class of all exceptions that indicate a declined checkpoint.
+ */
+public abstract class CheckpointDeclineException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public CheckpointDeclineException(String message) {
+ super(message);
+ }
+
+ public CheckpointDeclineException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
new file mode 100644
index 0000000..9ae4096
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a cancellation
+ * barrier was received.
+ */
+public final class CheckpointDeclineOnCancellationBarrierException extends CheckpointDeclineException {
+
+ private static final long serialVersionUID = 1L;
+
+ public CheckpointDeclineOnCancellationBarrierException() {
+ super("Task received cancellation from one of its inputs");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
new file mode 100644
index 0000000..5380469
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a newer checkpoint
+ * barrier was received on an input before the pending checkpoint's barrier.
+ */
+public final class CheckpointDeclineSubsumedException extends CheckpointDeclineException {
+
+ private static final long serialVersionUID = 1L;
+
+ public CheckpointDeclineSubsumedException(long newCheckpointId) {
+ super("Checkpoint was canceled because a barrier from newer checkpoint " + newCheckpointId + " was received.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
new file mode 100644
index 0000000..e5773d1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a task does not support
+ * checkpointing.
+ */
+public final class CheckpointDeclineTaskNotCheckpointingException extends CheckpointDeclineException {
+
+ private static final long serialVersionUID = 1L;
+
+ public CheckpointDeclineTaskNotCheckpointingException(String taskName) {
+ super("Task '" + taskName + "'does not support checkpointing");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
new file mode 100644
index 0000000..a1214fe
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a task was not
+ * ready to perform a checkpoint.
+ */
+public final class CheckpointDeclineTaskNotReadyException extends CheckpointDeclineException {
+
+ private static final long serialVersionUID = 1L;
+
+ public CheckpointDeclineTaskNotReadyException(String taskName) {
+ super("Task " + taskName + " was not running");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
new file mode 100644
index 0000000..86b29dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because one of the input
+ * streams reached its end before the alignment was complete.
+ */
+public final class InputEndOfStreamException extends CheckpointDeclineException {
+
+ private static final long serialVersionUID = 1L;
+
+ public InputEndOfStreamException() {
+ super("Checkpoint was declined because one input stream is finished");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 5ad5fe2..e84a5e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -167,6 +167,15 @@ public interface Environment {
void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state);
/**
+ * Declines a checkpoint. This tells the checkpoint coordinator that this task will
+ * not be able to successfully complete a certain checkpoint.
+ *
+ * @param checkpointId The ID of the declined checkpoint.
+ * @param cause An optional reason why the checkpoint was declined.
+ */
+ void declineCheckpoint(long checkpointId, Throwable cause);
+
+ /**
* Marks task execution failed for an external reason (a reason other than the task code itself
* throwing an exception). If the task is already in a terminal state
* (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 7c581df..874ca4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -66,8 +66,9 @@ public interface StatefulTask<T extends StateHandle<?>> {
* {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs.
*
* @param checkpointId The ID of the checkpoint to be aborted.
+ * @param cause The reason why the checkpoint was aborted during alignment
*/
- void abortCheckpointOnBarrier(long checkpointId) throws Exception;
+ void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception;
/**
* Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
index f26d2fb..dca212c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
@@ -19,7 +19,14 @@
package org.apache.flink.runtime.messages.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
+import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.util.SerializedThrowable;
/**
* This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
@@ -31,44 +38,48 @@ public class DeclineCheckpoint extends AbstractCheckpointMessage implements java
private static final long serialVersionUID = 2094094662279578953L;
- /** The timestamp associated with the checkpoint */
- private final long timestamp;
+ /** The reason why the checkpoint was declined */
+ private final Throwable reason;
- public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
- super(job, taskExecutionId, checkpointId);
- this.timestamp = timestamp;
+ public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+ this(job, taskExecutionId, checkpointId, null);
}
- // --------------------------------------------------------------------------------------------
+ public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, Throwable reason) {
+ super(job, taskExecutionId, checkpointId);
- public long getTimestamp() {
- return timestamp;
+ if (reason == null ||
+ reason.getClass() == AlignmentLimitExceededException.class ||
+ reason.getClass() == CheckpointDeclineOnCancellationBarrierException.class ||
+ reason.getClass() == CheckpointDeclineSubsumedException.class ||
+ reason.getClass() == CheckpointDeclineTaskNotCheckpointingException.class ||
+ reason.getClass() == CheckpointDeclineTaskNotReadyException.class ||
+ reason.getClass() == InputEndOfStreamException.class)
+ {
+ // null or known common exceptions that cannot reference any dynamically loaded code
+ this.reason = reason;
+ } else {
+ // some other exception. replace with a serialized throwable, to be on the safe side
+ this.reason = new SerializedThrowable(reason);
+ }
}
// --------------------------------------------------------------------------------------------
- @Override
- public int hashCode() {
- return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32));
+ /**
+ * Gets the reason why the checkpoint was declined.
+ *
+ * @return The reason why the checkpoint was declined
+ */
+ public Throwable getReason() {
+ return reason;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- else if (o instanceof DeclineCheckpoint) {
- DeclineCheckpoint that = (DeclineCheckpoint) o;
- return this.timestamp == that.timestamp && super.equals(o);
- }
- else {
- return false;
- }
- }
+ // --------------------------------------------------------------------------------------------
@Override
public String toString() {
- return String.format("Declined Checkpoint %d@%d for (%s/%s)",
- getCheckpointId(), getTimestamp(), getJob(), getTaskExecutionId());
+ return String.format("Declined Checkpoint %d for (%s/%s): %s",
+ getCheckpointId(), getJob(), getTaskExecutionId(), reason);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 6fdf6f9..47149a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -268,6 +269,12 @@ public class RuntimeEnvironment implements Environment {
}
@Override
+ public void declineCheckpoint(long checkpointId, Throwable cause) {
+ DeclineCheckpoint message = new DeclineCheckpoint(jobId, executionId, checkpointId, cause);
+ jobManager.tell(message);
+ }
+
+ @Override
public void failExternally(Throwable cause) {
this.containingTask.failExternally(cause);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ed15dbf..3eab7e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -29,6 +29,8 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -969,13 +971,16 @@ public class Task implements Runnable {
try {
boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
if (!success) {
- DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
+ // task was not ready to trigger this checkpoint
+ DeclineCheckpoint decline = new DeclineCheckpoint(
+ jobId, getExecutionId(), checkpointID,
+ new CheckpointDeclineTaskNotReadyException(taskName));
jobManager.tell(decline);
}
}
catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
- failExternally(new RuntimeException(
+ failExternally(new Exception(
"Error while triggering checkpoint for " + taskName,
t));
}
@@ -987,10 +992,21 @@ public class Task implements Runnable {
else {
LOG.error("Task received a checkpoint request, but is not a checkpointing task - "
+ taskNameWithSubtask);
+
+ DeclineCheckpoint decline = new DeclineCheckpoint(
+ jobId, executionId, checkpointID,
+ new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));
+ jobManager.tell(decline);
}
}
else {
- LOG.debug("Ignoring request to trigger a checkpoint for non-running task.");
+ LOG.debug("Declining checkpoint request for non-running task");
+
+ // send back a message that we did not do the checkpoint
+ DeclineCheckpoint decline = new DeclineCheckpoint(
+ jobId, executionId, checkpointID,
+ new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
+ jobManager.tell(decline);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 62af42b..f3f988a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -288,44 +288,19 @@ public class CheckpointCoordinatorTest {
// decline checkpoint from the other task, this should cancel the checkpoint
// and trigger a new one
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
+ coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
assertTrue(checkpoint.isDiscarded());
- // validate that we have a new pending checkpoint
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
+ // validate that we have no new pending checkpoint
+ assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
- long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
-
- assertNotNull(checkpointNew);
- assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
- assertEquals(jid, checkpointNew.getJobId());
- assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
- assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
- assertEquals(0, checkpointNew.getTaskStates().size());
- assertFalse(checkpointNew.isDiscarded());
- assertFalse(checkpointNew.isFullyAcknowledged());
- assertNotEquals(checkpoint.getCheckpointId(), checkpointNew.getCheckpointId());
-
- // check that the vertices received the new trigger checkpoint message
- {
- TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew, checkpointNew.getCheckpointTimestamp());
- TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointIdNew, checkpointNew.getCheckpointTimestamp());
- verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
- verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
- }
-
// decline again, nothing should happen
// decline from the other task, nothing should happen
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId, checkpoint.getCheckpointTimestamp()));
+ coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
+ coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId));
assertTrue(checkpoint.isDiscarded());
- // should still have the same second checkpoint pending
- long checkpointIdNew2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- assertEquals(checkpointIdNew2, checkpointIdNew);
-
coord.shutdown();
}
catch (Exception e) {
@@ -422,7 +397,7 @@ public class CheckpointCoordinatorTest {
// decline checkpoint from one of the tasks, this should cancel the checkpoint
// and trigger a new one
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+ coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
assertTrue(checkpoint1.isDiscarded());
// validate that we have only one pending checkpoint left
@@ -446,8 +421,8 @@ public class CheckpointCoordinatorTest {
// decline again, nothing should happen
// decline from the other task, nothing should happen
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+ coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
+ coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id));
assertTrue(checkpoint1.isDiscarded());
coord.shutdown();
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
index b1b384d..dc2b23f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
@@ -202,7 +202,7 @@ public class SavepointCoordinatorTest extends TestLogger {
coordinator.receiveDeclineMessage(new DeclineCheckpoint(
jobId, vertices[1].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId, 0));
+ checkpointId));
// The pending checkpoint is completed
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 4dfaf95..2e3bace 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -455,7 +455,7 @@ public class JobManagerHARecoveryTest {
}
@Override
- public void abortCheckpointOnBarrier(long checkpointId) {
+ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
throw new UnsupportedOperationException("should not be called!");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 5af34fb..ca68683 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -140,6 +140,11 @@ public class DummyEnvironment implements Environment {
public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {}
@Override
+ public void declineCheckpoint(long checkpointId, Throwable cause) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void failExternally(Throwable cause) {
throw new UnsupportedOperationException("DummyEnvironment does not support external task failure.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 9dea324..22dee63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -291,6 +291,11 @@ public class MockEnvironment implements Environment {
}
@Override
+ public void declineCheckpoint(long checkpointId, Throwable cause) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void failExternally(Throwable cause) {
throw new UnsupportedOperationException("MockEnvironment does not support external task failure.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 5b344eb..3ace0f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -226,7 +226,7 @@ public class TaskAsyncCallTest {
}
@Override
- public void abortCheckpointOnBarrier(long checkpointId) {
+ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
throw new UnsupportedOperationException("Should not be called");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 36de717..7a8e7d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,10 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -142,7 +146,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
else {
if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
- processEndOfPartition(next.getChannelIndex());
+ processEndOfPartition();
}
return next;
}
@@ -196,7 +200,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
"Skipping current checkpoint.", barrierId, currentCheckpointId);
// let the task know we are not completing this
- notifyAbort(currentCheckpointId);
+ notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
// abort the current checkpoint
releaseBlocksAndResetBarriers();
@@ -241,7 +245,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
if (barrierId > currentCheckpointId) {
// new checkpoint
currentCheckpointId = barrierId;
- notifyAbort(barrierId);
+ notifyAbortOnCancellationBarrier(barrierId);
}
return;
}
@@ -258,7 +262,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
releaseBlocksAndResetBarriers();
- notifyAbort(barrierId);
+ notifyAbortOnCancellationBarrier(barrierId);
}
else if (barrierId > currentCheckpointId) {
// we canceled the next which also cancels the current
@@ -272,7 +276,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
currentCheckpointId = barrierId;
startOfAlignmentTimestamp = 0L;
latestAlignmentDurationNanos = 0L;
- notifyAbort(barrierId);
+ notifyAbortOnCancellationBarrier(barrierId);
}
// else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now)
@@ -292,7 +296,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId);
}
- notifyAbort(barrierId);
+ notifyAbortOnCancellationBarrier(barrierId);
}
// else: trailing barrier from either
@@ -300,12 +304,12 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
// - the current checkpoint if it was already canceled
}
- private void processEndOfPartition(int channel) throws Exception {
+ private void processEndOfPartition() throws Exception {
numClosedChannels++;
if (numBarriersReceived > 0) {
// let the task know we skip a checkpoint
- notifyAbort(currentCheckpointId);
+ notifyAbort(currentCheckpointId, new InputEndOfStreamException());
// no chance to complete this checkpoint
releaseBlocksAndResetBarriers();
@@ -319,9 +323,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
}
- private void notifyAbort(long checkpointId) throws Exception {
+ private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception {
+ notifyAbort(checkpointId, new CheckpointDeclineOnCancellationBarrierException());
+ }
+
+ private void notifyAbort(long checkpointId, CheckpointDeclineException cause) throws Exception {
if (toNotifyOnCheckpoint != null) {
- toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+ toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 5157336..8b4cc48 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
@@ -227,7 +228,8 @@ public class BarrierTracker implements CheckpointBarrierHandler {
private void notifyAbort(long checkpointId) throws Exception {
if (toNotifyOnCheckpoint != null) {
- toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+ toNotifyOnCheckpoint.abortCheckpointOnBarrier(
+ checkpointId, new CheckpointDeclineOnCancellationBarrierException());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d55a9c5..4f0839f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -592,13 +594,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
@Override
- public void abortCheckpointOnBarrier(long checkpointId) throws Exception {
+ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception {
LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName());
+ // notify the coordinator that we decline this checkpoint
+ getEnvironment().declineCheckpoint(checkpointId, cause);
+
+ // notify all downstream operators that they should not wait for a barrier from us
synchronized (lock) {
- if (isRunning) {
- operatorChain.broadcastCheckpointCancelMarker(checkpointId);
- }
+ operatorChain.broadcastCheckpointCancelMarker(checkpointId);
}
}
@@ -669,7 +673,18 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
checkpointThread.start();
}
return true;
- } else {
+ }
+ else {
+ // we cannot perform our checkpoint - let the downstream operators know that they
+ // should not wait for any input from this operator
+
+ // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
+ // yet be created
+ final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointId);
+ for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
+ output.writeEventToAllChannels(message);
+ }
+
return false;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index cf1f98e..20cb8f7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -45,6 +47,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -566,7 +569,7 @@ public class BarrierBufferTest {
check(sequence[12], buffer.getNextNonBlocked());
assertEquals(3L, buffer.getCurrentCheckpointId());
validateAlignmentTime(startTs, buffer);
- verify(toNotify).abortCheckpointOnBarrier(2L);
+ verify(toNotify).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineSubsumedException.class));
check(sequence[16], buffer.getNextNonBlocked());
// checkpoint 3 alignment in progress
@@ -574,7 +577,7 @@ public class BarrierBufferTest {
// checkpoint 3 aborted (end of partition)
check(sequence[20], buffer.getNextNonBlocked());
- verify(toNotify).abortCheckpointOnBarrier(3L);
+ verify(toNotify).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class));
// replay buffered data from checkpoint 3
check(sequence[18], buffer.getNextNonBlocked());
@@ -999,13 +1002,13 @@ public class BarrierBufferTest {
check(sequence[6], buffer.getNextNonBlocked());
assertEquals(5L, buffer.getCurrentCheckpointId());
verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), anyLong());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong());
assertEquals(0L, buffer.getAlignmentDurationNanos());
check(sequence[8], buffer.getNextNonBlocked());
assertEquals(6L, buffer.getCurrentCheckpointId());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
buffer.cleanup();
@@ -1073,7 +1076,7 @@ public class BarrierBufferTest {
// canceled checkpoint on last barrier
startTs = System.nanoTime();
check(sequence[12], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
validateAlignmentTime(startTs, buffer);
check(sequence[13], buffer.getNextNonBlocked());
@@ -1088,7 +1091,7 @@ public class BarrierBufferTest {
// this checkpoint gets immediately canceled
check(sequence[24], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
// some buffers
@@ -1104,7 +1107,7 @@ public class BarrierBufferTest {
check(sequence[33], buffer.getNextNonBlocked());
check(sequence[37], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
// all done
@@ -1167,7 +1170,7 @@ public class BarrierBufferTest {
// re-read the queued cancellation barriers
check(sequence[9], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
check(sequence[10], buffer.getNextNonBlocked());
@@ -1184,7 +1187,7 @@ public class BarrierBufferTest {
// no further checkpoint (abort) notifications
verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class));
// all done
assertNull(buffer.getNextNonBlocked());
@@ -1253,7 +1256,7 @@ public class BarrierBufferTest {
// cancelled by cancellation barrier
check(sequence[4], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(1L), any(CheckpointDeclineOnCancellationBarrierException.class));
// the next checkpoint alignment starts now
startTs = System.nanoTime();
@@ -1285,7 +1288,7 @@ public class BarrierBufferTest {
// check overall notifications
verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
}
/**
@@ -1337,7 +1340,7 @@ public class BarrierBufferTest {
// future barrier aborts checkpoint
startTs = System.nanoTime();
check(sequence[3], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(3L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class));
check(sequence[4], buffer.getNextNonBlocked());
// alignment of next checkpoint
@@ -1366,7 +1369,7 @@ public class BarrierBufferTest {
// check overall notifications
verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
}
// ------------------------------------------------------------------------
@@ -1471,7 +1474,7 @@ public class BarrierBufferTest {
}
@Override
- public void abortCheckpointOnBarrier(long checkpointId) {}
+ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 903f585..978c212 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -481,7 +481,7 @@ public class BarrierTrackerTest {
}
@Override
- public void abortCheckpointOnBarrier(long checkpointId) {
+ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
assertTrue("More checkpoints than expected", i < checkpointIDs.length);
final long expectedId = checkpointIDs[i++];
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 5fcc59e..9003f0e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -270,6 +271,7 @@ public class OneInputStreamTaskTest {
testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+ expectedOutput.add(new CancelCheckpointMarker(0));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
expectedOutput.add(new CheckpointBarrier(1, 1));
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 7084208..36ad8ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -302,6 +302,9 @@ public class StreamMockEnvironment implements Environment {
}
@Override
+ public void declineCheckpoint(long checkpointId, Throwable cause) {}
+
+ @Override
public void failExternally(Throwable cause) {
throw new UnsupportedOperationException("StreamMockEnvironment does not support external task failure.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
new file mode 100644
index 0000000..8b8b659
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"})
+public class StreamTaskCancellationBarrierTest {
+
+ /**
+ * This test checks that tasks emit a proper cancel checkpoint barrier, if a "trigger checkpoint" message
+ * comes before they are ready.
+ */
+ @Test
+ public void testEmitCancellationBarrierWhenNotReady() throws Exception {
+ StreamTask<String, ?> task = new InitBlockingTask();
+ StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO);
+
+ // start the test - this cannot succeed across the 'init()' method
+ testHarness.invoke();
+
+ // tell the task to commence a checkpoint
+ boolean result = task.triggerCheckpoint(41L, System.currentTimeMillis());
+ assertFalse("task triggered checkpoint though not ready", result);
+
+ // a cancellation barrier should be downstream
+ Object emitted = testHarness.getOutput().poll();
+ assertNotNull("nothing emitted", emitted);
+ assertTrue("wrong type emitted", emitted instanceof CancelCheckpointMarker);
+ assertEquals("wrong checkpoint id", 41L, ((CancelCheckpointMarker) emitted).getCheckpointId());
+ }
+
+ /**
+ * This test verifies (for one-input tasks) that the Stream tasks react the following way to
+ * receiving a checkpoint cancellation barrier:
+ *
+ * - send a "decline checkpoint" notification out (to the JobManager)
+ * - emit a cancellation barrier downstream
+ */
+ @Test
+ public void testDeclineCallOnCancelBarrierOneInput() throws Exception {
+
+ OneInputStreamTask<String, String> task = new OneInputStreamTask<String, String>();
+ OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+ task,
+ 1, 2,
+ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+ StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
+ streamConfig.setStreamOperator(mapOperator);
+
+ StreamMockEnvironment environment = spy(testHarness.createEnvironment());
+
+ // start the task
+ testHarness.invoke(environment);
+ testHarness.waitForTaskRunning();
+
+ // emit cancellation barriers
+ testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 1);
+ testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0);
+ testHarness.waitForInputProcessing();
+
+ // the decline call should go to the coordinator
+ verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
+
+ // a cancellation barrier should be downstream
+ Object result = testHarness.getOutput().poll();
+ assertNotNull("nothing emitted", result);
+ assertTrue("wrong type emitted", result instanceof CancelCheckpointMarker);
+ assertEquals("wrong checkpoint id", 2L, ((CancelCheckpointMarker) result).getCheckpointId());
+
+ // cancel and shutdown
+ testHarness.endInput();
+ testHarness.waitForTaskCompletion();
+ }
+
+ /**
+ * This test verifies (for two-input tasks) that the Stream tasks react the following way to
+ * receiving a checkpoint cancellation barrier:
+ *
+ * - send a "decline checkpoint" notification out (to the JobManager)
+ * - emit a cancellation barrier downstream
+ */
+ @Test
+ public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception {
+
+ TwoInputStreamTask<String, String, String> task = new TwoInputStreamTask<String, String, String>();
+ TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
+ task,
+ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+ CoStreamMap<String, String, String> op = new CoStreamMap<>(new UnionCoMap());
+ streamConfig.setStreamOperator(op);
+
+ StreamMockEnvironment environment = spy(testHarness.createEnvironment());
+
+ // start the task
+ testHarness.invoke(environment);
+ testHarness.waitForTaskRunning();
+
+ // emit cancellation barriers
+ testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0);
+ testHarness.processEvent(new CancelCheckpointMarker(2L), 1, 0);
+ testHarness.waitForInputProcessing();
+
+ // the decline call should go to the coordinator
+ verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
+
+ // a cancellation barrier should be downstream
+ Object result = testHarness.getOutput().poll();
+ assertNotNull("nothing emitted", result);
+ assertTrue("wrong type emitted", result instanceof CancelCheckpointMarker);
+ assertEquals("wrong checkpoint id", 2L, ((CancelCheckpointMarker) result).getCheckpointId());
+
+ // cancel and shutdown
+ testHarness.endInput();
+ testHarness.waitForTaskCompletion();
+ }
+
+ // ------------------------------------------------------------------------
+ // test tasks / functions
+ // ------------------------------------------------------------------------
+
+ private static class InitBlockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
+
+ private final Object lock = new Object();
+ private volatile boolean running = true;
+
+ @Override
+ protected void init() throws Exception {
+ synchronized (lock) {
+ while (running) {
+ lock.wait();
+ }
+ }
+ }
+
+ @Override
+ protected void run() throws Exception {}
+
+ @Override
+ protected void cleanup() throws Exception {}
+
+ @Override
+ protected void cancelTask() throws Exception {
+ running = false;
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
+ }
+
+ private static class IdentityMap implements MapFunction<String, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String map(String value) throws Exception {
+ return value;
+ }
+ }
+
+ private static class UnionCoMap implements CoMapFunction<String, String, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String map1(String value) throws Exception {
+ return value;
+ }
+
+ @Override
+ public String map2(String value) throws Exception {
+ return value;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 00e95b9..0bd8d9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -150,21 +150,18 @@ public class StreamTaskTestHarness<OUT> {
}
+ public StreamMockEnvironment createEnvironment() { // builds a new StreamMockEnvironment on every call
+ return new StreamMockEnvironment( // same arguments the previous inline construction in invoke() used
+ jobConfig, taskConfig, executionConfig, memorySize, new MockInputSplitProvider(), bufferSize);
+ }
+
/**
* Invoke the Task. This resets the output of any previous invocation. This will start a new
* Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the
* Task thread to finish running.
*/
public void invoke() throws Exception {
- mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, executionConfig,
- memorySize, new MockInputSplitProvider(), bufferSize);
- task.setEnvironment(mockEnv);
-
- initializeInputs();
- initializeOutput();
-
- taskThread = new TaskThread(task);
- taskThread.start();
+ invoke(createEnvironment());
}
/**
@@ -205,7 +202,7 @@ public class StreamTaskTestHarness<OUT> {
if (taskThread.task instanceof StreamTask) {
StreamTask<?, ?> streamTask = (StreamTask<?, ?>) taskThread.task;
while (!streamTask.isRunning()) {
- Thread.sleep(100);
+ Thread.sleep(10);
if (!taskThread.isAlive()) {
if (taskThread.getError() != null) {
throw new Exception("Task Thread failed due to an error.", taskThread.getError());
@@ -282,15 +279,15 @@ public class StreamTaskTestHarness<OUT> {
/**
* This only returns after all input queues are empty.
*/
- public void waitForInputProcessing() {
-
-
+ public void waitForInputProcessing() throws Exception {
// first wait for all input queues to be empty
- try {
- Thread.sleep(1);
- } catch (InterruptedException ignored) {}
-
+
while (true) {
+ Throwable error = taskThread.getError();
+ if (error != null) {
+ throw new Exception("Exception in the task thread", error);
+ }
+
boolean allEmpty = true;
for (int i = 0; i < numInputGates; i++) {
if (!inputGates[i].allQueuesEmpty()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index b9211b1..92f8553 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -290,6 +291,7 @@ public class TwoInputStreamTaskTest {
testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+ expectedOutput.add(new CancelCheckpointMarker(0));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
expectedOutput.add(new CheckpointBarrier(1, 1));