You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/10 13:55:52 UTC
[2/3] flink git commit: [FLINK-2645] [jobmanager] Fail job execution
if final accumulators cannot be merged and forward exceptions.
[FLINK-2645] [jobmanager] Fail job execution if final accumulators cannot be merged and forward exceptions.
This closes #1112
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16fb4e91
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16fb4e91
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16fb4e91
Branch: refs/heads/master
Commit: 16fb4e919f4a76b8fe4910435b2183fa172f6e24
Parents: 9c2791b
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 9 13:49:40 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Sep 10 12:35:34 2015 +0200
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 15 +-
.../executiongraph/ExecutionJobVertex.java | 2 +-
.../flink/runtime/jobmanager/JobManager.scala | 8 +-
.../accumulators/AccumulatorErrorITCase.java | 193 +++++++++++++++++++
4 files changed, 207 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/16fb4e91/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a44fc82..1d8a37c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -827,7 +827,7 @@ public class ExecutionGraph implements Serializable {
}
}
- void jobVertexInFinalState(ExecutionJobVertex ev) {
+ void jobVertexInFinalState() {
synchronized (progressLock) {
if (numFinishedJobVertices >= verticesInCreationOrder.size()) {
throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
@@ -927,19 +927,18 @@ public class ExecutionGraph implements Serializable {
case RUNNING:
return attempt.switchToRunning();
case FINISHED:
- Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators = null;
- Map<String, Accumulator<?, ?>> userAccumulators = null;
try {
AccumulatorSnapshot accumulators = state.getAccumulators();
- flinkAccumulators = accumulators.deserializeFlinkAccumulators();
- userAccumulators = accumulators.deserializeUserAccumulators(userClassLoader);
+ Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators =
+ accumulators.deserializeFlinkAccumulators();
+ Map<String, Accumulator<?, ?>> userAccumulators =
+ accumulators.deserializeUserAccumulators(userClassLoader);
+ attempt.markFinished(flinkAccumulators, userAccumulators);
}
catch (Exception e) {
- // we do not fail the job on deserialization problems of accumulators, but only log
LOG.error("Failed to deserialize final accumulator results.", e);
+ attempt.markFailed(e);
}
-
- attempt.markFinished(flinkAccumulators, userAccumulators);
return true;
case CANCELED:
attempt.cancelingComplete();
http://git-wip-us.apache.org/repos/asf/flink/blob/16fb4e91/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index dea619a..999ca1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -482,7 +482,7 @@ public class ExecutionJobVertex implements Serializable {
stateMonitor.notifyAll();
// tell the graph
- graph.jobVertexInFinalState(this);
+ graph.jobVertexInFinalState();
} else {
numSubtasksInFinalState++;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/16fb4e91/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 18c453f..27fc6e3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -384,10 +384,14 @@ class JobManager(
newJobStatus match {
case JobStatus.FINISHED =>
val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
- executionGraph.getAccumulatorsSerialized()
+ executionGraph.getAccumulatorsSerialized()
} catch {
case e: Exception =>
- log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
+ log.error(s"Cannot fetch final accumulators for job $jobID", e)
+ val exception = new JobExecutionException(jobID,
+ "Failed to retrieve accumulator results.", e)
+ jobInfo.client ! decorateMessage(JobResultFailure(
+ new SerializedThrowable(exception)))
Collections.emptyMap()
}
val result = new SerializedJobExecutionResult(
http://git-wip-us.apache.org/repos/asf/flink/blob/16fb4e91/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
new file mode 100644
index 0000000..cac8451
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.accumulators;
+
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests cases where accumulators
+ * a) throw errors during runtime
+ * b) are not compatible with an existing accumulator
+ */
+public class AccumulatorErrorITCase {
+
+ private static ForkableFlinkMiniCluster cluster;
+
+ @BeforeClass
+ public static void startCluster() {
+ try {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
+ config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+ cluster = new ForkableFlinkMiniCluster(config, false);
+
+ cluster.start();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Failed to start test cluster: " + e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void shutdownCluster() {
+ try {
+ cluster.shutdown();
+ cluster = null;
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Failed to stop test cluster: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testFaultyAccumulator() throws Exception {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+ env.getConfig().disableSysoutLogging();
+
+ // Test Exception forwarding with faulty Accumulator implementation
+ DataSet<Long> input = env.generateSequence(0, 10000);
+
+ DataSet<Long> map = input.map(new FaultyAccumulatorUsingMapper());
+
+ map.output(new DiscardingOutputFormat<Long>());
+
+ try {
+ env.execute();
+ fail("Should have failed.");
+ } catch (ProgramInvocationException e) {
+ Assert.assertTrue("Exception should be passed:",
+ e.getCause() instanceof JobExecutionException);
+ Assert.assertTrue("Root cause should be:",
+ e.getCause().getCause() instanceof CustomException);
+ }
+ }
+
+
+ @Test
+ public void testInvalidTypeAccumulator() throws Exception {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+ env.getConfig().disableSysoutLogging();
+
+ // Test exception forwarding when the same accumulator name is registered with incompatible types
+ DataSet<Long> input = env.generateSequence(0, 10000);
+
+ DataSet<Long> mappers = input.map(new IncompatibleAccumulatorTypesMapper())
+ .map(new IncompatibleAccumulatorTypesMapper2());
+
+ mappers.output(new DiscardingOutputFormat<Long>());
+
+ try {
+ env.execute();
+ fail("Should have failed.");
+ } catch (ProgramInvocationException e) {
+ Assert.assertTrue("Exception should be passed:",
+ e.getCause() instanceof JobExecutionException);
+ Assert.assertTrue("Root cause should be:",
+ e.getCause().getCause() instanceof Exception);
+ Assert.assertTrue("Root cause should be:",
+ e.getCause().getCause().getCause() instanceof UnsupportedOperationException);
+ }
+ }
+
+ /* testFaultyAccumulator */
+
+ private static class FaultyAccumulatorUsingMapper extends RichMapFunction<Long, Long> {
+
+ private static final long serialVersionUID = 42;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ getRuntimeContext().addAccumulator("test", new FaultyAccumulator());
+ }
+
+ @Override
+ public Long map(Long value) throws Exception {
+ return -1L;
+ }
+ }
+
+ private static class FaultyAccumulator extends LongCounter {
+
+ private static final long serialVersionUID = 42;
+
+ @Override
+ public LongCounter clone() {
+ throw new CustomException();
+ }
+ }
+
+ private static class CustomException extends RuntimeException {
+ private static final long serialVersionUID = 42;
+ }
+
+ /* testInvalidTypeAccumulator */
+
+ private static class IncompatibleAccumulatorTypesMapper extends RichMapFunction<Long, Long> {
+
+ private static final long serialVersionUID = 42;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ getRuntimeContext().addAccumulator("test", new LongCounter());
+ }
+
+ @Override
+ public Long map(Long value) throws Exception {
+ return -1L;
+ }
+ }
+
+ private static class IncompatibleAccumulatorTypesMapper2 extends RichMapFunction<Long, Long> {
+
+ private static final long serialVersionUID = 42;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ getRuntimeContext().addAccumulator("test", new DoubleCounter());
+ }
+
+ @Override
+ public Long map(Long value) throws Exception {
+ return -1L;
+ }
+ }
+
+}