You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:42 UTC
[15/24] flink git commit: [hotfix] StreamTask and OperatorChain
properly clean up partially initialized resources upon failures during
initialization
[hotfix] StreamTask and OperatorChain properly clean up partially initialized resources upon failures during initialization
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5a016cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5a016cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5a016cf
Branch: refs/heads/master
Commit: d5a016cff5c71dc6a6b995ebd6eb5757131f292a
Parents: f2d5038
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 9 18:21:35 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:10 2015 +0200
----------------------------------------------------------------------
.../streaming/runtime/tasks/OperatorChain.java | 56 ++++---
.../streaming/runtime/tasks/StreamTask.java | 149 ++++++++++++-------
2 files changed, 135 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d5a016cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 9df3a5d..b42b888 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -74,29 +74,47 @@ public class OperatorChain<OUT> {
Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
- for (int i = 0; i < outEdgesInOrder.size(); i++) {
- StreamEdge outEdge = outEdgesInOrder.get(i);
+ // from here on, we need to make sure that the output writers are shut down again on failure
+ boolean success = false;
+ try {
+ for (int i = 0; i < outEdgesInOrder.size(); i++) {
+ StreamEdge outEdge = outEdgesInOrder.get(i);
+
+ RecordWriterOutput<?> streamOutput = createStreamOutput(
+ outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
+ containingTask.getEnvironment(), enableTimestamps, reporter, containingTask.getName());
+
+ this.streamOutputs[i] = streamOutput;
+ streamOutputMap.put(outEdge, streamOutput);
+ }
+
+ // we create the chain of operators and grab the collector that leads into the chain
+ List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
+ this.chainEntryPoint = createOutputCollector(containingTask, configuration,
+ chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
- RecordWriterOutput<?> streamOutput = createStreamOutput(
- outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
- containingTask.getEnvironment(), enableTimestamps, reporter, containingTask.getName());
-
- streamOutputMap.put(outEdge, streamOutput);
- this.streamOutputs[i] = streamOutput;
+ this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size() + 1]);
+
+ // add the head operator to the end of the list
+ this.allOperators[this.allOperators.length - 1] = headOperator;
+
+ success = true;
+ }
+ finally {
+ // make sure we clean up after ourselves in case of a failure after acquiring
+ // the first resources
+ if (!success) {
+ for (RecordWriterOutput<?> output : this.streamOutputs) {
+ if (output != null) {
+ output.close();
+ output.clearBuffers();
+ }
+ }
+ }
}
-
- // we create the chain of operators and grab the collector that leads into the chain
- List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
- this.chainEntryPoint = createOutputCollector(containingTask, configuration,
- chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
-
- this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size() + 1]);
- // add the head operator to the end of the list
- this.allOperators[this.allOperators.length - 1] = headOperator;
}
-
- //
+
public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException {
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
http://git-wip-us.apache.org/repos/asf/flink/blob/d5a016cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 5bf7d8e..b53d9c4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -127,6 +127,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
/** The map of user-defined accumulators of this task */
private Map<String, Accumulator<?, ?>> accumulatorMap;
+
+ /** The state to be restored once the initialization is done */
+ private StreamTaskStateList lazyRestoreState;
/** This field is used to forward an exception that is caught in the timer thread. Subclasses
* must ensure that exceptions stored here get thrown on the actual execution Thread. */
@@ -155,31 +158,44 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
@Override
public final void registerInputOutput() throws Exception {
- LOG.debug("Begin initialization for {}", getName());
+ LOG.debug("registerInputOutput for {}", getName());
- AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
-
- userClassLoader = getUserCodeClassLoader();
- configuration = new StreamConfig(getTaskConfiguration());
- accumulatorMap = accumulatorRegistry.getUserMap();
-
- stateBackend = createStateBackend();
- stateBackend.initializeForJob(getEnvironment().getJobID());
-
- headOperator = configuration.getStreamOperator(userClassLoader);
- operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());
-
- if (headOperator != null) {
- headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
- }
+ boolean initializationCompleted = false;
+ try {
+ AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
- timerService = Executors.newSingleThreadScheduledExecutor(
- new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
+ userClassLoader = getUserCodeClassLoader();
+ configuration = new StreamConfig(getTaskConfiguration());
+ accumulatorMap = accumulatorRegistry.getUserMap();
- // task specific initialization
- init();
-
- LOG.debug("Finish initialization for {}", getName());
+ stateBackend = createStateBackend();
+ stateBackend.initializeForJob(getEnvironment().getJobID());
+
+ headOperator = configuration.getStreamOperator(userClassLoader);
+ operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());
+
+ if (headOperator != null) {
+ headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
+ }
+
+ timerService = Executors.newSingleThreadScheduledExecutor(
+ new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
+
+ // task specific initialization
+ init();
+
+ initializationCompleted = true;
+ }
+ finally {
+ if (!initializationCompleted) {
+ if (timerService != null) {
+ timerService.shutdownNow();
+ }
+ if (operatorChain != null) {
+ operatorChain.releaseOutputs();
+ }
+ }
+ }
}
@Override
@@ -188,6 +204,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
boolean disposed = false;
try {
+ // first order of business is to give operators back their state
+ restoreStateLazy();
+
// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
synchronized (lock) {
@@ -223,7 +242,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
finally {
isRunning = false;
- timerService.shutdown();
+ timerService.shutdownNow();
// release the output resources. this method should never fail.
if (operatorChain != null) {
@@ -263,7 +282,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
private void openAllOperators() throws Exception {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
- operator.open();
+ if (operator != null) {
+ operator.open();
+ }
}
}
@@ -272,20 +293,27 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
// elements in their close methods.
StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
for (int i = allOperators.length - 1; i >= 0; i--) {
- allOperators[i].close();
+ StreamOperator<?> operator = allOperators[i];
+ if (operator != null) {
+ operator.close();
+ }
}
}
private void tryDisposeAllOperators() throws Exception {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
- operator.dispose();
+ if (operator != null) {
+ operator.dispose();
+ }
}
}
private void disposeAllOperators() {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
try {
- operator.dispose();
+ if (operator != null) {
+ operator.dispose();
+ }
}
catch (Throwable t) {
LOG.error("Error during disposal of stream operator.", t);
@@ -354,22 +382,36 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
// ------------------------------------------------------------------------
@Override
- public void setInitialState(StreamTaskStateList initialState) throws Exception {
- LOG.info("Restoring checkpointed state to task {}", getName());
-
- final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
- final StreamTaskState[] states = initialState.getState(userClassLoader);
-
- for (int i = 0; i < states.length; i++) {
- StreamTaskState state = states[i];
- StreamOperator<?> operator = allOperators[i];
+ public void setInitialState(StreamTaskStateList initialState) {
+ lazyRestoreState = initialState;
+ }
+
+ public void restoreStateLazy() throws Exception {
+ if (lazyRestoreState != null) {
+ LOG.info("Restoring checkpointed state to task {}", getName());
- if (state != null && operator != null) {
- LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
- operator.restoreState(state);
+ try {
+ final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
+ final StreamTaskState[] states = lazyRestoreState.getState(userClassLoader);
+
+ // be GC friendly
+ lazyRestoreState = null;
+
+ for (int i = 0; i < states.length; i++) {
+ StreamTaskState state = states[i];
+ StreamOperator<?> operator = allOperators[i];
+
+ if (state != null && operator != null) {
+ LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
+ operator.restoreState(state);
+ }
+ else if (operator != null) {
+ LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
+ }
+ }
}
- else if (operator != null) {
- LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
+ catch (Exception e) {
+ throw new Exception("Could not restore checkpointed state to operators and functions", e);
}
}
}
@@ -380,24 +422,27 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
synchronized (lock) {
if (isRunning) {
-
- // since both state checkpointing and downstream barrier emission occurs in this
- // lock scope, they are an atomic operation regardless of the order in which they occur
- // we immediately emit the checkpoint barriers, so the downstream operators can start
- // their checkpoint work as soon as possible
- operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
-
// now draw the state snapshot
try {
final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
final StreamTaskState[] states = new StreamTaskState[allOperators.length];
for (int i = 0; i < states.length; i++) {
- StreamTaskState state = allOperators[i].snapshotOperatorState(checkpointId, timestamp);
- states[i] = state.isEmpty() ? null : state;
+ StreamOperator<?> operator = allOperators[i];
+ if (operator != null) {
+ StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);
+ states[i] = state.isEmpty() ? null : state;
+ }
}
StreamTaskStateList allStates = new StreamTaskStateList(states);
+
+ // since both state checkpointing and downstream barrier emission occurs in this
+ // lock scope, they are an atomic operation regardless of the order in which they occur
+ // we immediately emit the checkpoint barriers, so the downstream operators can start
+ // their checkpoint work as soon as possible
+ operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
+
if (allStates.isEmpty()) {
getEnvironment().acknowledgeCheckpoint(checkpointId);
} else {
@@ -420,7 +465,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
LOG.debug("Notification of complete checkpoint for task {}", getName());
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
- operator.notifyOfCompletedCheckpoint(checkpointId);
+ if (operator != null) {
+ operator.notifyOfCompletedCheckpoint(checkpointId);
+ }
}
}
else {