You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/23 08:13:51 UTC
[2/2] flink git commit: [FLINK-5107] Handle evicted execution attempts in request handlers
[FLINK-5107] Handle evicted execution attempts in request handlers
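If a prior execution attempt cannot be retrieved because it has already been evicted from the
execution attempt history, the request handler now throws a meaningful exception that informs
the requester about the evicted execution attempt.
This commit also replaces JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE with
ConfigConstants.JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE (default 16), which changes the
configuration key from "job-manager.max-attempts-history-size" to "jobmanager.max-attempts-history-size".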
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/871de0bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/871de0bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/871de0bf
Branch: refs/heads/release-1.1
Commit: 871de0bf7a28a79222406f73048432ab156c7b0f
Parents: 8989a9f
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 23 00:19:32 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 23 00:46:40 2016 +0100
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 13 +++++++
.../AbstractSubtaskAttemptRequestHandler.java | 8 ++++-
.../handlers/RequestHandlerException.java | 31 ++++++++++++++++
...taskExecutionAttemptAccumulatorsHandler.java | 6 ----
.../executiongraph/ExecutionJobVertex.java | 8 +++--
.../runtime/executiongraph/ExecutionVertex.java | 18 +++++-----
.../runtime/jobmanager/JobManagerOptions.java | 38 --------------------
7 files changed, 66 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 1431eae..c70388a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -122,6 +122,13 @@ public final class ConfigConstants {
public static final String JOB_MANAGER_IPC_PORT_KEY = "jobmanager.rpc.port";
/**
+ * The config parameter defining the number of prior execution attempt information being stored
+ * on the job manager before the oldest execution attempt information is deleted.
+ */
+ @PublicEvolving
+ public static final String JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE = "jobmanager.max-attempts-history-size";
+
+ /**
* The config parameter defining the network port to connect to
* for communication with the resource manager.
*/
@@ -777,6 +784,12 @@ public final class ConfigConstants {
public static final int DEFAULT_JOB_MANAGER_IPC_PORT = 6123;
/**
+ * The default number of prior execution attempt information being stored on
+ * the job manager before the oldest information is deleted.
+ */
+ public static final int DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE = 16;
+
+ /**
* The default network port of the resource manager.
*/
public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
index 672df16..0f906fe 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -57,7 +57,13 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
}
else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
Execution exec = vertex.getPriorExecutionAttempt(attempt);
- return handleRequest(exec, params);
+
+ if (exec != null) {
+ return handleRequest(exec, params);
+ } else {
+ throw new RequestHandlerException("Execution for attempt " + attempt +
+ " has already been deleted.");
+ }
}
else {
throw new RuntimeException("Attempt does not exist: " + attempt);
http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
new file mode 100644
index 0000000..bb61d16
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+/**
+ * Base class for checked exceptions thrown by request handlers when a request cannot be served (e.g. the requested prior execution attempt has already been evicted).
+ */
+public class RequestHandlerException extends Exception {
+
+ private static final long serialVersionUID = 7570352908725875886L;
+
+ public RequestHandlerException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index f661126..ade241a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -38,12 +38,6 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
@Override
public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception {
-
- // return empty string for pruned (== null) execution attempts
- if (null == execAttempt) {
- return "";
- }
-
final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
StringWriter writer = new StringWriter();
http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 7af9868..47aaa45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.io.StrictlyLocalAssignment;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
@@ -38,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -154,8 +154,10 @@ public class ExecutionJobVertex implements Serializable {
Configuration jobConfiguration = graph.getJobConfiguration();
int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
- jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
- JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();
+ jobConfiguration.getInteger(
+ ConfigConstants.JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE,
+ ConfigConstants.DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE)
+ : ConfigConstants.DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE;
// create all task vertices
for (int i = 0; i < numTaskVertices; i++) {
http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6e76d8f..b1e8475 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -36,7 +37,6 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -105,12 +105,12 @@ public class ExecutionVertex implements Serializable {
IntermediateResult[] producedDataSets,
FiniteDuration timeout) {
this(
- jobVertex,
- subTaskIndex,
- producedDataSets,
- timeout,
- System.currentTimeMillis(),
- JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
+ jobVertex,
+ subTaskIndex,
+ producedDataSets,
+ timeout,
+ System.currentTimeMillis(),
+ ConfigConstants.DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE);
}
public ExecutionVertex(
@@ -126,7 +126,7 @@ public class ExecutionVertex implements Serializable {
ExecutionJobVertex jobVertex,
int subTaskIndex,
IntermediateResult[] producedDataSets,
- Time timeout,
+ FiniteDuration timeout,
long createTimestamp,
int maxPriorExecutionHistoryLength) {
this.jobVertex = jobVertex;
@@ -254,6 +254,8 @@ public class ExecutionVertex implements Serializable {
synchronized (priorExecutions) {
return new EvictingBoundedList<>(priorExecutions);
}
+ }
+
public ExecutionGraph getExecutionGraph() {
return this.jobVertex.getGraph();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/871de0bf/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
deleted file mode 100644
index 279a70e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.ConfigOption;
-
-import static org.apache.flink.configuration.ConfigOptions.key;
-
-@PublicEvolving
-public class JobManagerOptions {
-
- /**
- * The maximum number of prior execution attempts kept in history.
- */
- public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
- key("job-manager.max-attempts-history-size").defaultValue(16);
-
- private JobManagerOptions() {
- throw new IllegalAccessError();
- }
-}