You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/23 08:13:51 UTC

[2/2] flink git commit: [FLINK-5107] Handle evicted execution attempts in request handlers

[FLINK-5107] Handle evicted execution attempts in request handlers

If a prior execution attempt cannot be retrieved because it has already been evicted,
the request handler now throws a meaningful exception to notify the requester
about the evicted execution attempt.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/871de0bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/871de0bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/871de0bf

Branch: refs/heads/release-1.1
Commit: 871de0bf7a28a79222406f73048432ab156c7b0f
Parents: 8989a9f
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 23 00:19:32 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 23 00:46:40 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 13 +++++++
 .../AbstractSubtaskAttemptRequestHandler.java   |  8 ++++-
 .../handlers/RequestHandlerException.java       | 31 ++++++++++++++++
 ...taskExecutionAttemptAccumulatorsHandler.java |  6 ----
 .../executiongraph/ExecutionJobVertex.java      |  8 +++--
 .../runtime/executiongraph/ExecutionVertex.java | 18 +++++-----
 .../runtime/jobmanager/JobManagerOptions.java   | 38 --------------------
 7 files changed, 66 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 1431eae..c70388a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -122,6 +122,13 @@ public final class ConfigConstants {
 	public static final String JOB_MANAGER_IPC_PORT_KEY = "jobmanager.rpc.port";
 
 	/**
+	 * The config parameter defining the maximum number of prior execution attempts whose information
+	 * is kept on the job manager before the information of the oldest attempt is deleted.
+	 */
+	@PublicEvolving
+	public static final String JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE = "jobmanager.max-attempts-history-size";
+
+	/**
 	 * The config parameter defining the network port to connect to
 	 * for communication with the resource manager.
 	 */
@@ -777,6 +784,12 @@ public final class ConfigConstants {
 	public static final int DEFAULT_JOB_MANAGER_IPC_PORT = 6123;
 
 	/**
+	 * The default maximum number of prior execution attempts whose information is kept on
+	 * the job manager before the information of the oldest attempt is deleted.
+	 */
+	public static final int DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE = 16;
+
+	/**
 	 * The default network port of the resource manager.
 	 */
 	public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
index 672df16..0f906fe 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -57,7 +57,13 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
 		}
 		else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
 			Execution exec = vertex.getPriorExecutionAttempt(attempt);
-			return handleRequest(exec, params);
+
+			if (exec != null) {
+				return handleRequest(exec, params);
+			} else {
+				throw new RequestHandlerException("Execution for attempt " + attempt +
+					" has already been deleted.");
+			}
 		}
 		else {
 			throw new RuntimeException("Attempt does not exist: " + attempt);

http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
new file mode 100644
index 0000000..bb61d16
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+/**
+ * Base class for request handler exceptions.
+ */
+public class RequestHandlerException extends Exception {
+
+	private static final long serialVersionUID = 7570352908725875886L;
+
+	public RequestHandlerException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index f661126..ade241a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -38,12 +38,6 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 
 	@Override
 	public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception {
-
-		// return empty string for pruned (== null) execution attempts
-		if (null == execAttempt) {
-			return "";
-		}
-
 		final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
 		
 		StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 7af9868..47aaa45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.io.StrictlyLocalAssignment;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -38,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -154,8 +154,10 @@ public class ExecutionJobVertex implements Serializable {
 
 		Configuration jobConfiguration = graph.getJobConfiguration();
 		int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
-				jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
-				JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();
+				jobConfiguration.getInteger(
+					ConfigConstants.JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE,
+					ConfigConstants.DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE)
+			: ConfigConstants.DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE;
 
 		// create all task vertices
 		for (int i = 0; i < numTaskVertices; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6e76d8f..b1e8475 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -36,7 +37,6 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -105,12 +105,12 @@ public class ExecutionVertex implements Serializable {
 			IntermediateResult[] producedDataSets,
 			FiniteDuration timeout) {
 		this(
-				jobVertex,
-				subTaskIndex,
-				producedDataSets,
-				timeout,
-				System.currentTimeMillis(),
-				JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
+			jobVertex,
+			subTaskIndex,
+			producedDataSets,
+			timeout,
+			System.currentTimeMillis(),
+			ConfigConstants.DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE);
 	}
 
 	public ExecutionVertex(
@@ -126,7 +126,7 @@ public class ExecutionVertex implements Serializable {
 			ExecutionJobVertex jobVertex,
 			int subTaskIndex,
 			IntermediateResult[] producedDataSets,
-			Time timeout,
+			FiniteDuration timeout,
 			long createTimestamp,
 			int maxPriorExecutionHistoryLength) {
 		this.jobVertex = jobVertex;
@@ -254,6 +254,8 @@ public class ExecutionVertex implements Serializable {
 		synchronized (priorExecutions) {
 			return new EvictingBoundedList<>(priorExecutions);
 		}
+	}
+
 	public ExecutionGraph getExecutionGraph() {
 		return this.jobVertex.getGraph();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
deleted file mode 100644
index 279a70e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.ConfigOption;
-
-import static org.apache.flink.configuration.ConfigOptions.key;
-
-@PublicEvolving
-public class JobManagerOptions {
-
-	/**
-	 * The maximum number of prior execution attempts kept in history.
-	 */
-	public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
-			key("job-manager.max-attempts-history-size").defaultValue(16);
-
-	private JobManagerOptions() {
-		throw new IllegalAccessError();
-	}
-}