You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/10 13:55:52 UTC

[2/3] flink git commit: [FLINK-2645] [jobmanager] Fail job execution if final accumulators cannot be merged and forward exceptions.

[FLINK-2645] [jobmanager] Fail job execution if final accumulators cannot be merged and forward exceptions.

This closes #1112


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16fb4e91
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16fb4e91
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16fb4e91

Branch: refs/heads/master
Commit: 16fb4e919f4a76b8fe4910435b2183fa172f6e24
Parents: 9c2791b
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 9 13:49:40 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Sep 10 12:35:34 2015 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  15 +-
 .../executiongraph/ExecutionJobVertex.java      |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   8 +-
 .../accumulators/AccumulatorErrorITCase.java    | 193 +++++++++++++++++++
 4 files changed, 207 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/16fb4e91/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a44fc82..1d8a37c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -827,7 +827,7 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
-	void jobVertexInFinalState(ExecutionJobVertex ev) {
+	void jobVertexInFinalState() {
 		synchronized (progressLock) {
 			if (numFinishedJobVertices >= verticesInCreationOrder.size()) {
 				throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
@@ -927,19 +927,18 @@ public class ExecutionGraph implements Serializable {
 				case RUNNING:
 					return attempt.switchToRunning();
 				case FINISHED:
-					Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators = null;
-					Map<String, Accumulator<?, ?>> userAccumulators = null;
 					try {
 						AccumulatorSnapshot accumulators = state.getAccumulators();
-						flinkAccumulators = accumulators.deserializeFlinkAccumulators();
-						userAccumulators = accumulators.deserializeUserAccumulators(userClassLoader);
+						Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators =
+							accumulators.deserializeFlinkAccumulators();
+						Map<String, Accumulator<?, ?>> userAccumulators =
+							accumulators.deserializeUserAccumulators(userClassLoader);
+						attempt.markFinished(flinkAccumulators, userAccumulators);
 					}
 					catch (Exception e) {
-						// we do not fail the job on deserialization problems of accumulators, but only log
 						LOG.error("Failed to deserialize final accumulator results.", e);
+						attempt.markFailed(e);
 					}
-
-					attempt.markFinished(flinkAccumulators, userAccumulators);
 					return true;
 				case CANCELED:
 					attempt.cancelingComplete();

http://git-wip-us.apache.org/repos/asf/flink/blob/16fb4e91/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index dea619a..999ca1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -482,7 +482,7 @@ public class ExecutionJobVertex implements Serializable {
 					stateMonitor.notifyAll();
 					
 					// tell the graph
-					graph.jobVertexInFinalState(this);
+					graph.jobVertexInFinalState();
 				} else {
 					numSubtasksInFinalState++;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/16fb4e91/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 18c453f..27fc6e3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -384,10 +384,14 @@ class JobManager(
               newJobStatus match {
                 case JobStatus.FINISHED =>
                   val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
-                  executionGraph.getAccumulatorsSerialized()
+                    executionGraph.getAccumulatorsSerialized()
                   } catch {
                     case e: Exception =>
-                      log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
+                      log.error(s"Cannot fetch final accumulators for job $jobID", e)
+                      val exception = new JobExecutionException(jobID,
+                        "Failed to retrieve accumulator results.", e)
+                      jobInfo.client ! decorateMessage(JobResultFailure(
+                        new SerializedThrowable(exception)))
                       Collections.emptyMap()
                   }
                 val result = new SerializedJobExecutionResult(

http://git-wip-us.apache.org/repos/asf/flink/blob/16fb4e91/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
new file mode 100644
index 0000000..cac8451
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.accumulators;
+
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests cases where an Accumulator
+ *  a) throws errors during runtime
+ *  b) is not compatible with an existing accumulator
+ */
+public class AccumulatorErrorITCase {
+
+	private static ForkableFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void startCluster() {
+		try { // build the configuration, then create and start the cluster used by the tests
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			cluster = new ForkableFlinkMiniCluster(config, false);
+			// Kept in a static field so that every test can look up the cluster's RPC port.
+			cluster.start();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to start test cluster: " + e.getMessage()); // report setup problems as a test failure
+		}
+	}
+
+	@AfterClass
+	public static void shutdownCluster() {
+		try { // stop the shared cluster and drop the static reference
+			cluster.shutdown(); // NOTE(review): cluster is still null if startCluster() failed before assigning it; confirm whether a null guard is wanted
+			cluster = null;
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to stop test cluster: " + e.getMessage()); // report teardown problems as a test failure
+		}
+	}
+
+	@Test
+	public void testFaultyAccumulator() throws Exception {
+		// Expects the job to fail and the accumulator's CustomException to be forwarded to the client.
+		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		env.getConfig().disableSysoutLogging();
+
+		// Test Exception forwarding with faulty Accumulator implementation
+		DataSet<Long> input = env.generateSequence(0, 10000);
+
+		DataSet<Long> map = input.map(new FaultyAccumulatorUsingMapper());
+
+		map.output(new DiscardingOutputFormat<Long>());
+
+		try {
+			env.execute();
+			fail("Should have failed.");
+		} catch (ProgramInvocationException e) {
+			Assert.assertTrue("Cause should be a JobExecutionException.",
+					e.getCause() instanceof JobExecutionException);
+			Assert.assertTrue("Root cause should be the accumulator's CustomException.",
+					e.getCause().getCause() instanceof CustomException);
+		}
+	}
+
+
+	@Test
+	public void testInvalidTypeAccumulator() throws Exception {
+		// Two chained mappers register "test" as LongCounter and as DoubleCounter; the job is expected to fail.
+		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		env.getConfig().disableSysoutLogging();
+
+		// Test Exception forwarding with incompatible Accumulator types registered under the same name
+		DataSet<Long> input = env.generateSequence(0, 10000);
+
+		DataSet<Long> mappers = input.map(new IncompatibleAccumulatorTypesMapper())
+				.map(new IncompatibleAccumulatorTypesMapper2());
+
+		mappers.output(new DiscardingOutputFormat<Long>());
+
+		try {
+			env.execute();
+			fail("Should have failed.");
+		} catch (ProgramInvocationException e) {
+			Assert.assertTrue("Cause should be a JobExecutionException.",
+					e.getCause() instanceof JobExecutionException);
+			Assert.assertTrue("JobExecutionException should wrap an Exception.",
+					e.getCause().getCause() instanceof Exception);
+			Assert.assertTrue("Root cause should be an UnsupportedOperationException.",
+					e.getCause().getCause().getCause() instanceof UnsupportedOperationException);
+		}
+	}
+
+	/* testFaultyAccumulator */
+
+	private static class FaultyAccumulatorUsingMapper extends RichMapFunction<Long, Long> {
+		// Registers a FaultyAccumulator under the name "test" in open(); map() returns -1 for every record.
+		private static final long serialVersionUID = 42;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			getRuntimeContext().addAccumulator("test", new FaultyAccumulator());
+		}
+
+		@Override
+		public Long map(Long value) throws Exception {
+			return -1L; // the input value is ignored
+		}
+	}
+
+	private static class FaultyAccumulator extends LongCounter {
+		// LongCounter whose clone() always throws CustomException; testFaultyAccumulator expects that exception as root cause.
+		private static final long serialVersionUID = 42;
+
+		@Override
+		public LongCounter clone() {
+			throw new CustomException();
+		}
+	}
+
+	private static class CustomException extends RuntimeException { // unchecked marker exception thrown by FaultyAccumulator.clone()
+		private static final long serialVersionUID = 42;
+	}
+
+	/* testInvalidTypeAccumulator */
+
+	private static class IncompatibleAccumulatorTypesMapper extends RichMapFunction<Long, Long> {
+		// Registers a LongCounter under the name "test" in open(); map() returns -1 for every record.
+		private static final long serialVersionUID = 42;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			getRuntimeContext().addAccumulator("test", new LongCounter());
+		}
+
+		@Override
+		public Long map(Long value) throws Exception {
+			return -1L; // the input value is ignored
+		}
+	}
+
+	private static class IncompatibleAccumulatorTypesMapper2 extends RichMapFunction<Long, Long> {
+		// Registers a DoubleCounter under the same name "test" that IncompatibleAccumulatorTypesMapper uses for a LongCounter.
+		private static final long serialVersionUID = 42;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			getRuntimeContext().addAccumulator("test", new DoubleCounter());
+		}
+
+		@Override
+		public Long map(Long value) throws Exception {
+			return -1L; // the input value is ignored
+		}
+	}
+
+}