You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/29 22:51:10 UTC

[3/7] flink git commit: [hotfix] [dist. coordination] Small code cleanups in ExecutionGraph and related classes

[hotfix] [dist. coordination] Small code cleanups in ExecutionGraph and related classes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/874d9565
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/874d9565
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/874d9565

Branch: refs/heads/master
Commit: 874d956561f817a2578d7d7e6686d598323dc4c8
Parents: 69843fe
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 21 18:12:23 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 29 17:11:49 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |   6 +-
 .../runtime/executiongraph/ExecutionGraph.java  | 140 +++++++++----------
 .../flink/runtime/executiongraph/IOMetrics.java |   2 +
 .../runtime/executiongraph/JobInformation.java  |   8 ++
 .../runtime/taskmanager/TaskExecutionState.java |   3 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |  12 +-
 .../ExecutionGraphMetricsTest.java              |   2 +-
 7 files changed, 87 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index e17a3e5..1a3ef11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -820,7 +820,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	void sendPartitionInfos() {
 		// check if the ExecutionVertex has already been archived and thus cleared the
 		// partial partition infos queue
-		if(partialInputChannelDeploymentDescriptors != null && !partialInputChannelDeploymentDescriptors.isEmpty()) {
+		if (partialInputChannelDeploymentDescriptors != null && !partialInputChannelDeploymentDescriptors.isEmpty()) {
 
 			PartialInputChannelDeploymentDescriptor partialInputChannelDeploymentDescriptor;
 
@@ -931,7 +931,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			}
 			else if (currentState == CANCELING || currentState == FAILED) {
 				if (LOG.isDebugEnabled()) {
-					LOG.debug(String.format("Concurrent canceling/failing of %s while deployment was in progress.", getVertexWithAttempt()));
+					// this log statement is guarded because the 'getVertexWithAttempt()' method
+					// performs string concatenations 
+					LOG.debug("Concurrent canceling/failing of {} while deployment was in progress.", getVertexWithAttempt());
 				}
 				sendCancelRpcCall();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 6bb3455..e911f49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -63,7 +64,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
@@ -92,6 +92,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -131,7 +132,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	/** The lock used to secure all access to mutable fields, especially the tracking of progress
 	 * within the job. */
-	private final SerializableObject progressLock = new SerializableObject();
+	private final Object progressLock = new Object();
 
 	/** Job specific information like the job id, job name, job configuration, etc. */
 	private final JobInformation jobInformation;
@@ -222,7 +223,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	/** Checkpoint stats tracker separate from the coordinator in order to be
 	 * available after archiving. */
-	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private CheckpointStatsTracker checkpointStatsTracker;
 
 	// ------ Fields that are only relevant for archived execution graphs ------------
@@ -235,6 +235,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/**
 	 * This constructor is for tests only, because it does not include class loading information.
 	 */
+	@VisibleForTesting
 	ExecutionGraph(
 			ScheduledExecutorService futureExecutor,
 			Executor ioExecutor,
@@ -369,24 +370,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			CheckpointStatsTracker statsTracker) {
 
 		// simple sanity checks
-		if (interval < 10 || checkpointTimeout < 10) {
-			throw new IllegalArgumentException();
-		}
-		if (state != JobStatus.CREATED) {
-			throw new IllegalStateException("Job must be in CREATED state");
-		}
+		checkArgument(interval >= 10, "checkpoint interval must not be below 10ms");
+		checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms");
+
+		checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
+		checkState(checkpointCoordinator == null, "checkpointing already enabled");
 
 		ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
 		ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
 		ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
 
-		// disable to make sure existing checkpoint coordinators are cleared
-		try {
-			disableSnaphotCheckpointing();
-		} catch (Throwable t) {
-			LOG.error("Error while shutting down checkpointer.");
-		}
-
 		checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
 
 		// create the coordinator that triggers and commits checkpoints and holds the state
@@ -416,24 +409,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	/**
-	 * Disables checkpointing.
-	 *
-	 * <p>The shutdown of the checkpoint coordinator might block. Make sure that calls to this
-	 * method don't block the job manager actor and run asynchronously.
-	 */
-	public void disableSnaphotCheckpointing() throws Exception {
-		if (state != JobStatus.CREATED) {
-			throw new IllegalStateException("Job must be in CREATED state");
-		}
-
-		if (checkpointCoordinator != null) {
-			checkpointCoordinator.shutdown(state);
-			checkpointCoordinator = null;
-			checkpointStatsTracker = null;
-		}
-	}
-
 	@Override
 	public CheckpointCoordinator getCheckpointCoordinator() {
 		return checkpointCoordinator;
@@ -761,7 +736,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
 		// simply take the vertices without inputs.
-		for (ExecutionJobVertex ejv : this.tasks.values()) {
+		for (ExecutionJobVertex ejv : verticesInCreationOrder) {
 			if (ejv.getJobVertex().isInputVertex()) {
 				ejv.scheduleAll(slotProvider, allowQueuedScheduling);
 			}
@@ -932,9 +907,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	public void stop() throws StoppingException {
-		if(this.isStoppable) {
-			for(ExecutionVertex ev : this.getAllExecutionVertices()) {
-				if(ev.getNumberOfInputs() == 0) { // send signal to sources only
+		if (isStoppable) {
+			for (ExecutionVertex ev : this.getAllExecutionVertices()) {
+				if (ev.getNumberOfInputs() == 0) { // send signal to sources only
 					ev.stop();
 				}
 			}
@@ -1011,7 +986,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				return;
 			}
 
-			// no need to treat other states
+			// else: concurrent change to execution state, retry
 		}
 	}
 
@@ -1273,35 +1248,47 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * @return True, if the task update was properly applied, false, if the execution attempt was not found.
 	 */
 	public boolean updateState(TaskExecutionState state) {
-		Execution attempt = this.currentExecutions.get(state.getID());
+		final Execution attempt = currentExecutions.get(state.getID());
+
 		if (attempt != null) {
+			try {
+				Map<String, Accumulator<?, ?>> accumulators;
+
+				switch (state.getExecutionState()) {
+					case RUNNING:
+						return attempt.switchToRunning();
+	
+					case FINISHED:
+						// this deserialization is exception-free
+						accumulators = deserializeAccumulators(state);
+						attempt.markFinished(accumulators, state.getIOMetrics());
+						return true;
+	
+					case CANCELED:
+						// this deserialization is exception-free
+						accumulators = deserializeAccumulators(state);
+						attempt.cancelingComplete(accumulators, state.getIOMetrics());
+						return true;
+	
+					case FAILED:
+						// this deserialization is exception-free
+						accumulators = deserializeAccumulators(state);
+						attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
+						return true;
+	
+					default:
+						// we mark as failed and return false, which triggers the TaskManager
+						// to remove the task
+						attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
+						return false;
+				}
+			}
+			catch (Throwable t) {
+				ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
 
-			switch (state.getExecutionState()) {
-				case RUNNING:
-					return attempt.switchToRunning();
-				case FINISHED:
-					try {
-						Map<String, Accumulator<?, ?>> userAccumulators = deserializeAccumulators(state);
-						attempt.markFinished(userAccumulators, state.getIOMetrics());
-					}
-					catch (Exception e) {
-						LOG.error("Failed to deserialize final accumulator results.", e);
-						attempt.markFailed(e);
-					}
-					return true;
-				case CANCELED:
-					Map<String, Accumulator<?, ?>> userAcc1 = deserializeAccumulators(state);
-					attempt.cancelingComplete(userAcc1, state.getIOMetrics());
-					return true;
-				case FAILED:
-					Map<String, Accumulator<?, ?>> userAcc2 = deserializeAccumulators(state);
-					attempt.markFailed(state.getError(userClassLoader), userAcc2, state.getIOMetrics());
-					return true;
-				default:
-					// we mark as failed and return false, which triggers the TaskManager
-					// to remove the task
-					attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
-					return false;
+				// failures during updates leave the ExecutionGraph inconsistent
+				fail(t);
+				return false;
 			}
 		}
 		else {
@@ -1309,17 +1296,28 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
+	/**
+	 * Deserializes accumulators from a task state update.
+	 * 
+	 * <p>This method never throws an exception!
+	 * 
+	 * @param state The task execution state from which to deserialize the accumulators.
+	 * @return The deserialized accumulators, or null, if there are no accumulators or an error occurred.
+	 */
 	private Map<String, Accumulator<?, ?>> deserializeAccumulators(TaskExecutionState state) {
 		AccumulatorSnapshot serializedAccumulators = state.getAccumulators();
-		Map<String, Accumulator<?, ?>> accumulators = null;
+
 		if (serializedAccumulators != null) {
 			try {
-				accumulators = serializedAccumulators.deserializeUserAccumulators(userClassLoader);
-			} catch (Exception e) {
-				LOG.error("Failed to deserialize final accumulator results.", e);
+				return serializedAccumulators.deserializeUserAccumulators(userClassLoader);
+			}
+			catch (Throwable t) {
+				// we catch Throwable here to include all forms of linking errors that may
+				// occur if user classes are missing in the classpath
+				LOG.error("Failed to deserialize final accumulator results.", t);
 			}
 		}
-		return accumulators;
+		return null;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
index 82c376e..668418d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -25,7 +25,9 @@ import java.io.Serializable;
  * An instance of this class represents a snapshot of the io-related metrics of a single task.
  */
 public class IOMetrics implements Serializable {
+
 	private static final long serialVersionUID = -7208093607556457183L;
+
 	protected long numRecordsIn;
 	protected long numRecordsOut;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
index 6e3c1e8..f497f8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
@@ -93,4 +93,12 @@ public class JobInformation implements Serializable {
 	public Collection<URL> getRequiredClasspathURLs() {
 		return requiredClasspathURLs;
 	}
+
+	// ------------------------------------------------------------------------
+
+
+	@Override
+	public String toString() {
+		return "JobInformation for '" + jobName + "' (" + jobId + ')';
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 5cc2484..9395435 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -49,8 +49,9 @@ public class TaskExecutionState implements Serializable {
 
 	private final SerializedThrowable throwable;
 
-	/** Serialized flink and user-defined accumulators */
+	/** Serialized user-defined accumulators */
 	private final AccumulatorSnapshot accumulators;
+
 	private final IOMetrics ioMetrics;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 98b4c4d..81162b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -18,14 +18,11 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -37,7 +34,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
-import org.junit.AfterClass;
+
 import org.junit.Test;
 import org.mockito.Matchers;
 
@@ -50,13 +47,6 @@ import static org.mockito.Mockito.verify;
 
 public class ExecutionGraphCheckpointCoordinatorTest {
 
-	private static ActorSystem system = AkkaUtils.createLocalActorSystem(new Configuration());
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
 	/**
 	 * Tests that a shut down checkpoint coordinator calls shutdown on
 	 * the store and counter.

http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 203c547..5496e35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -162,7 +162,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 				testingRestartStrategy,
 				Collections.<BlobKey>emptyList(),
 				Collections.<URL>emptyList(),
-			scheduler,
+				scheduler,
 				getClass().getClassLoader(),
 				metricGroup);