You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:08 UTC
[44/63] [abbrv] git commit: More graceful failing/errors/logging when
canceling in early job stages
More graceful failing/errors/logging when canceling in early job stages
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ae57c7c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ae57c7c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ae57c7c0
Branch: refs/heads/master
Commit: ae57c7c03dafcbbf728947ee453d29bdf42ee6bc
Parents: 9187175
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 15 02:43:18 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:49 2014 +0200
----------------------------------------------------------------------
.../runtime/execution/RuntimeEnvironment.java | 64 ++---
.../runtime/executiongraph/ExecutionGraph.java | 8 +-
.../runtime/executiongraph/ExecutionVertex.java | 10 +-
.../flink/runtime/jobmanager/JobManager.java | 20 +-
.../flink/runtime/taskmanager/TaskManager.java | 5 +-
.../jobmanager/ExceptionOutputFormat.java | 55 ----
.../flink/runtime/jobmanager/ExceptionTask.java | 70 -----
.../runtime/jobmanager/JobManagerITCase.java | 271 +++++++++++++++++--
.../jobmanager/tasks/BlockingNoOpInvokable.java | 38 +++
.../src/test/resources/logback-test.xml | 1 +
flink-runtime/src/test/resources/topology.txt | 16 --
11 files changed, 351 insertions(+), 207 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 79a4aaa..ade878f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -55,7 +55,6 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,9 +66,13 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
/** The log object used for debugging. */
private static final Logger LOG = LoggerFactory.getLogger(RuntimeEnvironment.class);
+ private static final ThreadGroup TASK_THREADS = new ThreadGroup("Task Threads");
+
/** The interval to sleep in case a communication channel is not yet entirely set up (in milliseconds). */
private static final int SLEEPINTERVAL = 100;
+
+
// --------------------------------------------------------------------------------------------
/** The task that owns this environment */
@@ -235,33 +238,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
if (this.owner.isCanceled()) {
throw new CancelTaskException();
}
- }
- catch (Throwable t) {
- if (!this.owner.isCanceled()) {
-
- // Perform clean up when the task failed and has been not canceled by the user
- try {
- this.invokable.cancel();
- } catch (Throwable t2) {
- LOG.error(StringUtils.stringifyException(t2));
- }
- }
-
- // Release all resources that may currently be allocated by the individual channels
- releaseAllChannelResources();
-
- if (this.owner.isCanceled() || t instanceof CancelTaskException) {
- this.owner.cancelingDone();
- }
- else {
- this.owner.markFailed(t);
- }
-
- return;
- }
-
- try {
// If there is any unclosed input gate, close it and propagate close operation to corresponding output gate
closeInputGates();
@@ -273,9 +250,28 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
// Now we wait until all output channels have written out their data and are closed
waitForOutputChannelsToBeClosed();
+
+ if (this.owner.isCanceled()) {
+ throw new CancelTaskException();
+ }
+
+ // Finally, switch execution state to FINISHED and report to job manager
+ if (!owner.markAsFinished()) {
+ throw new Exception("Could not notify job manager that the task is finished.");
+ }
}
catch (Throwable t) {
+ if (!this.owner.isCanceled()) {
+
+ // Perform clean up when the task failed and has not been canceled by the user
+ try {
+ this.invokable.cancel();
+ } catch (Throwable t2) {
+ LOG.error("Error while canceling the task", t2);
+ }
+ }
+
// Release all resources that may currently be allocated by the individual channels
releaseAllChannelResources();
@@ -285,16 +281,10 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
else {
this.owner.markFailed(t);
}
-
- return;
}
-
- // Release all resources that may currently be allocated by the individual channels
- releaseAllChannelResources();
-
- // Finally, switch execution state to FINISHED and report to job manager
- if (!owner.markAsFinished()) {
- owner.markFailed(new Exception());
+ finally {
+ // Release all resources that may currently be allocated by the individual channels
+ releaseAllChannelResources();
}
}
@@ -373,7 +363,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
if (this.executingThread == null) {
String name = owner.getTaskNameWithSubtasks();
- this.executingThread = new Thread(this, name);
+ this.executingThread = new Thread(TASK_THREADS, this, name);
}
return this.executingThread;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3dab13e..d916f74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -334,14 +334,49 @@ public class ExecutionGraph {
}
}
- public void waitForJobEnd() throws InterruptedException {
+ /**
+ * Blocks until all vertices of the job have finished, or until the given timeout expired.
+ * The timeout bounds the total waiting time, no matter how often the waiting thread wakes up.
+ *
+ * @param timeout The maximum time to wait, in milliseconds. Zero means to wait without time limit.
+ * @throws InterruptedException Thrown, if the waiting thread is interrupted.
+ * @throws IllegalArgumentException Thrown, if the timeout is negative.
+ */
+ public void waitForJobEnd(long timeout) throws InterruptedException {
+ if (timeout < 0) {
+ throw new IllegalArgumentException("Timeout must not be negative: " + timeout);
+ }
+
+ final long now = System.currentTimeMillis();
+ // a timeout of zero means no limit; very large timeouts are clamped to avoid overflow
+ final long deadline = (timeout == 0 || now + timeout < now) ? Long.MAX_VALUE : now + timeout;
+
synchronized (progressLock) {
while (nextVertexToFinish < verticesInCreationOrder.size()) {
- progressLock.wait();
+ if (deadline == Long.MAX_VALUE) {
+ progressLock.wait();
+ }
+ else {
+ long remaining = deadline - System.currentTimeMillis();
+ if (remaining <= 0) {
+ // timeout expired before the job ended
+ return;
+ }
+ progressLock.wait(remaining);
+ }
}
}
}
+ /**
+ * Blocks until all vertices of the job have finished, without any time limit.
+ *
+ * @throws InterruptedException Thrown, if the waiting thread is interrupted.
+ */
+ public void waitForJobEnd() throws InterruptedException {
+ waitForJobEnd(0);
+ }
+
private boolean transitionState(JobStatus current, JobStatus newState) {
return transitionState(current, newState, null);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 3c65f2e..fcd21af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -274,10 +274,12 @@ public class ExecutionVertex {
ExecutionEdge[] sources = inputEdges[i];
if (sources != null) {
for (int k = 0; k < sources.length; k++) {
- Instance source = sources[k].getSource().getProducer().getCurrentAssignedResource().getInstance();
- locations.add(source);
- if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
- return null;
+ AllocatedSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
+ if (sourceSlot != null) {
+ locations.add(sourceSlot.getInstance());
+ if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+ return null;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 3526e15..113f8fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -272,6 +272,8 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
@Override
public JobSubmissionResult submitJob(JobGraph job) throws IOException {
+
+ ExecutionGraph executionGraph = null;
boolean success = false;
try {
@@ -285,7 +287,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
}
// get the existing execution graph (if we attach), or construct a new empty one to attach
- ExecutionGraph executionGraph = this.currentJobs.get(job.getJobID());
+ executionGraph = this.currentJobs.get(job.getJobID());
if (executionGraph == null) {
if (LOG.isInfoEnabled()) {
LOG.info("Creating new execution graph for job " + job.getJobID() + " (" + job.getName() + ')');
@@ -331,7 +333,12 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
}
}
catch (FileNotFoundException e) {
- LOG.error("File-not-Found: " + e.getMessage());
+ String message = "File-not-Found: " + e.getMessage();
+ LOG.error(message);
+ // the exception may be thrown before the execution graph has been looked up or created
+ if (executionGraph != null) {
+ executionGraph.fail(e);
+ }
return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, e.getMessage());
}
@@ -373,10 +380,27 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
}
catch (Throwable t) {
LOG.error("Job submission failed.", t);
+ // the failure may happen before the execution graph has been looked up or created
+ if (executionGraph != null) {
+ executionGraph.fail(t);
+ }
return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t));
}
finally {
if (!success) {
+ if (executionGraph != null) {
+ if (executionGraph.getState() != JobStatus.FAILING && executionGraph.getState() != JobStatus.FAILED) {
+ executionGraph.fail(new Exception("Could not set up and start execution graph on JobManager"));
+ }
+ try {
+ executionGraph.waitForJobEnd(10000);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for job to finish canceling.");
+ // restore the interrupt status so that the caller can still observe the interruption
+ Thread.currentThread().interrupt();
+ }
+ }
+
this.currentJobs.remove(job.getJobID());
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index e8f8b72..1fd5a71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -667,9 +667,10 @@ public class TaskManager implements TaskOperationProtocol {
// Unregister task from library cache manager
try {
LibraryCacheManager.unregister(task.getJobID());
- } catch (IOException e) {
+ }
+ catch (Throwable t) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Unregistering the execution " + executionId + " caused an IOException");
+ LOG.debug("Unregistering the cached libraries caused an exception: ", t);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionOutputFormat.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionOutputFormat.java
deleted file mode 100644
index 616eaf4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionOutputFormat.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.InitializeOnMaster;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.StringRecord;
-
-
-public class ExceptionOutputFormat implements OutputFormat<StringRecord>, InitializeOnMaster {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * The message which is used for the test runtime exception.
- */
- public static final String RUNTIME_EXCEPTION_MESSAGE = "This is a test runtime exception";
-
- @Override
- public void configure(Configuration parameters) {}
-
- @Override
- public void open(int taskNumber, int numTasks) {}
-
- @Override
- public void writeRecord(StringRecord record) {}
-
- @Override
- public void close() {}
-
- @Override
- public void initializeGlobal(int parallelism) throws IOException {
- throw new RuntimeException(RUNTIME_EXCEPTION_MESSAGE);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionTask.java
deleted file mode 100644
index 7a0f9a5..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionTask.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * This task is used during the unit tests to generate a custom exception and check the proper response of the execution
- * engine.
- */
-public class ExceptionTask extends AbstractInvokable {
-
- /**
- * The test error message included in the thrown exception
- */
- public static final String ERROR_MESSAGE = "This is an expected test exception";
-
- /**
- * The custom exception thrown by the this task.
- *
- */
- public static class TestException extends Exception {
-
- /**
- * The generated serial version UID.
- */
- private static final long serialVersionUID = -974961143742490972L;
-
- /**
- * Constructs a new test exception.
- *
- * @param msg
- * the error message
- */
- public TestException(String msg) {
- super(msg);
- }
- }
-
- @Override
- public void registerInputOutput() {
- new RecordReader<StringRecord>(this, StringRecord.class);
- new RecordWriter<StringRecord>(this);
- }
-
- @Override
- public void invoke() throws Exception {
- throw new TestException(ERROR_MESSAGE);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index 8e87e7b..f661ea0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -32,19 +32,19 @@ import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
import org.apache.flink.runtime.io.network.api.RecordReader;
import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.tasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
import org.apache.flink.runtime.types.IntegerRecord;
-import org.apache.flink.util.StringUtils;
-
import org.junit.Test;
/**
@@ -53,6 +53,77 @@ import org.junit.Test;
public class JobManagerITCase {
@Test
+ public void testScheduleNotEnoughSlots() {
+
+ try {
+ final AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+ vertex.setParallelism(2);
+ vertex.setInvokableClass(BlockingNoOpInvokable.class);
+
+ final JobGraph jobGraph = new JobGraph("Test Job", vertex);
+
+ final JobManager jm = startJobManager(1);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
+ try {
+
+ assertEquals(1, jm.getAvailableSlots());
+
+ // we need to register the job at the library cache manager (with no libraries)
+ LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+ assertEquals(AbstractJobResult.ReturnCode.ERROR, result.getReturnCode());
+
+ // a job whose submission failed may only end up in the FAILED state
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+
+ long deadline = System.currentTimeMillis() + 60*1000;
+ boolean failed = false;
+
+ while (System.currentTimeMillis() < deadline) {
+ JobStatus state = eg.getState();
+ if (state == JobStatus.FAILED) {
+ failed = true;
+ break;
+ }
+ else if (state == JobStatus.FINISHED || state == JobStatus.CANCELED) {
+ break;
+ }
+ else {
+ Thread.sleep(200);
+ }
+ }
+
+ assertTrue("The job did not fail as expected.", failed);
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
+ }
+ else {
+ // the job was already removed after the failed submission
+ }
+
+
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+ }
+ finally {
+ jm.shutdown();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void testSingleVertexJobImmediately() {
final int NUM_TASKS = 133;
@@ -64,7 +135,11 @@ public class JobManagerITCase {
final JobGraph jobGraph = new JobGraph("Test Job", vertex);
- JobManager jm = startJobManager(NUM_TASKS);
+ final JobManager jm = startJobManager(NUM_TASKS);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
try {
assertEquals(NUM_TASKS, jm.getAvailableSlots());
@@ -106,6 +181,12 @@ public class JobManagerITCase {
else {
// already done, that was fast;
}
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
}
finally {
jm.shutdown();
@@ -130,7 +211,11 @@ public class JobManagerITCase {
final JobGraph jobGraph = new JobGraph("Test Job", vertex);
jobGraph.setAllowQueuedScheduling(true);
- JobManager jm = startJobManager(10);
+ final JobManager jm = startJobManager(10);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
try {
// we need to register the job at the library cache manager (with no libraries)
@@ -150,6 +235,12 @@ public class JobManagerITCase {
else {
// already done, that was fast;
}
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
}
finally {
jm.shutdown();
@@ -180,7 +271,11 @@ public class JobManagerITCase {
final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
- JobManager jm = startJobManager(2 * NUM_TASKS);
+ final JobManager jm = startJobManager(2 * NUM_TASKS);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
try {
// we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -199,6 +294,11 @@ public class JobManagerITCase {
else {
// already done, that was fast;
}
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(0, eg.getRegisteredExecutions().size());
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
}
finally {
jm.shutdown();
@@ -229,7 +329,11 @@ public class JobManagerITCase {
final JobGraph jobGraph = new JobGraph("Bipartite Job", sender, receiver);
- JobManager jm = startJobManager(2 * NUM_TASKS);
+ final JobManager jm = startJobManager(2 * NUM_TASKS);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
try {
// we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -248,6 +352,12 @@ public class JobManagerITCase {
else {
// already done, that was fast;
}
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
}
finally {
jm.shutdown();
@@ -260,9 +370,9 @@ public class JobManagerITCase {
}
@Test
- public void testTwoInputJob() {
+ public void testTwoInputJobFailingEdgeMismatch() {
- final int NUM_TASKS = 13;
+ final int NUM_TASKS = 2;
try {
final AbstractJobVertex sender1 = new AbstractJobVertex("Sender1");
@@ -274,6 +384,68 @@ public class JobManagerITCase {
receiver.setInvokableClass(AgnosticReceiver.class);
sender1.setParallelism(NUM_TASKS);
+ sender2.setParallelism(NUM_TASKS);
+ receiver.setParallelism(NUM_TASKS);
+
+ receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE);
+ receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE);
+
+ final JobGraph jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2);
+
+ final JobManager jm = startJobManager(3 * NUM_TASKS);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
+ try {
+ // we need to register the job at the library cache manager (with no libraries)
+ LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+ eg.waitForJobEnd();
+ assertEquals(JobStatus.FAILED, eg.getState());
+ }
+ else {
+ // already done, that was fast;
+ }
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(0, eg.getRegisteredExecutions().size());
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+ }
+ finally {
+ jm.shutdown();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTwoInputJob() {
+
+ final int NUM_TASKS = 11;
+
+ try {
+ final AbstractJobVertex sender1 = new AbstractJobVertex("Sender1");
+ final AbstractJobVertex sender2 = new AbstractJobVertex("Sender2");
+ final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+ sender1.setInvokableClass(Sender.class);
+ sender2.setInvokableClass(Sender.class);
+ receiver.setInvokableClass(AgnosticBinaryReceiver.class);
+
+ sender1.setParallelism(NUM_TASKS);
sender2.setParallelism(2*NUM_TASKS);
receiver.setParallelism(3*NUM_TASKS);
@@ -283,6 +455,10 @@ public class JobManagerITCase {
final JobGraph jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2);
JobManager jm = startJobManager(6 * NUM_TASKS);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
try {
// we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -296,11 +472,17 @@ public class JobManagerITCase {
if (eg != null) {
eg.waitForJobEnd();
- assertEquals(JobStatus.FAILED, eg.getState());
+ assertEquals(JobStatus.FINISHED, eg.getState());
}
else {
// already done, that was fast;
}
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
}
finally {
jm.shutdown();
@@ -331,7 +513,11 @@ public class JobManagerITCase {
final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
- JobManager jm = startJobManager(NUM_TASKS);
+ final JobManager jm = startJobManager(NUM_TASKS);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
try {
assertEquals(NUM_TASKS, jm.getAvailableSlots());
@@ -354,6 +540,10 @@ public class JobManagerITCase {
}
assertEquals(0, eg.getRegisteredExecutions().size());
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
}
finally {
jm.shutdown();
@@ -384,7 +574,11 @@ public class JobManagerITCase {
final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
- JobManager jm = startJobManager(NUM_TASKS);
+ final JobManager jm = startJobManager(NUM_TASKS);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
try {
assertEquals(NUM_TASKS, jm.getAvailableSlots());
@@ -407,6 +601,10 @@ public class JobManagerITCase {
}
assertEquals(0, eg.getRegisteredExecutions().size());
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
}
finally {
jm.shutdown();
@@ -437,7 +635,11 @@ public class JobManagerITCase {
final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
- JobManager jm = startJobManager(2 * NUM_TASKS);
+ final JobManager jm = startJobManager(2 * NUM_TASKS);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
try {
// we need to register the job at the library cache manager (with no libraries)
@@ -459,6 +661,10 @@ public class JobManagerITCase {
}
assertEquals(0, eg.getRegisteredExecutions().size());
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
}
finally {
jm.shutdown();
@@ -492,7 +698,11 @@ public class JobManagerITCase {
final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
- JobManager jm = startJobManager(NUM_TASKS);
+ final JobManager jm = startJobManager(NUM_TASKS);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
try {
assertEquals(NUM_TASKS, jm.getAvailableSlots());
@@ -515,6 +725,10 @@ public class JobManagerITCase {
}
assertEquals(0, eg.getRegisteredExecutions().size());
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
}
finally {
jm.shutdown();
@@ -548,7 +762,11 @@ public class JobManagerITCase {
final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
- JobManager jm = startJobManager(NUM_TASKS);
+ final JobManager jm = startJobManager(NUM_TASKS);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
try {
assertEquals(NUM_TASKS, jm.getAvailableSlots());
@@ -570,11 +788,11 @@ public class JobManagerITCase {
// already done, that was fast;
}
- for (Execution e : eg.getRegisteredExecutions().values()) {
- System.out.println(e + StringUtils.arrayAwareToString(e.getStateTimestamps()));
- }
-
assertEquals(0, eg.getRegisteredExecutions().size());
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
}
finally {
jm.shutdown();
@@ -629,6 +847,30 @@ public class JobManagerITCase {
throw new IOException("could not find free port");
}
+ /**
+ * Waits until all threads of the "Task Threads" thread group have terminated. Only threads that
+ * Thread.enumerate() reports for the current thread's group (and its subgroups) are considered.
+ * Fails the test if a task thread does not terminate within the timeout, instead of hanging.
+ */
+ private static void waitForTaskThreadsToBeTerminated() throws InterruptedException {
+ final long joinTimeoutMillis = 60 * 1000;
+
+ // activeCount() is only an estimate; leave headroom so that no thread is silently dropped
+ Thread[] threads = new Thread[Thread.activeCount() * 2 + 16];
+ int numThreads = Thread.enumerate(threads);
+
+ for (int i = 0; i < numThreads; i++) {
+ Thread t = threads[i];
+ ThreadGroup tg = t.getThreadGroup();
+ if (tg != null && "Task Threads".equals(tg.getName())) {
+ t.join(joinTimeoutMillis);
+ if (t.isAlive()) {
+ fail("Task thread '" + t.getName() + "' did not terminate within " + joinTimeoutMillis + " ms.");
+ }
+ }
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Simple test tasks
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingNoOpInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingNoOpInvokable.java
new file mode 100644
index 0000000..c8d1c98
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingNoOpInvokable.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * An invokable that does no work: it registers no inputs or outputs and blocks in
+ * {@link #invoke()} until its thread is interrupted. Intended for tests that need
+ * a task which does not finish on its own.
+ */
+public class BlockingNoOpInvokable extends AbstractInvokable {
+
+ @Override
+ public void registerInputOutput() {}
+
+ @Override
+ public void invoke() throws Exception {
+ final Object lock = new Object();
+ synchronized (lock) {
+ // Object.wait() may return spuriously, so keep waiting; only an
+ // InterruptedException ends this method.
+ while (true) {
+ lock.wait();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/logback-test.xml b/flink-runtime/src/test/resources/logback-test.xml
index 7fb3387..f817d4d 100644
--- a/flink-runtime/src/test/resources/logback-test.xml
+++ b/flink-runtime/src/test/resources/logback-test.xml
@@ -37,4 +37,5 @@
<logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>
<logger name="org.apache.flink.runtime.taskmanager.TaskManager" level="OFF"/>
<logger name="org.apache.flink.runtime.executiongraph.ExecutionGraph" level="OFF"/>
+ <logger name="org.apache.flink.runtime.jobmanager.EventCollector" level="OFF"/>
</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/test/resources/topology.txt
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/topology.txt b/flink-runtime/src/test/resources/topology.txt
deleted file mode 100644
index b199929..0000000
--- a/flink-runtime/src/test/resources/topology.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-/mainswitch1/rackswitch1/node01
-/mainswitch1/rackswitch1/node02
-/mainswitch1/rackswitch1/node03
-/mainswitch1/rackswitch1/node04
-/mainswitch1/rackswitch2/node05
-/mainswitch1/rackswitch2/node06
-/mainswitch1/rackswitch2/node07
-/mainswitch1/rackswitch2/node08
-/mainswitch2/rackswitch3/node09
-/mainswitch2/rackswitch3/node10
-/mainswitch2/rackswitch3/node11
-/mainswitch2/rackswitch3/node12
-/mainswitch2/rackswitch4/node13
-/mainswitch2/rackswitch4/node14
-/mainswitch2/rackswitch4/node15
-/mainswitch2/rackswitch4/node16