You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:06 UTC

[42/63] [abbrv] Adjust ExecutionGraph state machine to TaskManager's failing model (direct transitions to canceled)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index fd9f10d..e8f8b72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -69,6 +69,7 @@ import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileRespon
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.Hardware;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -218,7 +219,10 @@ public class TaskManager implements TaskOperationProtocol {
 
 		// Start local RPC server, give it the number of threads as we have slots
 		try {
-			this.taskManagerServer = RPC.getServer(this, taskManagerBindAddress.getHostAddress(), ipcPort, numberOfSlots);
+			// some magic number for the handler threads
+			final int numHandlers = Math.min(numberOfSlots, 2*Hardware.getNumberCPUCores());
+			
+			this.taskManagerServer = RPC.getServer(this, taskManagerBindAddress.getHostAddress(), ipcPort, numHandlers);
 			this.taskManagerServer.start();
 		} catch (IOException e) {
 			LOG.error("Failed to start TaskManager server. " + e.getMessage(), e);
@@ -396,6 +400,8 @@ public class TaskManager implements TaskOperationProtocol {
 
 		LOG.info("Shutting down TaskManager");
 		
+		cancelAndClearEverything();
+		
 		// first, stop the heartbeat thread and wait for it to terminate
 		this.heartbeatThread.interrupt();
 		try {
@@ -540,7 +546,13 @@ public class TaskManager implements TaskOperationProtocol {
 		final int taskIndex = tdd.getIndexInSubtaskGroup();
 		final int numSubtasks = tdd.getCurrentNumberOfSubtasks();
 		
+		boolean jarsRegistered = false;
+		
 		try {
+			// library and classloader issues first
+			LibraryCacheManager.register(jobID, tdd.getRequiredJarFiles());
+			jarsRegistered = true;
+			
 			final ClassLoader userCodeClassLoader = LibraryCacheManager.getClassLoader(jobID);
 			if (userCodeClassLoader == null) {
 				throw new Exception("No user code ClassLoader available.");
@@ -578,11 +590,17 @@ public class TaskManager implements TaskOperationProtocol {
 					cpTasks.put(e.getKey(), cp);
 				}
 				env.addCopyTasksForCacheFile(cpTasks);
-			
+				
 				if (!task.startExecution()) {
 					throw new Exception("Cannot start task. Task was canceled or failed.");
 				}
 			
+				// final check that we can go (we do this after the registration, so the the "happen's before"
+				// relationship ensures that either the shutdown removes this task, or we are aware of the shutdown
+				if (shutdownStarted.get()) {
+					throw new Exception("Task Manager is shut down.");
+				}
+				
 				success = true;
 				return new TaskOperationResult(executionId, true);
 			}
@@ -590,6 +608,7 @@ public class TaskManager implements TaskOperationProtocol {
 				if (!success) {
 					// remove task 
 					this.runningTasks.remove(executionId);
+					
 					// delete distributed cache files
 					for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) {
 						this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), jobID);
@@ -600,11 +619,13 @@ public class TaskManager implements TaskOperationProtocol {
 		catch (Throwable t) {
 			LOG.error("Could not instantiate task", t);
 			
-			try {
-				LibraryCacheManager.unregister(jobID);
-			} catch (IOException e) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Unregistering the execution " + executionId + " caused an IOException");
+			if (jarsRegistered) {
+				try {
+					LibraryCacheManager.unregister(jobID);
+				} catch (IOException e) {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Unregistering the execution " + executionId + " caused an IOException");
+					}
 				}
 			}
 			
@@ -623,7 +644,9 @@ public class TaskManager implements TaskOperationProtocol {
 		// Task de-registration must be atomic
 		final Task task = this.runningTasks.remove(executionId);
 		if (task == null) {
-			LOG.error("Cannot find task with ID " + executionId + " to unregister");
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Cannot find task with ID " + executionId + " to unregister");
+			}
 			return;
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
index 91d9973..8e276e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
@@ -58,7 +58,9 @@ public class ExecutorThreadFactory implements ThreadFactory {
 
 		@Override
 		public void uncaughtException(Thread t, Throwable e) {
-			LOG.error("Thread '" + t.getName() + "' produced an uncaught exception.", e);
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Thread '" + t.getName() + "' produced an uncaught exception.", e);
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index ac76623..41ca7da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
 import org.apache.flink.runtime.taskmanager.TaskOperationResult;
@@ -257,7 +257,7 @@ public class ExecutionGraphDeploymentTest {
 			}
 		});
 		
-		DefaultScheduler scheduler = new DefaultScheduler();
+		Scheduler scheduler = new Scheduler();
 		for (int i = 0; i < dop1 + dop2; i++) {
 			scheduler.newInstanceAvailable(getInstance(taskManager));
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index ce1ab30..a351209 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
 import org.apache.flink.runtime.taskmanager.TaskOperationResult;
 
@@ -417,6 +417,45 @@ public class ExecutionVertexCancelTest {
 		}
 	}
 	
+	@Test
+	public void testSendCancelAndReceiveFail() {
+		
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
+			
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
+
+			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			when(taskManager.cancelTask(execId)).thenThrow(new IOException("RPC call failed"));
+			
+			Instance instance = getInstance(taskManager);
+			AllocatedSlot slot = instance.allocateSlot(new JobID());
+
+			setVertexState(vertex, ExecutionState.RUNNING);
+			setVertexResource(vertex, slot);
+			
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+			
+			vertex.cancel();
+			
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+			
+			vertex.getCurrentExecutionAttempt().markFailed(new Throwable("test"));
+			
+			assertTrue(vertex.getExecutionState() == ExecutionState.CANCELED || vertex.getExecutionState() == ExecutionState.FAILED);
+			
+			assertTrue(slot.isReleased());
+			
+			assertEquals(0, vertex.getExecutionGraph().getRegisteredExecutions().size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Actions after a vertex has been canceled or while canceling
 	// --------------------------------------------------------------------------------------------
@@ -436,7 +475,7 @@ public class ExecutionVertexCancelTest {
 			// scheduling after being created should be tolerated (no exception) because
 			// it can occur as the result of races
 			{
-				DefaultScheduler scheduler = mock(DefaultScheduler.class);
+				Scheduler scheduler = mock(Scheduler.class);
 				vertex.scheduleForExecution(scheduler, false);
 				
 				assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
@@ -475,7 +514,7 @@ public class ExecutionVertexCancelTest {
 				ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
 				setVertexState(vertex, ExecutionState.CANCELING);
 				
-				DefaultScheduler scheduler = mock(DefaultScheduler.class);
+				Scheduler scheduler = mock(Scheduler.class);
 				vertex.scheduleForExecution(scheduler, false);
 				fail("Method should throw an exception");
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 43d6547..f59b326 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
@@ -54,7 +54,7 @@ public class ExecutionVertexSchedulingTest {
 			final ExecutionJobVertex ejv = getJobVertexNotExecuting(new JobVertexID());
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
 			
-			DefaultScheduler scheduler = mock(DefaultScheduler.class);
+			Scheduler scheduler = mock(Scheduler.class);
 			when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
 			
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -89,7 +89,7 @@ public class ExecutionVertexSchedulingTest {
 			final ExecutionJobVertex ejv = getJobVertexNotExecuting(new JobVertexID());
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
 			
-			DefaultScheduler scheduler = mock(DefaultScheduler.class);
+			Scheduler scheduler = mock(Scheduler.class);
 			when(scheduler.scheduleQueued(Matchers.any(ScheduledUnit.class))).thenReturn(future);
 			
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -124,7 +124,7 @@ public class ExecutionVertexSchedulingTest {
 			final ExecutionJobVertex ejv = getJobVertexNotExecuting(new JobVertexID());
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
 			
-			DefaultScheduler scheduler = mock(DefaultScheduler.class);
+			Scheduler scheduler = mock(Scheduler.class);
 			when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
 			
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index c7e3463..8e87e7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -32,35 +32,51 @@ import org.apache.flink.runtime.ExecutionMode;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+import org.apache.flink.util.StringUtils;
+
 import org.junit.Test;
 
 /**
  * This test is intended to cover the basic functionality of the {@link JobManager}.
  */
 public class JobManagerITCase {
-
+	
 	@Test
-	public void testSingleVertexJob() {
+	public void testSingleVertexJobImmediately() {
+		
+		final int NUM_TASKS = 133;
+		
 		try {
 			final AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
-			vertex.setParallelism(3);
+			vertex.setParallelism(NUM_TASKS);
 			vertex.setInvokableClass(NoOpInvokable.class);
 			
 			final JobGraph jobGraph = new JobGraph("Test Job", vertex);
 			
-			JobManager jm = startJobManager(3);
+			JobManager jm = startJobManager(NUM_TASKS);
 			try {
 				
+				assertEquals(NUM_TASKS, jm.getAvailableSlots());
+				
 				// we need to register the job at the library cache manager (with no libraries)
 				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
 				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 				
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
 				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
 				
 				// monitor the execution
@@ -77,7 +93,6 @@ public class JobManagerITCase {
 							success = true;
 							break;
 						}
-						
 						else if (state == JobStatus.FAILED || state == JobStatus.CANCELED) {
 							break;
 						}
@@ -102,6 +117,475 @@ public class JobManagerITCase {
 		}
 	}
 	
+	@Test
+	public void testSingleVertexJobQueued() {
+		
+		final int NUM_TASKS = 111;
+		
+		try {
+			final AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+			vertex.setParallelism(NUM_TASKS);
+			vertex.setInvokableClass(NoOpInvokable.class);
+			
+			final JobGraph jobGraph = new JobGraph("Test Job", vertex);
+			jobGraph.setAllowQueuedScheduling(true);
+			
+			JobManager jm = startJobManager(10);
+			try {
+				
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+				
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FINISHED, eg.getState());
+				}
+				else {
+					// already done, that was fast;
+				}
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testForwardJob() {
+		
+		final int NUM_TASKS = 31;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(Receiver.class);
+			
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			
+			JobManager jm = startJobManager(2 * NUM_TASKS);
+			try {
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FINISHED, eg.getState());
+				}
+				else {
+					// already done, that was fast;
+				}
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBipartiteJob() {
+		
+		final int NUM_TASKS = 31;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(AgnosticReceiver.class);
+			
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			final JobGraph jobGraph = new JobGraph("Bipartite Job", sender, receiver);
+			
+			JobManager jm = startJobManager(2 * NUM_TASKS);
+			try {
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FINISHED, eg.getState());
+				}
+				else {
+					// already done, that was fast;
+				}
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTwoInputJob() {
+		
+		final int NUM_TASKS = 13;
+		
+		try {
+			final AbstractJobVertex sender1 = new AbstractJobVertex("Sender1");
+			final AbstractJobVertex sender2 = new AbstractJobVertex("Sender2");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender1.setInvokableClass(Sender.class);
+			sender2.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(AgnosticReceiver.class);
+			
+			sender1.setParallelism(NUM_TASKS);
+			sender2.setParallelism(2*NUM_TASKS);
+			receiver.setParallelism(3*NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE);
+			receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE);
+			
+			final JobGraph jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2);
+			
+			JobManager jm = startJobManager(6 * NUM_TASKS);
+			try {
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FAILED, eg.getState());
+				}
+				else {
+					// already done, that was fast;
+				}
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testJobFailingSender() {
+		
+		final int NUM_TASKS = 100;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(ExceptionSender.class);
+			receiver.setInvokableClass(Receiver.class);
+			
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			
+			JobManager jm = startJobManager(NUM_TASKS);
+			try {
+				assertEquals(NUM_TASKS, jm.getAvailableSlots());
+				
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FAILED, eg.getState());
+				}
+				else {
+					// already done, that was fast;
+				}
+				
+				assertEquals(0, eg.getRegisteredExecutions().size());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testJobSometimesFailingSender() {
+		
+		final int NUM_TASKS = 100;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(SometimesExceptionSender.class);
+			receiver.setInvokableClass(Receiver.class);
+			
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			
+			JobManager jm = startJobManager(NUM_TASKS);
+			try {
+				assertEquals(NUM_TASKS, jm.getAvailableSlots());
+				
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FAILED, eg.getState());
+				}
+				else {
+					// already done, that was fast;
+				}
+				
+				assertEquals(0, eg.getRegisteredExecutions().size());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testJobFailingReceiver() {
+		
+		final int NUM_TASKS = 200;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(ExceptionReceiver.class);
+			
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			
+			JobManager jm = startJobManager(2 * NUM_TASKS);
+			try {
+				
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+				
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FAILED, eg.getState());
+				}
+				else {
+					// already done, that was fast;
+				}
+				
+				assertEquals(0, eg.getRegisteredExecutions().size());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Test failure in instantiation, where all fail by themselves
+	 */
+	@Test
+	public void testJobFailingInstantiation() {
+		
+		final int NUM_TASKS = 200;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(InstantiationErrorSender.class);
+			receiver.setInvokableClass(Receiver.class);
+			
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			
+			JobManager jm = startJobManager(NUM_TASKS);
+			try {
+				assertEquals(NUM_TASKS, jm.getAvailableSlots());
+				
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FAILED, eg.getState());
+				}
+				else {
+					// already done, that was fast;
+				}
+				
+				assertEquals(0, eg.getRegisteredExecutions().size());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Test failure in instantiation, where some have to be canceled (not all fail by themselves)
+	 */
+	@Test
+	public void testJobFailingSomeInstantiations() {
+		
+		final int NUM_TASKS = 200;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(SometimesInstantiationErrorSender.class);
+			receiver.setInvokableClass(Receiver.class);
+			
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			
+			JobManager jm = startJobManager(NUM_TASKS);
+			try {
+				assertEquals(NUM_TASKS, jm.getAvailableSlots());
+				
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FAILED, eg.getState());
+				}
+				else {
+					// already done, that was fast;
+				}
+				
+				for (Execution e : eg.getRegisteredExecutions().values()) {
+					System.out.println(e + StringUtils.arrayAwareToString(e.getStateTimestamps()));
+				}
+				
+				assertEquals(0, eg.getRegisteredExecutions().size());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
 	private static final JobManager startJobManager(int numSlots) throws Exception {
@@ -114,6 +598,17 @@ public class JobManagerITCase {
 		GlobalConfiguration.includeConfiguration(cfg);
 		
 		JobManager jm = new JobManager(ExecutionMode.LOCAL);
+		
+		// we need to wait until the taskmanager is registered
+		// max time is 5 seconds
+		long deadline = System.currentTimeMillis() + 5000;
+		
+		while (jm.getAvailableSlots() < numSlots && System.currentTimeMillis() < deadline) {
+			Thread.sleep(10);
+		}
+		
+		assertEquals(numSlots, jm.getAvailableSlots());
+		
 		return jm;
 	}
 	
@@ -133,4 +628,169 @@ public class JobManagerITCase {
 		
 		throw new IOException("could not find free port");
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Simple test tasks
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class Sender extends AbstractInvokable {
+
+		private RecordWriter<IntegerRecord> writer;
+		
+		@Override
+		public void registerInputOutput() {
+			writer = new RecordWriter<IntegerRecord>(this);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			writer.initializeSerializers();
+			writer.emit(new IntegerRecord(42));
+			writer.emit(new IntegerRecord(1337));
+			writer.flush();
+		}
+	}
+	
+	public static final class Receiver extends AbstractInvokable {
+
+		private RecordReader<IntegerRecord> reader;
+		
+		@Override
+		public void registerInputOutput() {
+			reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			IntegerRecord i1 = reader.next();
+			IntegerRecord i2 = reader.next();
+			IntegerRecord i3 = reader.next();
+			
+			if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
+				throw new Exception("Wrong Data Received");
+			}
+		}
+	}
+	
+	public static final class AgnosticReceiver extends AbstractInvokable {
+
+		private RecordReader<IntegerRecord> reader;
+		
+		@Override
+		public void registerInputOutput() {
+			reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			while (reader.next() != null);
+		}
+	}
+	
+	public static final class AgnosticBinaryReceiver extends AbstractInvokable {
+
+		private RecordReader<IntegerRecord> reader1;
+		private RecordReader<IntegerRecord> reader2;
+		
+		@Override
+		public void registerInputOutput() {
+			reader1 = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+			reader2 = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			while (reader1.next() != null);
+			while (reader2.next() != null);
+		}
+	}
+	
+	public static final class ExceptionSender extends AbstractInvokable {
+
+		private RecordWriter<IntegerRecord> writer;
+		
+		@Override
+		public void registerInputOutput() {
+			writer = new RecordWriter<IntegerRecord>(this);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			writer.initializeSerializers();
+			
+			throw new Exception("Test Exception");
+		}
+	}
+	
+	public static final class ExceptionReceiver extends AbstractInvokable {
+		
+		@Override
+		public void registerInputOutput() {
+			new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			throw new Exception("Expected Test Exception");
+		}
+	}
+	
+	public static final class SometimesExceptionSender extends AbstractInvokable {
+
+		private RecordWriter<IntegerRecord> writer;
+		
+		@Override
+		public void registerInputOutput() {
+			writer = new RecordWriter<IntegerRecord>(this);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			writer.initializeSerializers();
+			
+			if (Math.random() < 0.05) {
+				throw new Exception("Test Exception");
+			} else {
+				Object o = new Object();
+				synchronized (o) {
+					o.wait();
+				}
+			}
+		}
+	}
+	
+	public static final class InstantiationErrorSender extends AbstractInvokable {
+
+		public InstantiationErrorSender() {
+			throw new RuntimeException("Test Exception in Constructior");
+		}
+		
+		@Override
+		public void registerInputOutput() {}
+
+		@Override
+		public void invoke() {}
+	}
+	
+	public static final class SometimesInstantiationErrorSender extends AbstractInvokable {
+		
+		public SometimesInstantiationErrorSender() {
+			if (Math.random() < 0.05) {
+				throw new RuntimeException("Test Exception in Constructior");
+			}
+		}
+		
+		@Override
+		public void registerInputOutput() {
+			new RecordWriter<IntegerRecord>(this);
+		}
+		
+		@Override
+		public void invoke() throws Exception {
+			Object o = new Object();
+			synchronized (o) {
+				o.wait();
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index b092312..2892384 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -40,14 +40,14 @@ import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 
 /**
- * Tests for the {@link DefaultScheduler} when scheduling individual tasks.
+ * Tests for the {@link Scheduler} when scheduling individual tasks.
  */
 public class SchedulerIsolatedTasksTest {
 	
 	@Test
 	public void testAddAndRemoveInstance() {
 		try {
-			DefaultScheduler scheduler = new DefaultScheduler();
+			Scheduler scheduler = new Scheduler();
 			
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
@@ -111,7 +111,7 @@ public class SchedulerIsolatedTasksTest {
 	@Test
 	public void testScheduleImmediately() {
 		try {
-			DefaultScheduler scheduler = new DefaultScheduler();
+			Scheduler scheduler = new Scheduler();
 			assertEquals(0, scheduler.getNumberOfAvailableSlots());
 			
 			scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -181,7 +181,7 @@ public class SchedulerIsolatedTasksTest {
 		final int NUM_TASKS_TO_SCHEDULE = 2000;
 		
 		try {
-			DefaultScheduler scheduler = new DefaultScheduler();
+			Scheduler scheduler = new Scheduler();
 			
 			for (int i = 0;i < NUM_INSTANCES; i++) {
 				scheduler.newInstanceAvailable(getRandomInstance((int) (Math.random() * NUM_SLOTS_PER_INSTANCE) + 1));
@@ -270,7 +270,7 @@ public class SchedulerIsolatedTasksTest {
 	@Test
 	public void testScheduleWithDyingInstances() {
 		try {
-			DefaultScheduler scheduler = new DefaultScheduler();
+			Scheduler scheduler = new Scheduler();
 			
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
@@ -330,7 +330,7 @@ public class SchedulerIsolatedTasksTest {
 	@Test
 	public void testSchedulingLocation() {
 		try {
-			DefaultScheduler scheduler = new DefaultScheduler();
+			Scheduler scheduler = new Scheduler();
 			
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index afc0db9..c641524 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -42,7 +42,7 @@ public class SchedulerSlotSharingTest {
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1);
 			
-			DefaultScheduler scheduler = new DefaultScheduler();
+			Scheduler scheduler = new Scheduler();
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
 			scheduler.newInstanceAvailable(i1);
@@ -124,7 +124,7 @@ public class SchedulerSlotSharingTest {
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
 			
-			DefaultScheduler scheduler = new DefaultScheduler();
+			Scheduler scheduler = new Scheduler();
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
@@ -240,7 +240,7 @@ public class SchedulerSlotSharingTest {
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3);
 			
-			DefaultScheduler scheduler = new DefaultScheduler();
+			Scheduler scheduler = new Scheduler();
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
@@ -346,7 +346,7 @@ public class SchedulerSlotSharingTest {
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
 			
-			DefaultScheduler scheduler = new DefaultScheduler();
+			Scheduler scheduler = new Scheduler();
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 1 tasks from the first vertex group and 2 from the second
@@ -393,7 +393,7 @@ public class SchedulerSlotSharingTest {
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
 			
-			DefaultScheduler scheduler = new DefaultScheduler();
+			Scheduler scheduler = new Scheduler();
 			scheduler.newInstanceAvailable(getRandomInstance(3));
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
@@ -535,7 +535,7 @@ public class SchedulerSlotSharingTest {
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
 			
-			DefaultScheduler scheduler = new DefaultScheduler();
+			Scheduler scheduler = new Scheduler();
 			scheduler.newInstanceAvailable(i1);
 			scheduler.newInstanceAvailable(i2);
 			
@@ -583,7 +583,7 @@ public class SchedulerSlotSharingTest {
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
 			
-			DefaultScheduler scheduler = new DefaultScheduler();
+			Scheduler scheduler = new Scheduler();
 			scheduler.newInstanceAvailable(i1);
 			scheduler.newInstanceAvailable(i2);
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 13261d7..ba08a9f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -114,11 +114,6 @@ public class TaskManagerTest {
 					Collections.<GateDeploymentDescriptor>emptyList(),
 					new String[0], 0);
 			
-			LibraryCacheManager.register(jid1, new String[0]);
-			LibraryCacheManager.register(jid2, new String[0]);
-			assertNotNull(LibraryCacheManager.getClassLoader(jid1));
-			assertNotNull(LibraryCacheManager.getClassLoader(jid2));
-			
 			TaskOperationResult result1 = tm.submitTask(tdd1);
 			TaskOperationResult result2 = tm.submitTask(tdd2);
 			
@@ -196,10 +191,6 @@ public class TaskManagerTest {
 					Collections.<GateDeploymentDescriptor>emptyList(),
 					new String[0], 0);
 			
-			LibraryCacheManager.register(jid, new String[0]);
-			LibraryCacheManager.register(jid, new String[0]);
-			assertNotNull(LibraryCacheManager.getClassLoader(jid));
-			
 			assertFalse(tm.submitTask(tdd1).isSuccess());
 			assertFalse(tm.submitTask(tdd2).isSuccess());
 			
@@ -250,11 +241,6 @@ public class TaskManagerTest {
 					Collections.singletonList(new GateDeploymentDescriptor(Collections.singletonList(cdd))),
 					new String[0], 0);
 			
-			// register the job twice (for two tasks) at the lib cache
-			LibraryCacheManager.register(jid, new String[0]);
-			LibraryCacheManager.register(jid, new String[0]);
-			assertNotNull(LibraryCacheManager.getClassLoader(jid));
-			
 			// deploy sender before receiver, so the target is online when the sender requests the connection info
 			TaskOperationResult result2 = tm.submitTask(tdd2);
 			TaskOperationResult result1 = tm.submitTask(tdd1);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
index 205ad00..c6379eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
@@ -1,17 +1,20 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.flink.runtime.util;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
index 27fa540..e2312ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 import java.io.IOException;