You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:52 UTC
[41/50] [abbrv] flink git commit: [hotfix] [dist. coordination] Small
code cleanups in ExecutionGraph and related classes
[hotfix] [dist. coordination] Small code cleanups in ExecutionGraph and related classes
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/874d9565
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/874d9565
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/874d9565
Branch: refs/heads/table-retraction
Commit: 874d956561f817a2578d7d7e6686d598323dc4c8
Parents: 69843fe
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 21 18:12:23 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 29 17:11:49 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 6 +-
.../runtime/executiongraph/ExecutionGraph.java | 140 +++++++++----------
.../flink/runtime/executiongraph/IOMetrics.java | 2 +
.../runtime/executiongraph/JobInformation.java | 8 ++
.../runtime/taskmanager/TaskExecutionState.java | 3 +-
...ExecutionGraphCheckpointCoordinatorTest.java | 12 +-
.../ExecutionGraphMetricsTest.java | 2 +-
7 files changed, 87 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index e17a3e5..1a3ef11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -820,7 +820,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
void sendPartitionInfos() {
// check if the ExecutionVertex has already been archived and thus cleared the
// partial partition infos queue
- if(partialInputChannelDeploymentDescriptors != null && !partialInputChannelDeploymentDescriptors.isEmpty()) {
+ if (partialInputChannelDeploymentDescriptors != null && !partialInputChannelDeploymentDescriptors.isEmpty()) {
PartialInputChannelDeploymentDescriptor partialInputChannelDeploymentDescriptor;
@@ -931,7 +931,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
else if (currentState == CANCELING || currentState == FAILED) {
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Concurrent canceling/failing of %s while deployment was in progress.", getVertexWithAttempt()));
+ // this log statement is guarded because the 'getVertexWithAttempt()' method
+ // performs string concatenations
+ LOG.debug("Concurrent canceling/failing of {} while deployment was in progress.", getVertexWithAttempt());
}
sendCancelRpcCall();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 6bb3455..e911f49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
@@ -63,7 +64,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
@@ -92,6 +92,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -131,7 +132,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** The lock used to secure all access to mutable fields, especially the tracking of progress
* within the job. */
- private final SerializableObject progressLock = new SerializableObject();
+ private final Object progressLock = new Object();
/** Job specific information like the job id, job name, job configuration, etc. */
private final JobInformation jobInformation;
@@ -222,7 +223,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** Checkpoint stats tracker separate from the coordinator in order to be
* available after archiving. */
- @SuppressWarnings("NonSerializableFieldInSerializableClass")
private CheckpointStatsTracker checkpointStatsTracker;
// ------ Fields that are only relevant for archived execution graphs ------------
@@ -235,6 +235,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/**
* This constructor is for tests only, because it does not include class loading information.
*/
+ @VisibleForTesting
ExecutionGraph(
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
@@ -369,24 +370,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
CheckpointStatsTracker statsTracker) {
// simple sanity checks
- if (interval < 10 || checkpointTimeout < 10) {
- throw new IllegalArgumentException();
- }
- if (state != JobStatus.CREATED) {
- throw new IllegalStateException("Job must be in CREATED state");
- }
+ checkArgument(interval >= 10, "checkpoint interval must not be below 10ms");
+ checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms");
+
+ checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
+ checkState(checkpointCoordinator == null, "checkpointing already enabled");
ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
- // disable to make sure existing checkpoint coordinators are cleared
- try {
- disableSnaphotCheckpointing();
- } catch (Throwable t) {
- LOG.error("Error while shutting down checkpointer.");
- }
-
checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
// create the coordinator that triggers and commits checkpoints and holds the state
@@ -416,24 +409,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
- /**
- * Disables checkpointing.
- *
- * <p>The shutdown of the checkpoint coordinator might block. Make sure that calls to this
- * method don't block the job manager actor and run asynchronously.
- */
- public void disableSnaphotCheckpointing() throws Exception {
- if (state != JobStatus.CREATED) {
- throw new IllegalStateException("Job must be in CREATED state");
- }
-
- if (checkpointCoordinator != null) {
- checkpointCoordinator.shutdown(state);
- checkpointCoordinator = null;
- checkpointStatsTracker = null;
- }
- }
-
@Override
public CheckpointCoordinator getCheckpointCoordinator() {
return checkpointCoordinator;
@@ -761,7 +736,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
// simply take the vertices without inputs.
- for (ExecutionJobVertex ejv : this.tasks.values()) {
+ for (ExecutionJobVertex ejv : verticesInCreationOrder) {
if (ejv.getJobVertex().isInputVertex()) {
ejv.scheduleAll(slotProvider, allowQueuedScheduling);
}
@@ -932,9 +907,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
public void stop() throws StoppingException {
- if(this.isStoppable) {
- for(ExecutionVertex ev : this.getAllExecutionVertices()) {
- if(ev.getNumberOfInputs() == 0) { // send signal to sources only
+ if (isStoppable) {
+ for (ExecutionVertex ev : this.getAllExecutionVertices()) {
+ if (ev.getNumberOfInputs() == 0) { // send signal to sources only
ev.stop();
}
}
@@ -1011,7 +986,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return;
}
- // no need to treat other states
+ // else: concurrent change to execution state, retry
}
}
@@ -1273,35 +1248,47 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* @return True, if the task update was properly applied, false, if the execution attempt was not found.
*/
public boolean updateState(TaskExecutionState state) {
- Execution attempt = this.currentExecutions.get(state.getID());
+ final Execution attempt = currentExecutions.get(state.getID());
+
if (attempt != null) {
+ try {
+ Map<String, Accumulator<?, ?>> accumulators;
+
+ switch (state.getExecutionState()) {
+ case RUNNING:
+ return attempt.switchToRunning();
+
+ case FINISHED:
+ // this deserialization is exception-free
+ accumulators = deserializeAccumulators(state);
+ attempt.markFinished(accumulators, state.getIOMetrics());
+ return true;
+
+ case CANCELED:
+ // this deserialization is exception-free
+ accumulators = deserializeAccumulators(state);
+ attempt.cancelingComplete(accumulators, state.getIOMetrics());
+ return true;
+
+ case FAILED:
+ // this deserialization is exception-free
+ accumulators = deserializeAccumulators(state);
+ attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
+ return true;
+
+ default:
+ // we mark as failed and return false, which triggers the TaskManager
+ // to remove the task
+ attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
+ return false;
+ }
+ }
+ catch (Throwable t) {
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
- switch (state.getExecutionState()) {
- case RUNNING:
- return attempt.switchToRunning();
- case FINISHED:
- try {
- Map<String, Accumulator<?, ?>> userAccumulators = deserializeAccumulators(state);
- attempt.markFinished(userAccumulators, state.getIOMetrics());
- }
- catch (Exception e) {
- LOG.error("Failed to deserialize final accumulator results.", e);
- attempt.markFailed(e);
- }
- return true;
- case CANCELED:
- Map<String, Accumulator<?, ?>> userAcc1 = deserializeAccumulators(state);
- attempt.cancelingComplete(userAcc1, state.getIOMetrics());
- return true;
- case FAILED:
- Map<String, Accumulator<?, ?>> userAcc2 = deserializeAccumulators(state);
- attempt.markFailed(state.getError(userClassLoader), userAcc2, state.getIOMetrics());
- return true;
- default:
- // we mark as failed and return false, which triggers the TaskManager
- // to remove the task
- attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
- return false;
+ // failures during updates leave the ExecutionGraph inconsistent
+ fail(t);
+ return false;
}
}
else {
@@ -1309,17 +1296,28 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
+ /**
+ * Deserializes accumulators from a task state update.
+ *
+ * <p>This method never throws an exception!
+ *
+ * @param state The task execution state from which to deserialize the accumulators.
+ * @return The deserialized accumulators, or null, if there are no accumulators or an error occurred.
+ */
private Map<String, Accumulator<?, ?>> deserializeAccumulators(TaskExecutionState state) {
AccumulatorSnapshot serializedAccumulators = state.getAccumulators();
- Map<String, Accumulator<?, ?>> accumulators = null;
+
if (serializedAccumulators != null) {
try {
- accumulators = serializedAccumulators.deserializeUserAccumulators(userClassLoader);
- } catch (Exception e) {
- LOG.error("Failed to deserialize final accumulator results.", e);
+ return serializedAccumulators.deserializeUserAccumulators(userClassLoader);
+ }
+ catch (Throwable t) {
+ // we catch Throwable here to include all forms of linking errors that may
+ // occur if user classes are missing in the classpath
+ LOG.error("Failed to deserialize final accumulator results.", t);
}
}
- return accumulators;
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
index 82c376e..668418d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -25,7 +25,9 @@ import java.io.Serializable;
* An instance of this class represents a snapshot of the io-related metrics of a single task.
*/
public class IOMetrics implements Serializable {
+
private static final long serialVersionUID = -7208093607556457183L;
+
protected long numRecordsIn;
protected long numRecordsOut;
http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
index 6e3c1e8..f497f8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
@@ -93,4 +93,12 @@ public class JobInformation implements Serializable {
public Collection<URL> getRequiredClasspathURLs() {
return requiredClasspathURLs;
}
+
+ // ------------------------------------------------------------------------
+
+
+ @Override
+ public String toString() {
+ return "JobInformation for '" + jobName + "' (" + jobId + ')';
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 5cc2484..9395435 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -49,8 +49,9 @@ public class TaskExecutionState implements Serializable {
private final SerializedThrowable throwable;
- /** Serialized flink and user-defined accumulators */
+ /** Serialized user-defined accumulators */
private final AccumulatorSnapshot accumulators;
+
private final IOMetrics ioMetrics;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 98b4c4d..81162b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -18,14 +18,11 @@
package org.apache.flink.runtime.checkpoint;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -37,7 +34,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
-import org.junit.AfterClass;
+
import org.junit.Test;
import org.mockito.Matchers;
@@ -50,13 +47,6 @@ import static org.mockito.Mockito.verify;
public class ExecutionGraphCheckpointCoordinatorTest {
- private static ActorSystem system = AkkaUtils.createLocalActorSystem(new Configuration());
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- }
-
/**
* Tests that a shut down checkpoint coordinator calls shutdown on
* the store and counter.
http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 203c547..5496e35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -162,7 +162,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
testingRestartStrategy,
Collections.<BlobKey>emptyList(),
Collections.<URL>emptyList(),
- scheduler,
+ scheduler,
getClass().getClassLoader(),
metricGroup);