You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:42 UTC

[15/24] flink git commit: [hotfix] StreamTask and OperatorChain properly clean up partially initialized resources upon failures during initialization

[hotfix] StreamTask and OperatorChain properly clean up partially initialized resources upon failures during initialization


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5a016cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5a016cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5a016cf

Branch: refs/heads/master
Commit: d5a016cff5c71dc6a6b995ebd6eb5757131f292a
Parents: f2d5038
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 9 18:21:35 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:10 2015 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/OperatorChain.java  |  56 ++++---
 .../streaming/runtime/tasks/StreamTask.java     | 149 ++++++++++++-------
 2 files changed, 135 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d5a016cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 9df3a5d..b42b888 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -74,29 +74,47 @@ public class OperatorChain<OUT> {
 		Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
 		this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
 		
-		for (int i = 0; i < outEdgesInOrder.size(); i++) {
-			StreamEdge outEdge = outEdgesInOrder.get(i);
+		// from here on, we need to make sure that the output writers are shut down again on failure
+		boolean success = false;
+		try {
+			for (int i = 0; i < outEdgesInOrder.size(); i++) {
+				StreamEdge outEdge = outEdgesInOrder.get(i);
+				
+				RecordWriterOutput<?> streamOutput = createStreamOutput(
+						outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
+						containingTask.getEnvironment(), enableTimestamps, reporter, containingTask.getName());
+	
+				this.streamOutputs[i] = streamOutput;
+				streamOutputMap.put(outEdge, streamOutput);
+			}
+	
+			// we create the chain of operators and grab the collector that leads into the chain
+			List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
+			this.chainEntryPoint = createOutputCollector(containingTask, configuration,
+					chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
 			
-			RecordWriterOutput<?> streamOutput = createStreamOutput(
-					outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
-					containingTask.getEnvironment(), enableTimestamps, reporter, containingTask.getName());
-
-			streamOutputMap.put(outEdge, streamOutput);
-			this.streamOutputs[i] = streamOutput;
+			this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size() + 1]);
+			
+			// add the head operator to the end of the list
+			this.allOperators[this.allOperators.length - 1] = headOperator;
+			
+			success = true;
+		}
+		finally {
+			// make sure we clean up after ourselves in case of a failure after acquiring
+			// the first resources
+			if (!success) {
+				for (RecordWriterOutput<?> output : this.streamOutputs) {
+					if (output != null) {
+						output.close();
+						output.clearBuffers();
+					}
+				}
+			}
 		}
-
-		// we create the chain of operators and grab the collector that leads into the chain
-		List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
-		this.chainEntryPoint = createOutputCollector(containingTask, configuration,
-				chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
-		
-		this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size() + 1]);
 		
-		// add the head operator to the end of the list
-		this.allOperators[this.allOperators.length - 1] = headOperator;
 	}
-
-	// 
+	
 	
 	public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException {
 		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);

http://git-wip-us.apache.org/repos/asf/flink/blob/d5a016cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 5bf7d8e..b53d9c4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -127,6 +127,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	
 	/** The map of user-defined accumulators of this task */
 	private Map<String, Accumulator<?, ?>> accumulatorMap;
+	
+	/** The state to be restored once the initialization is done */
+	private StreamTaskStateList lazyRestoreState;
 
 	/** This field is used to forward an exception that is caught in the timer thread. Subclasses
 	 * must ensure that exceptions stored here get thrown on the actual execution Thread. */
@@ -155,31 +158,44 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	
 	@Override
 	public final void registerInputOutput() throws Exception {
-		LOG.debug("Begin initialization for {}", getName());
+		LOG.debug("registerInputOutput for {}", getName());
 
-		AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
-		
-		userClassLoader = getUserCodeClassLoader();
-		configuration = new StreamConfig(getTaskConfiguration());
-		accumulatorMap = accumulatorRegistry.getUserMap();
-		
-		stateBackend = createStateBackend();
-		stateBackend.initializeForJob(getEnvironment().getJobID());
-		
-		headOperator = configuration.getStreamOperator(userClassLoader);
-		operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());
-		
-		if (headOperator != null) {
-			headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
-		}
+		boolean initializationCompleted = false;
+		try {
+			AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
 
-		timerService = Executors.newSingleThreadScheduledExecutor(
-				new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
+			userClassLoader = getUserCodeClassLoader();
+			configuration = new StreamConfig(getTaskConfiguration());
+			accumulatorMap = accumulatorRegistry.getUserMap();
 
-		// task specific initialization
-		init();
-		
-		LOG.debug("Finish initialization for {}", getName());
+			stateBackend = createStateBackend();
+			stateBackend.initializeForJob(getEnvironment().getJobID());
+
+			headOperator = configuration.getStreamOperator(userClassLoader);
+			operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());
+
+			if (headOperator != null) {
+				headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
+			}
+
+			timerService = Executors.newSingleThreadScheduledExecutor(
+					new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
+
+			// task specific initialization
+			init();
+			
+			initializationCompleted = true;
+		}
+		finally {
+			if (!initializationCompleted) {
+				if (timerService != null) {
+					timerService.shutdownNow();
+				}
+				if (operatorChain != null) {
+					operatorChain.releaseOutputs();
+				}
+			}
+		}
 	}
 	
 	@Override
@@ -188,6 +204,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		
 		boolean disposed = false;
 		try {
+			// first order of business is to give operators back their state
+			restoreStateLazy();
+			
 			// we need to make sure that any triggers scheduled in open() cannot be
 			// executed before all operators are opened
 			synchronized (lock) {
@@ -223,7 +242,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		finally {
 			isRunning = false;
 
-			timerService.shutdown();
+			timerService.shutdownNow();
 			
 			// release the output resources. this method should never fail.
 			if (operatorChain != null) {
@@ -263,7 +282,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	
 	private void openAllOperators() throws Exception {
 		for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
-			operator.open();
+			if (operator != null) {
+				operator.open();
+			}
 		}
 	}
 
@@ -272,20 +293,27 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		// elements in their close methods.
 		StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
 		for (int i = allOperators.length - 1; i >= 0; i--) {
-			allOperators[i].close();
+			StreamOperator<?> operator = allOperators[i];
+			if (operator != null) {
+				operator.close();
+			}
 		}
 	}
 
 	private void tryDisposeAllOperators() throws Exception {
 		for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
-			operator.dispose();
+			if (operator != null) {
+				operator.dispose();
+			}
 		}
 	}
 	
 	private void disposeAllOperators() {
 		for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
 			try {
-				operator.dispose();
+				if (operator != null) {
+					operator.dispose();
+				}
 			}
 			catch (Throwable t) {
 				LOG.error("Error during disposal of stream operator.", t);
@@ -354,22 +382,36 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	// ------------------------------------------------------------------------
 	
 	@Override
-	public void setInitialState(StreamTaskStateList initialState) throws Exception {
-		LOG.info("Restoring checkpointed state to task {}", getName());
-		
-		final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
-		final StreamTaskState[] states = initialState.getState(userClassLoader);
-		
-		for (int i = 0; i < states.length; i++) {
-			StreamTaskState state = states[i];
-			StreamOperator<?> operator = allOperators[i];
+	public void setInitialState(StreamTaskStateList initialState) {
+		lazyRestoreState = initialState;
+	}
+	
+	public void restoreStateLazy() throws Exception {
+		if (lazyRestoreState != null) {
+			LOG.info("Restoring checkpointed state to task {}", getName());
 			
-			if (state != null && operator != null) {
-				LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
-				operator.restoreState(state);
+			try {
+				final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
+				final StreamTaskState[] states = lazyRestoreState.getState(userClassLoader);
+				
+				// be GC friendly
+				lazyRestoreState = null;
+				
+				for (int i = 0; i < states.length; i++) {
+					StreamTaskState state = states[i];
+					StreamOperator<?> operator = allOperators[i];
+					
+					if (state != null && operator != null) {
+						LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
+						operator.restoreState(state);
+					}
+					else if (operator != null) {
+						LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
+					}
+				}
 			}
-			else if (operator != null) {
-				LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
+			catch (Exception e) {
+				throw new Exception("Could not restore checkpointed state to operators and functions", e);
 			}
 		}
 	}
@@ -380,24 +422,27 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		
 		synchronized (lock) {
 			if (isRunning) {
-
-				// since both state checkpointing and downstream barrier emission occurs in this
-				// lock scope, they are an atomic operation regardless of the order in which they occur
-				// we immediately emit the checkpoint barriers, so the downstream operators can start
-				// their checkpoint work as soon as possible
-				operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
-				
 				// now draw the state snapshot
 				try {
 					final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
 					final StreamTaskState[] states = new StreamTaskState[allOperators.length];
 					
 					for (int i = 0; i < states.length; i++) {
-						StreamTaskState state = allOperators[i].snapshotOperatorState(checkpointId, timestamp);
-						states[i] = state.isEmpty() ? null : state;
+						StreamOperator<?> operator = allOperators[i];
+						if (operator != null) {
+							StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);
+							states[i] = state.isEmpty() ? null : state;
+						}
 					}
 
 					StreamTaskStateList allStates = new StreamTaskStateList(states);
+
+					// since both state checkpointing and downstream barrier emission occur in this
+					// lock scope, they are an atomic operation regardless of the order in which they occur;
+					// we emit the checkpoint barriers right after drawing the snapshot, so the downstream
+					// operators can start their checkpoint work as soon as possible
+					operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
+					
 					if (allStates.isEmpty()) {
 						getEnvironment().acknowledgeCheckpoint(checkpointId);
 					} else {
@@ -420,7 +465,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				LOG.debug("Notification of complete checkpoint for task {}", getName());
 				
 				for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
-					operator.notifyOfCompletedCheckpoint(checkpointId);
+					if (operator != null) {
+						operator.notifyOfCompletedCheckpoint(checkpointId);
+					}
 				}
 			}
 			else {