Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:27 UTC

[03/63] [abbrv] Refactor job graph construction to an incremental, attachment-based model
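
For orientation: the new construction style creates vertices standalone and wires them
incrementally by attaching intermediate data sets as inputs. A minimal sketch, inferred
from the tests added in this commit (class and method names are taken from the diff
below; the standalone wrapper class is assumed for illustration):

    import java.util.List;

    import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
    import org.apache.flink.runtime.jobgraph.DistributionPattern;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    public class IncrementalAttachmentSketch {

        public static void main(String[] args) {
            // vertices are created without a JobGraph reference ...
            AbstractJobVertex source = new AbstractJobVertex("source");
            AbstractJobVertex target = new AbstractJobVertex("target");

            // ... and wired incrementally: 'target' attaches a newly created
            // intermediate data set of 'source' as its input
            target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);

            // the graph is assembled afterwards from the finished vertices
            JobGraph graph = new JobGraph("example", source, target);

            // sources-first topological order; rejects cyclic graphs
            List<AbstractJobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
            System.out.println(sorted);
        }
    }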

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index c28f946..768ac82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -18,98 +18,218 @@
 
 package org.apache.flink.runtime.jobgraph;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.List;
 
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobTaskVertex;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-/**
- * This class contains tests related to the JobGraph
- */
 public class JobGraphTest {
 
-	public JobGraphTest() {
-	}
-
-	@BeforeClass
-	public static void setUpClass() throws Exception {
-	}
-
-	@AfterClass
-	public static void tearDownClass() throws Exception {
-	}
-
-	@Before
-	public void setUp() {
+	@Test
+	public void testSerialization() {
+		try {
+			JobGraph jg = new JobGraph("The graph");
+			
+			// add some configuration values
+			{
+				jg.getJobConfiguration().setString("some key", "some value");
+				jg.getJobConfiguration().setDouble("Life of ", Math.PI);
+			}
+			
+			// add some vertices
+			{
+				AbstractJobVertex source1 = new AbstractJobVertex("source1");
+				AbstractJobVertex source2 = new AbstractJobVertex("source2");
+				AbstractJobVertex target = new AbstractJobVertex("target");
+				target.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
+				target.connectNewDataSetAsInput(source2, DistributionPattern.BIPARTITE);
+				
+				jg.addVertex(source1);
+				jg.addVertex(source2);
+				jg.addVertex(target);
+			}
+			
+			// de-/serialize and compare
+			JobGraph copy = CommonTestUtils.createCopyWritable(jg);
+			
+			assertEquals(jg.getName(), copy.getName());
+			assertEquals(jg.getJobID(), copy.getJobID());
+			assertEquals(jg.getJobConfiguration(), copy.getJobConfiguration());
+			assertEquals(jg.getNumberOfVertices(), copy.getNumberOfVertices());
+			
+			for (AbstractJobVertex vertex : copy.getVertices()) {
+				AbstractJobVertex original = jg.findVertexByID(vertex.getID());
+				assertNotNull(original);
+				assertEquals(original.getName(), vertex.getName());
+				assertEquals(original.getNumberOfInputs(), vertex.getNumberOfInputs());
+				assertEquals(original.getNumberOfProducedIntermediateDataSets(), vertex.getNumberOfProducedIntermediateDataSets());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
-
-	@After
-	public void tearDown() {
+	
+	@Test
+	public void testTopologicalSort1() {
+		try {
+			AbstractJobVertex source1 = new AbstractJobVertex("source1");
+			AbstractJobVertex source2 = new AbstractJobVertex("source2");
+			AbstractJobVertex target1 = new AbstractJobVertex("target1");
+			AbstractJobVertex target2 = new AbstractJobVertex("target2");
+			AbstractJobVertex intermediate1 = new AbstractJobVertex("intermediate1");
+			AbstractJobVertex intermediate2 = new AbstractJobVertex("intermediate2");
+			
+			target1.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
+			target2.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
+			target2.connectNewDataSetAsInput(intermediate2, DistributionPattern.POINTWISE);
+			intermediate2.connectNewDataSetAsInput(intermediate1, DistributionPattern.POINTWISE);
+			intermediate1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+			
+			JobGraph graph = new JobGraph("TestGraph", source1, source2, intermediate1, intermediate2, target1, target2);
+			List<AbstractJobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
+			
+			assertBefore(source1, target1, sorted);
+			assertBefore(source1, target2, sorted);
+			assertBefore(source2, target2, sorted);
+			assertBefore(source2, intermediate1, sorted);
+			assertBefore(source2, intermediate2, sorted);
+			assertBefore(intermediate1, target2, sorted);
+			assertBefore(intermediate2, target2, sorted);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
-
+	
 	@Test
-	/**
-	 * This test ensures that the JobGraph edges are correctly set (forward/backward edges)
-	 */
-	public void testJobGraph() {
-		// check if the backward edge really points to the preceding vertex
-		final JobGraph jg = new JobGraph();
-
-		final JobTaskVertex v1 = new JobTaskVertex(jg);
-		final JobTaskVertex v2 = new JobTaskVertex(jg);
-
+	public void testTopologicalSort2() {
 		try {
-			v1.connectTo(v2);
-		} catch (JobGraphDefinitionException ex) {
-			Logger.getLogger(JobGraphTest.class.getName()).log(Level.SEVERE, null, ex);
+			AbstractJobVertex source1 = new AbstractJobVertex("source1");
+			AbstractJobVertex source2 = new AbstractJobVertex("source2");
+			AbstractJobVertex root = new AbstractJobVertex("root");
+			AbstractJobVertex l11 = new AbstractJobVertex("layer 1 - 1");
+			AbstractJobVertex l12 = new AbstractJobVertex("layer 1 - 2");
+			AbstractJobVertex l13 = new AbstractJobVertex("layer 1 - 3");
+			AbstractJobVertex l2 = new AbstractJobVertex("layer 2");
+			
+			root.connectNewDataSetAsInput(l13, DistributionPattern.POINTWISE);
+			root.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+			root.connectNewDataSetAsInput(l2, DistributionPattern.POINTWISE);
+			
+			l2.connectNewDataSetAsInput(l11, DistributionPattern.POINTWISE);
+			l2.connectNewDataSetAsInput(l12, DistributionPattern.POINTWISE);
+			
+			l11.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
+			
+			l12.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
+			l12.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+			
+			l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+			
+			JobGraph graph = new JobGraph("TestGraph", source1, source2, root, l11, l13, l12, l2);
+			List<AbstractJobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
+			
+			assertBefore(source1, root, sorted);
+			assertBefore(source2, root, sorted);
+			assertBefore(l11, root, sorted);
+			assertBefore(l12, root, sorted);
+			assertBefore(l13, root, sorted);
+			assertBefore(l2, root, sorted);
+			
+			assertBefore(l11, l2, sorted);
+			assertBefore(l12, l2, sorted);
+			assertBefore(l2, root, sorted);
+			
+			assertBefore(source1, l2, sorted);
+			assertBefore(source2, l2, sorted);
+			
+			assertBefore(source2, l13, sorted);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
 		}
-
-		assertEquals(v1, v2.getBackwardConnection(0).getConnectedVertex());
-
 	}
-
-	/**
-	 * In this test we construct a job graph and set the dependency chain for instance sharing in a way that a cycle is
-	 * created. The test is considered successful if the cycle is detected.
-	 */
+	
 	@Test
-	public void detectCycleInInstanceSharingDependencyChain() {
-
-		final JobGraph jg = new JobGraph();
-
-		final JobTaskVertex v1 = new JobTaskVertex("v1", jg);
-		final JobTaskVertex v2 = new JobTaskVertex("v2", jg);
-		final JobTaskVertex v3 = new JobTaskVertex("v3", jg);
-		final JobTaskVertex v4 = new JobTaskVertex("v4", jg);
-
+	public void testTopoSortCyclicGraphNoSources() {
 		try {
-			v1.connectTo(v2);
-			v2.connectTo(v3);
-			v3.connectTo(v4);
-		} catch (JobGraphDefinitionException ex) {
-			Logger.getLogger(JobGraphTest.class.getName()).log(Level.SEVERE, null, ex);
+			AbstractJobVertex v1 = new AbstractJobVertex("1");
+			AbstractJobVertex v2 = new AbstractJobVertex("2");
+			AbstractJobVertex v3 = new AbstractJobVertex("3");
+			AbstractJobVertex v4 = new AbstractJobVertex("4");
+			
+			v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
+			v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+			v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE);
+			v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
+			
+			JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4);
+			try {
+				jg.getVerticesSortedTopologicallyFromSources();
+				fail("Failed to raise error on topologically sorting cyclic graph.");
+			}
+			catch (InvalidProgramException e) {
+				// that's what we wanted
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTopoSortCyclicGraphIntermediateCycle() {
+		try {
+			AbstractJobVertex source = new AbstractJobVertex("source");
+			AbstractJobVertex v1 = new AbstractJobVertex("1");
+			AbstractJobVertex v2 = new AbstractJobVertex("2");
+			AbstractJobVertex v3 = new AbstractJobVertex("3");
+			AbstractJobVertex v4 = new AbstractJobVertex("4");
+			AbstractJobVertex target = new AbstractJobVertex("target");
+			
+			v1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
+			v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
+			v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+			v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE);
+			v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
+			target.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
+			
+			JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4, source, target);
+			try {
+				jg.getVerticesSortedTopologicallyFromSources();
+				fail("Failed to raise error on topologically sorting cyclic graph.");
+			}
+			catch (InvalidProgramException e) {
+				// that's what we wanted
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
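+	// asserts that 'v1' occurs before 'v2' in the given list (reference comparison)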
+	private static final void assertBefore(AbstractJobVertex v1, AbstractJobVertex v2, List<AbstractJobVertex> list) {
+		boolean seenFirst = false;
+		for (AbstractJobVertex v : list) {
+			if (v == v1) {
+				seenFirst = true;
+			}
+			else if (v == v2) {
+				if (!seenFirst) {
+					fail("The first vertex (" + v1 + ") is not before the second vertex (" + v2 + ")");
+				}
+				break;
+			}
 		}
-
-		// Dependency chain is acyclic
-		v1.setVertexToShareInstancesWith(v2);
-		v3.setVertexToShareInstancesWith(v2);
-		v4.setVertexToShareInstancesWith(v1);
-
-		assertEquals(jg.isInstanceDependencyChainAcyclic(), true);
-
-		// Create a cycle v4 -> v1 -> v2 -> v4
-		v2.setVertexToShareInstancesWith(v4);
-
-		assertEquals(jg.isInstanceDependencyChainAcyclic(), false);
 	}
-
 }

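The cycle handling exercised by testTopoSortCyclicGraphNoSources above reduces to the
following sketch; the two-vertex graph is an assumed minimal case, all names are from
the diff:

    AbstractJobVertex v1 = new AbstractJobVertex("1");
    AbstractJobVertex v2 = new AbstractJobVertex("2");
    v1.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE);
    v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);

    JobGraph jg = new JobGraph("cycle", v1, v2);
    try {
        jg.getVerticesSortedTopologicallyFromSources();
        // not reached: the sort only returns for acyclic graphs
    }
    catch (InvalidProgramException e) {
        // expected: a graph that is not a DAG is rejected
    }
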
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
new file mode 100644
index 0000000..e1d862e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+@SuppressWarnings("serial")
+public class JobTaskVertexTest {
+
+	@Test
+	public void testConnectDirectly() {
+		AbstractJobVertex source = new AbstractJobVertex("source");
+		AbstractJobVertex target = new AbstractJobVertex("target");
+		target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
+		
+		assertTrue(source.isInputVertex());
+		assertFalse(source.isOutputVertex());
+		assertFalse(target.isInputVertex());
+		assertTrue(target.isOutputVertex());
+		
+		assertEquals(1, source.getNumberOfProducedIntermediateDataSets());
+		assertEquals(1, target.getNumberOfInputs());
+		
+		assertEquals(target.getInputs().get(0).getSource(), source.getProducedDataSets().get(0));
+		
+		assertEquals(1, source.getProducedDataSets().get(0).getConsumers().size());
+		assertEquals(target, source.getProducedDataSets().get(0).getConsumers().get(0).getTarget());
+	}
+	
+	@Test
+	public void testConnectMultipleTargets() {
+		AbstractJobVertex source = new AbstractJobVertex("source");
+		AbstractJobVertex target1 = new AbstractJobVertex("target1");
+		AbstractJobVertex target2 = new AbstractJobVertex("target2");
+		target1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
+		target2.connectDataSetAsInput(source.getProducedDataSets().get(0), DistributionPattern.BIPARTITE);
+		
+		assertTrue(source.isInputVertex());
+		assertFalse(source.isOutputVertex());
+		assertFalse(target1.isInputVertex());
+		assertTrue(target1.isOutputVertex());
+		assertFalse(target2.isInputVertex());
+		assertTrue(target2.isOutputVertex());
+		
+		assertEquals(1, source.getNumberOfProducedIntermediateDataSets());
+		assertEquals(2, source.getProducedDataSets().get(0).getConsumers().size());
+		
+		assertEquals(target1.getInputs().get(0).getSource(), source.getProducedDataSets().get(0));
+		assertEquals(target2.getInputs().get(0).getSource(), source.getProducedDataSets().get(0));
+	}
+	
+	@Test
+	public void testOutputFormatVertex() {
+		try {
+			final TestingOutputFormat outputFormat = new TestingOutputFormat();
+			final OutputFormatVertex of = new OutputFormatVertex("Name");
+			new TaskConfig(of.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<OutputFormat<?>>(outputFormat));
+			final ClassLoader cl = getClass().getClassLoader();
+			
+			try {
+				of.initializeOnMaster(cl);
+				fail("Did not throw expected exception.");
+			} catch (TestException e) {
+				// all good
+			}
+			
+			OutputFormatVertex copy = SerializationUtils.clone(of);
+			try {
+				copy.initializeOnMaster(cl);
+				fail("Did not throw expected exception.");
+			} catch (TestException e) {
+				// all good
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testInputFormatVertex() {
+		try {
+			final TestInputFormat inputFormat = new TestInputFormat();
+			final InputFormatVertex vertex = new InputFormatVertex("Name");
+			new TaskConfig(vertex.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<InputFormat<?, ?>>(inputFormat));
+			
+			final ClassLoader cl = getClass().getClassLoader();
+			
+			vertex.initializeOnMaster(cl);
+			InputSplit[] splits = vertex.getInputSplitSource().createInputSplits(77);
+			
+			assertNotNull(splits);
+			assertEquals(1, splits.length);
+			assertEquals(TestSplit.class, splits[0].getClass());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static final class TestingOutputFormat extends DiscardingOuputFormat<Object> implements InitializeOnMaster {
+		@Override
+		public void initializeGlobal(int parallelism) throws IOException {
+			throw new TestException();
+		}
+	}
+	
+	private static final class TestException extends IOException {}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static final class TestSplit extends GenericInputSplit {}
+	
+	private static final class TestInputFormat extends GenericInputFormat<Object> {
+
+		@Override
+		public boolean reachedEnd() {
+			return false;
+		}
+
+		@Override
+		public Object nextRecord(Object reuse) {
+			return null;
+		}
+		
+		@Override
+		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+			return new GenericInputSplit[] { new TestSplit() };
+		}
+	}
+}

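As testConnectMultipleTargets above shows, the first consumer creates the intermediate
data set and further consumers attach to the existing one. A condensed sketch (vertex
names assumed, API calls taken from the test):

    AbstractJobVertex source = new AbstractJobVertex("source");
    AbstractJobVertex a = new AbstractJobVertex("a");
    AbstractJobVertex b = new AbstractJobVertex("b");

    // first consumer: creates a new intermediate data set on 'source'
    a.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);

    // second consumer: attaches to the already-produced data set
    b.connectDataSetAsInput(source.getProducedDataSets().get(0), DistributionPattern.BIPARTITE);

    // 'source' still produces exactly one data set, now with two consumers
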
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleSourceTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleSourceTask.java
deleted file mode 100644
index f021de7..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleSourceTask.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.fs.LineReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-
-public class DoubleSourceTask extends AbstractInvokable {
-
-	private RecordWriter<StringRecord> output1 = null;
-
-	private RecordWriter<StringRecord> output2 = null;
-
-	@Override
-	public void invoke() throws Exception {
-		this.output1.initializeSerializers();
-		this.output2.initializeSerializers();
-
-		final Iterator<FileInputSplit> splitIterator = getInputSplits();
-
-		while (splitIterator.hasNext()) {
-
-			final FileInputSplit split = splitIterator.next();
-
-			final long start = split.getStart();
-			final long length = split.getLength();
-
-			final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
-			final FSDataInputStream fdis = fs.open(split.getPath());
-
-			final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
-
-			byte[] line = lineReader.readLine();
-
-			while (line != null) {
-
-				// Create a string object from the data read
-				StringRecord str = new StringRecord();
-				str.set(line);
-
-				// Send out string
-				output1.emit(str);
-				output2.emit(str);
-
-				line = lineReader.readLine();
-			}
-
-			// Close the stream;
-			lineReader.close();
-		}
-
-		this.output1.flush();
-		this.output2.flush();
-	}
-
-	@Override
-	public void registerInputOutput() {
-		this.output1 = new RecordWriter<StringRecord>(this);
-		this.output2 = new RecordWriter<StringRecord>(this);
-	}
-
-	private Iterator<FileInputSplit> getInputSplits() {
-
-		final InputSplitProvider provider = getEnvironment().getInputSplitProvider();
-
-		return new Iterator<FileInputSplit>() {
-
-			private FileInputSplit nextSplit;
-			
-			private boolean exhausted;
-
-			@Override
-			public boolean hasNext() {
-				if (exhausted) {
-					return false;
-				}
-				
-				if (nextSplit != null) {
-					return true;
-				}
-				
-				FileInputSplit split = (FileInputSplit) provider.getNextInputSplit();
-				
-				if (split != null) {
-					this.nextSplit = split;
-					return true;
-				}
-				else {
-					exhausted = true;
-					return false;
-				}
-			}
-
-			@Override
-			public FileInputSplit next() {
-				if (this.nextSplit == null && !hasNext()) {
-					throw new NoSuchElementException();
-				}
-
-				final FileInputSplit tmp = this.nextSplit;
-				this.nextSplit = null;
-				return tmp;
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		};
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleTargetTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleTargetTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleTargetTask.java
deleted file mode 100644
index 4caf479..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleTargetTask.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class DoubleTargetTask extends AbstractInvokable {
-
-	private RecordReader<StringRecord> input1 = null;
-
-	private RecordReader<StringRecord> input2 = null;
-
-	private RecordWriter<StringRecord> output = null;
-
-	@Override
-	public void invoke() throws Exception {
-
-		this.output.initializeSerializers();
-
-		while (this.input1.hasNext()) {
-
-			StringRecord s = input1.next();
-			this.output.emit(s);
-		}
-
-		while (this.input2.hasNext()) {
-
-			StringRecord s = input2.next();
-			this.output.emit(s);
-		}
-
-		this.output.flush();
-
-	}
-
-	@Override
-	public void registerInputOutput() {
-		this.input1 = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.input2 = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this);
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ForwardTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ForwardTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ForwardTask.java
deleted file mode 100644
index 2d0d8aa..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ForwardTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class ForwardTask extends AbstractInvokable {
-
-	private RecordReader<StringRecord> input = null;
-	private RecordWriter<StringRecord> output = null;
-
-	@Override
-	public void invoke() throws Exception {
-
-		this.output.initializeSerializers();
-
-		while (this.input.hasNext()) {
-
-			StringRecord s = input.next();
-			this.output.emit(s);
-		}
-
-		this.output.flush();
-	}
-
-	@Override
-	public void registerInputOutput() {
-		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this);
-	}
-}

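The rewritten JobManagerITCase below submits a job graph directly to an in-process
JobManager and polls the ExecutionGraph for a terminal state. A condensed sketch of
that flow, using only names from the diff ('jm' stands for the locally started
JobManager from startJobManager()):

    AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
    vertex.setParallelism(3);
    vertex.setInvokableClass(NoOpInvokable.class);
    JobGraph jobGraph = new JobGraph("Test Job", vertex);

    // the job must be registered with the library cache manager (here: no libraries)
    // before it can be submitted
    LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);

    JobSubmissionResult result = jm.submitJob(jobGraph);
    assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());

    // poll the tracked ExecutionGraph until it reaches FINISHED (success) or
    // FAILED / CANCELED (failure), bounded by a deadline
    ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
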
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index 0b222f5..b8aac10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -16,48 +16,27 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobmanager;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.net.ServerSocket;
 
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.ExecutionMode;
-import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
-import org.apache.flink.runtime.jobgraph.JobTaskVertex;
-import org.apache.flink.runtime.operators.DataSinkTask;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testutils.ServerTestUtils;
-import org.apache.flink.runtime.testutils.tasks.DoubleSourceTask;
-import org.apache.flink.runtime.testutils.tasks.FileLineReader;
-import org.apache.flink.runtime.testutils.tasks.FileLineWriter;
-import org.apache.flink.runtime.testutils.tasks.JobFileInputVertex;
-import org.apache.flink.runtime.testutils.tasks.JobFileOutputVertex;
-import org.apache.flink.runtime.util.JarFileCreator;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
 import org.junit.Test;
 
 /**
@@ -65,990 +44,92 @@ import org.junit.Test;
  */
 public class JobManagerITCase {
 
-	/**
-	 * The name of the test directory some tests read their input from.
-	 */
-	private static final String INPUT_DIRECTORY = "testDirectory";
-
-	private static Configuration configuration;
-
-	private static JobManager jobManager;
-
-	/**
-	 * Starts the JobManager in local mode.
-	 */
-	@BeforeClass
-	public static void startNephele() {
-		try {
-			Configuration cfg = new Configuration();
-			cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1");
-			cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123);
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-			
-			GlobalConfiguration.includeConfiguration(cfg);
-			
-			configuration = GlobalConfiguration.getConfiguration(new String[] { ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY });
-			
-			jobManager = new JobManager(ExecutionMode.LOCAL);
-
-			// Wait for the local task manager to arrive
-			ServerTestUtils.waitForJobManagerToBecomeReady(jobManager);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Could not start job manager: " + e.getMessage());
-		}
-	}
-
-	/**
-	 * Stops the JobManager
-	 */
-	@AfterClass
-	public static void stopNephele() {
-		jobManager.shutdown();
-		jobManager = null;
-	}
-	
-	/**
-	 * Tests the correctness of the union record reader with non-empty inputs.
-	 */
-	@Test
-	public void testUnionWithNonEmptyInput() {
-		testUnion(1000000);
-	}
-
-	/**
-	 * Tests of the Nephele channels with a large (> 1 MB) file.
-	 */
 	@Test
-	public void testExecutionWithLargeInputFile() {
-		test(1000000);
-	}
-
-	/**
-	 * Tests of the Nephele channels with a file of zero bytes size.
-	 */
-	@Test
-	public void testExecutionWithZeroSizeInputFile() {
-		test(0);
-	}
-
-	/**
-	 * Tests the execution of a job with a directory as input. The test directory contains files of different length.
-	 */
-	@Test
-	public void testExecutionWithDirectoryInput() {
-
-		// Define size of input
-		final int sizeOfInput = 100;
-
-		// Create test directory
-		final String testDirectory = ServerTestUtils.getTempDir() + File.separator + INPUT_DIRECTORY;
-		final File td = new File(testDirectory);
-		if (!td.exists()) {
-			td.mkdir();
-		}
-
-		File inputFile1 = null;
-		File inputFile2 = null;
-		File outputFile = null;
-		File jarFile = null;
-		JobClient jobClient = null;
-
+	public void testSingleVertexJob() {
 		try {
-			// Get name of the forward class
-			final String forwardClassName = ForwardTask.class.getSimpleName();
-
-			// Create input and jar files
-			inputFile1 = ServerTestUtils.createInputFile(INPUT_DIRECTORY, 0);
-			inputFile2 = ServerTestUtils.createInputFile(INPUT_DIRECTORY, sizeOfInput);
-			outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
-			jarFile = ServerTestUtils.createJarFile(forwardClassName);
-
-			// Create job graph
-			final JobGraph jg = new JobGraph("Job Graph 1");
-
-			// input vertex
-			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setInvokableClass(FileLineReader.class);
-			i1.setFilePath(new Path(new File(testDirectory).toURI()));
-			i1.setNumberOfSubtasks(1);
-
-			// task vertex 1
-			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
-			t1.setInvokableClass(ForwardTask.class);
-			t1.setNumberOfSubtasks(1);
-
-			// task vertex 2
-			final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
-			t2.setInvokableClass(ForwardTask.class);
-			t2.setNumberOfSubtasks(1);
-
-			// output vertex
-			JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setInvokableClass(FileLineWriter.class);
-			o1.setFilePath(new Path(outputFile.toURI()));
-			o1.setNumberOfSubtasks(1);
-
-			t1.setVertexToShareInstancesWith(i1);
-			t2.setVertexToShareInstancesWith(i1);
-			o1.setVertexToShareInstancesWith(i1);
-
-			// connect vertices
-			try {
-				i1.connectTo(t1, ChannelType.NETWORK);
-				t1.connectTo(t2, ChannelType.IN_MEMORY);
-				t2.connectTo(o1, ChannelType.IN_MEMORY);
-			} catch (JobGraphDefinitionException e) {
-				e.printStackTrace();
-			}
-
-			// add jar
-			jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + forwardClassName + ".jar").toURI()));
-
-			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
-			jobClient.submitJobAndWait();
-
-			// Finally, compare output file to initial number sequence
-			final BufferedReader bufferedReader = new BufferedReader(new FileReader(outputFile));
-			for (int i = 0; i < sizeOfInput; i++) {
-				final String number = bufferedReader.readLine();
-				try {
-					assertEquals(i, Integer.parseInt(number));
-				} catch (NumberFormatException e) {
-					fail(e.getMessage());
-				}
-			}
-
-			bufferedReader.close();
-
-		} catch (NumberFormatException e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		} catch (JobExecutionException e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		} catch (IOException ioe) {
-			ioe.printStackTrace();
-			fail(ioe.getMessage());
-		} finally {
-			// Remove temporary files
-			if (inputFile1 != null) {
-				inputFile1.delete();
-			}
-			if (inputFile2 != null) {
-				inputFile2.delete();
-			}
-			if (outputFile != null) {
-				outputFile.delete();
-			}
-			if (jarFile != null) {
-				jarFile.delete();
-			}
-
-			// Remove test directory
-			if (td != null) {
-				td.delete();
-			}
-
-			if (jobClient != null) {
-				jobClient.close();
-			}
-		}
-	}
-
-	/**
-	 * Tests the Nephele execution when an exception occurs. In particular, it is tested if the information that is
-	 * wrapped by the exception is correctly passed on to the client.
-	 */
-	@Test
-	public void testExecutionWithException() {
-
-		final String exceptionClassName = ExceptionTask.class.getSimpleName();
-		File inputFile = null;
-		File outputFile = null;
-		File jarFile = null;
-		JobClient jobClient = null;
-
-		try {
-
-			inputFile = ServerTestUtils.createInputFile(0);
-			outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
-			jarFile = ServerTestUtils.createJarFile(exceptionClassName);
-
-			// Create job graph
-			final JobGraph jg = new JobGraph("Job Graph for Exception Test");
-
-			// input vertex
-			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setInvokableClass(FileLineReader.class);
-			i1.setFilePath(new Path(inputFile.toURI()));
-
-			// task vertex 1
-			final JobTaskVertex t1 = new JobTaskVertex("Task with Exception", jg);
-			t1.setInvokableClass(ExceptionTask.class);
-
-			// output vertex
-			JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setInvokableClass(FileLineWriter.class);
-			o1.setFilePath(new Path(outputFile.toURI()));
-
-			t1.setVertexToShareInstancesWith(i1);
-			o1.setVertexToShareInstancesWith(i1);
-
-			// connect vertices
-			i1.connectTo(t1, ChannelType.IN_MEMORY);
-			t1.connectTo(o1, ChannelType.IN_MEMORY);
-
-			// add jar
-			jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + exceptionClassName + ".jar")
-				.toURI()));
-
-			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
+			final AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+			vertex.setParallelism(3);
+			vertex.setInvokableClass(NoOpInvokable.class);
 			
-			try {
-				jobClient.submitJobAndWait();
-			} catch (JobExecutionException e) {
-				// Check if the correct error message is encapsulated in the exception
-				if (e.getMessage() == null) {
-					fail("JobExecutionException does not contain an error message");
-				}
-				if (!e.getMessage().contains(ExceptionTask.ERROR_MESSAGE)) {
-					fail("JobExecutionException does not contain the expected error message");
-				}
-
-				return;
-			}
-
-			fail("Expected exception but did not receive it");
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		} finally {
-
-			// Remove temporary files
-			if (inputFile != null) {
-				inputFile.delete();
-			}
-			if (outputFile != null) {
-				outputFile.delete();
-			}
-			if (jarFile != null) {
-				jarFile.delete();
-			}
-
-			if (jobClient != null) {
-				jobClient.close();
-			}
-		}
-	}
-
-	/**
-	 * Tests the Nephele execution when a runtime exception during the registration of the input/output gates occurs.
-	 */
-	@Test
-	public void testExecutionWithRuntimeException() {
-
-		final String runtimeExceptionClassName = RuntimeExceptionTask.class.getSimpleName();
-		File inputFile = null;
-		File outputFile = null;
-		File jarFile = null;
-		JobClient jobClient = null;
-
-		try {
-
-			inputFile = ServerTestUtils.createInputFile(100);
-			outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
-			jarFile = ServerTestUtils.createJarFile(runtimeExceptionClassName);
-
-			// Create job graph
-			final JobGraph jg = new JobGraph("Job Graph for Exception Test");
-
-			// input vertex
-			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setInvokableClass(FileLineReader.class);
-			i1.setFilePath(new Path(inputFile.toURI()));
-
-			// task vertex 1
-			final JobTaskVertex t1 = new JobTaskVertex("Task with Exception", jg);
-			t1.setInvokableClass(RuntimeExceptionTask.class);
-
-			// output vertex
-			JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setInvokableClass(FileLineWriter.class);
-			o1.setFilePath(new Path(outputFile.toURI()));
-
-			t1.setVertexToShareInstancesWith(i1);
-			o1.setVertexToShareInstancesWith(i1);
-
-			// connect vertices
-			i1.connectTo(t1, ChannelType.IN_MEMORY);
-			t1.connectTo(o1, ChannelType.IN_MEMORY);
-
-			// add jar
-			jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + runtimeExceptionClassName
-				+ ".jar").toURI()));
-
-			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
+			final JobGraph jobGraph = new JobGraph("Test Job", vertex);
 			
+			JobManager jm = startJobManager();
 			try {
-				jobClient.submitJobAndWait();
-			} catch (JobExecutionException e) {
-
-				// Check if the correct error message is encapsulated in the exception
-				if (e.getMessage() == null) {
-					fail("JobExecutionException does not contain an error message");
-				}
-				if (!e.getMessage().contains(RuntimeExceptionTask.RUNTIME_EXCEPTION_MESSAGE)) {
-					fail("JobExecutionException does not contain the expected error message");
-				}
-
-				// Check if the correct error message is encapsulated in the exception
-				return;
-			}
-
-			fail("Expected exception but did not receive it");
-
-		} catch (JobGraphDefinitionException jgde) {
-			fail(jgde.getMessage());
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		} finally {
-
-			// Remove temporary files
-			if (inputFile != null) {
-				inputFile.delete();
-			}
-			if (outputFile != null) {
-				outputFile.delete();
-			}
-			if (jarFile != null) {
-				jarFile.delete();
-			}
-
-			if (jobClient != null) {
-				jobClient.close();
-			}
-		}
-	}
-
-	/**
-	 * Tests the Nephele execution when a runtime exception in the output format occurs.
-	 */
-	@Test
-	public void testExecutionWithRuntimeExceptionInOutputFormat() {
-
-		final String runtimeExceptionClassName = RuntimeExceptionTask.class.getSimpleName();
-		File inputFile = null;
-		File outputFile = null;
-		File jarFile = null;
-		JobClient jobClient = null;
-
-		try {
-
-			inputFile = ServerTestUtils.createInputFile(100);
-			outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
-			jarFile = ServerTestUtils.createJarFile(runtimeExceptionClassName);
-
-			// Create job graph
-			final JobGraph jg = new JobGraph("Job Graph for Exception Test");
-
-			// input vertex
-			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setInvokableClass(FileLineReader.class);
-			i1.setFilePath(new Path(inputFile.toURI()));
-			i1.setNumberOfSubtasks(1);
-
-			// task vertex 1
-			final JobTaskVertex t1 = new JobTaskVertex("Task with Exception", jg);
-			t1.setInvokableClass(ForwardTask.class);
-
-			// output vertex
-			JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
-			o1.setNumberOfSubtasks(1);
-			o1.setInvokableClass(DataSinkTask.class);
-			ExceptionOutputFormat outputFormat = new ExceptionOutputFormat();
-			o1.setOutputFormat(outputFormat);
-			TaskConfig outputConfig = new TaskConfig(o1.getConfiguration());
-			outputConfig.setStubWrapper(new UserCodeObjectWrapper<OutputFormat<?>>(outputFormat));
-//			outputConfig.addInputToGroup(0);
-//			
-//			ValueSerializer<StringRecord> serializer = new ValueSerializer<StringRecord>(StringRecord.class);
-//			RuntimeStatefulSerializerFactory<StringRecord> serializerFactory = new RuntimeStatefulSerializerFactory<StringRecord>(serializer, StringRecord.class);
-//			outputConfig.setInputSerializer(serializerFactory, 0);
-
-			t1.setVertexToShareInstancesWith(i1);
-			o1.setVertexToShareInstancesWith(i1);
-
-			// connect vertices
-			i1.connectTo(t1, ChannelType.IN_MEMORY);
-			t1.connectTo(o1, ChannelType.IN_MEMORY);
-
-			// add jar
-			jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + runtimeExceptionClassName
-					+ ".jar").toURI()));
-
-			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
-
-			try {
-				jobClient.submitJobAndWait();
-			} catch (JobExecutionException e) {
-
-				// Check if the correct error message is encapsulated in the exception
-				if (e.getMessage() == null) {
-					fail("JobExecutionException does not contain an error message");
-				}
-				if (!e.getMessage().contains(RuntimeExceptionTask.RUNTIME_EXCEPTION_MESSAGE)) {
-					fail("JobExecutionException does not contain the expected error message, " +
-							"but instead: " + e.getMessage());
+				
+				// we need to register the job at the library cache manager (with no libraries)
+				// we need to register the job with the library cache manager (passing no libraries)
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+				
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					
+					long deadline = System.currentTimeMillis() + 60*1000;
+					boolean success = false;
+					
+					while (System.currentTimeMillis() < deadline) {
+						JobStatus state = eg.getState();
+						if (state == JobStatus.FINISHED) {
+							success = true;
+							break;
+						}
+						
+						else if (state == JobStatus.FAILED || state == JobStatus.CANCELED) {
+							break;
+						}
+						else {
+							Thread.sleep(200);
+						}
+					}
+					
+					assertTrue("The job did not finish successfully.", success);
 				}
-
-				// Check if the correct error message is encapsulated in the exception
-				return;
-			}
-
-			fail("Expected exception but did not receive it");
-
-		} catch (JobGraphDefinitionException jgde) {
-			fail(jgde.getMessage());
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		} finally {
-
-			// Remove temporary files
-			if (inputFile != null) {
-				inputFile.delete();
-			}
-			if (outputFile != null) {
-				outputFile.delete();
-			}
-			if (jarFile != null) {
-				jarFile.delete();
-			}
-
-			if (jobClient != null) {
-				jobClient.close();
-			}
-		}
-	}
-
-	/**
-	 * Creates a file with a sequence of 0 to <code>limit</code> integer numbers
-	 * and triggers a sample job. The sample reads all the numbers from the input file and pushes them through a
-	 * network, a file, and an in-memory channel. Eventually, the numbers are written back to an output file. The test
-	 * is considered successful if the input file equals the output file.
-	 * 
-	 * @param limit
-	 *        the upper bound for the sequence of numbers to be generated
-	 */
-	private void test(final int limit) {
-
-		JobClient jobClient = null;
-
-		try {
-
-			// Get name of the forward class
-			final String forwardClassName = ForwardTask.class.getSimpleName();
-
-			// Create input and jar files
-			final File inputFile = ServerTestUtils.createInputFile(limit);
-			final File outputFile = new File(ServerTestUtils.getTempDir() + File.separator
-				+ ServerTestUtils.getRandomFilename());
-			final File jarFile = ServerTestUtils.createJarFile(forwardClassName);
-
-			// Create job graph
-			final JobGraph jg = new JobGraph("Job Graph 1");
-
-			// input vertex
-			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setInvokableClass(FileLineReader.class);
-			i1.setFilePath(new Path(inputFile.toURI()));
-			i1.setNumberOfSubtasks(1);
-
-			// task vertex 1
-			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
-			t1.setInvokableClass(ForwardTask.class);
-			t1.setNumberOfSubtasks(1);
-
-			// task vertex 2
-			final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
-			t2.setInvokableClass(ForwardTask.class);
-			t2.setNumberOfSubtasks(1);
-
-			// output vertex
-			JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setInvokableClass(FileLineWriter.class);
-			o1.setFilePath(new Path(outputFile.toURI()));
-			o1.setNumberOfSubtasks(1);
-
-			t1.setVertexToShareInstancesWith(i1);
-			t2.setVertexToShareInstancesWith(i1);
-			o1.setVertexToShareInstancesWith(i1);
-
-			// connect vertices
-			try {
-				i1.connectTo(t1, ChannelType.NETWORK);
-				t1.connectTo(t2, ChannelType.IN_MEMORY);
-				t2.connectTo(o1, ChannelType.IN_MEMORY);
-			} catch (Exception e) {
-				e.printStackTrace();
-				fail(e.getMessage());
-			}
-
-			// add jar
-			jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + forwardClassName + ".jar")
-				.toURI()));
-
-			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
-			
-			try {
-				jobClient.submitJobAndWait();
-			} catch (JobExecutionException e) {
-				fail(e.getMessage());
-			}
-
-			// Finally, compare output file to initial number sequence
-			final BufferedReader bufferedReader = new BufferedReader(new FileReader(outputFile));
-			for (int i = 0; i < limit; i++) {
-				final String number = bufferedReader.readLine();
-				try {
-					assertEquals(i, Integer.parseInt(number));
-				} catch (NumberFormatException e) {
-					fail(e.getMessage());
+				else {
+					// already done, that was fast
 				}
 			}
-
-			bufferedReader.close();
-
-			// Remove temporary files
-			inputFile.delete();
-			outputFile.delete();
-			jarFile.delete();
-
-		} catch (IOException ioe) {
-			ioe.printStackTrace();
-			fail(ioe.getMessage());
-		} finally {
-			if (jobClient != null) {
-				jobClient.close();
+			finally {
+				jm.shutdown();
 			}
 		}
-	}
-
-	/**
-	 * Tests the Nephele execution with a job that has two vertices, that are connected twice with each other with
-	 * different channel types.
-	 */
-	@Test
-	public void testExecutionDoubleConnection() {
-
-		File inputFile = null;
-		File outputFile = null;
-		File jarFile = new File(ServerTestUtils.getTempDir() + File.separator + "doubleConnection.jar");
-		JobClient jobClient = null;
-
-		try {
-
-			inputFile = ServerTestUtils.createInputFile(0);
-			outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
-
-			// Create required jar file
-			JarFileCreator jfc = new JarFileCreator(jarFile);
-			jfc.addClass(DoubleSourceTask.class);
-			jfc.addClass(DoubleTargetTask.class);
-			jfc.createJarFile();
-
-			// Create job graph
-			final JobGraph jg = new JobGraph("Job Graph for Double Connection Test");
-
-			// input vertex
-			final JobFileInputVertex i1 = new JobFileInputVertex("Input with two Outputs", jg);
-			i1.setInvokableClass(DoubleSourceTask.class);
-			i1.setFilePath(new Path(inputFile.toURI()));
-
-			// task vertex 1
-			final JobTaskVertex t1 = new JobTaskVertex("Task with two Inputs", jg);
-			t1.setInvokableClass(DoubleTargetTask.class);
-
-			// output vertex
-			JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
-			o1.setInvokableClass(FileLineWriter.class);
-			o1.setFilePath(new Path(outputFile.toURI()));
-
-			t1.setVertexToShareInstancesWith(i1);
-			o1.setVertexToShareInstancesWith(i1);
-
-			// connect vertices
-			i1.connectTo(t1, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-			i1.connectTo(t1, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-			t1.connectTo(o1, ChannelType.IN_MEMORY);
-
-			// add jar
-			jg.addJar(new Path(jarFile.toURI()));
-
-			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
-			jobClient.submitJobAndWait();
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		} finally {
-
-			// Remove temporary files
-			if (inputFile != null) {
-				inputFile.delete();
-			}
-			if (outputFile != null) {
-				outputFile.delete();
-			}
-			if (jarFile != null) {
-				jarFile.delete();
-			}
-
-			if (jobClient != null) {
-				jobClient.close();
-			}
-		}
-	}
-
-	/**
-	 * Tests the Nephele job execution when the graph and the tasks are given no specific name.
-	 */
-	@Test
-	public void testEmptyTaskNames() {
-
-		File inputFile = null;
-		File outputFile = null;
-		File jarFile = new File(ServerTestUtils.getTempDir() + File.separator + "emptyNames.jar");
-		JobClient jobClient = null;
-
-		try {
-
-			inputFile = ServerTestUtils.createInputFile(0);
-			outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
-
-			// Create required jar file
-			JarFileCreator jfc = new JarFileCreator(jarFile);
-			jfc.addClass(DoubleSourceTask.class);
-			jfc.addClass(DoubleTargetTask.class);
-			jfc.createJarFile();
-
-			// Create job graph
-			final JobGraph jg = new JobGraph();
-
-			// input vertex
-			final JobFileInputVertex i1 = new JobFileInputVertex(jg);
-			i1.setInvokableClass(FileLineReader.class);
-			i1.setFilePath(new Path(inputFile.toURI()));
-
-			// output vertex
-			JobFileOutputVertex o1 = new JobFileOutputVertex(jg);
-			o1.setInvokableClass(FileLineWriter.class);
-			o1.setFilePath(new Path(outputFile.toURI()));
-
-			o1.setVertexToShareInstancesWith(i1);
-
-			// connect vertices
-			i1.connectTo(o1, ChannelType.IN_MEMORY);
-
-			// add jar
-			jg.addJar(new Path(jarFile.toURI()));
-
-			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
-			jobClient.submitJobAndWait();
-		} catch (Exception e) {
+		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		} finally {
-
-			// Remove temporary files
-			if (inputFile != null) {
-				inputFile.delete();
-			}
-			if (outputFile != null) {
-				outputFile.delete();
-			}
-			if (jarFile != null) {
-				jarFile.delete();
-			}
-
-			if (jobClient != null) {
-				jobClient.close();
-			}
 		}
 	}
-
-	/**
-	 * Tests the correctness of the union record reader with empty inputs.
-	 */
-	@Test
-	public void testUnionWithEmptyInput() {
-		testUnion(0);
-	}
-
-	/**
-	 * Tests the correctness of the union reader for different input sizes.
-	 * 
-	 * @param limit
-	 *        the upper bound for the sequence of numbers to be generated
-	 */
-	private void testUnion(final int limit) {
-
-		File inputFile1 = null;
-		File inputFile2 = null;
-		File outputFile = null;
-		File jarFile = new File(ServerTestUtils.getTempDir() + File.separator + "unionWithEmptyInput.jar");
-		JobClient jobClient = null;
-
-		try {
-
-			inputFile1 = ServerTestUtils.createInputFile(limit);
-			inputFile2 = ServerTestUtils.createInputFile(limit);
-			outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
-
-			// Create required jar file
-			JarFileCreator jfc = new JarFileCreator(jarFile);
-			jfc.addClass(UnionTask.class);
-			jfc.createJarFile();
-
-			// Create job graph
-			final JobGraph jg = new JobGraph("Union job " + limit);
-
-			// input vertex 1
-			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setInvokableClass(FileLineReader.class);
-			i1.setFilePath(new Path(inputFile1.toURI()));
-
-			// input vertex 2
-			final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
-			i2.setInvokableClass(FileLineReader.class);
-			i2.setFilePath(new Path(inputFile2.toURI()));
-
-			// union task
-			final JobTaskVertex u1 = new JobTaskVertex("Union", jg);
-			u1.setInvokableClass(UnionTask.class);
-
-			// output vertex
-			JobFileOutputVertex o1 = new JobFileOutputVertex("Output", jg);
-			o1.setInvokableClass(FileLineWriter.class);
-			o1.setFilePath(new Path(outputFile.toURI()));
-			o1.setNumberOfSubtasks(1);
-
-			i1.setVertexToShareInstancesWith(o1);
-			i2.setVertexToShareInstancesWith(o1);
-			u1.setVertexToShareInstancesWith(o1);
-
-			// connect vertices
-			i1.connectTo(u1, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-			i2.connectTo(u1, ChannelType.IN_MEMORY);
-			u1.connectTo(o1, ChannelType.IN_MEMORY);
-
-			// add jar
-			jg.addJar(new Path(jarFile.toURI()));
-
-			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
-
-			try {
-				jobClient.submitJobAndWait();
-			} catch (JobExecutionException e) {
-				fail(e.getMessage());
-			}
-
-			// Finally, check the output
-			final Map<Integer, Integer> expectedNumbers = new HashMap<Integer, Integer>();
-			final Integer two = Integer.valueOf(2);
-			for (int i = 0; i < limit; ++i) {
-				expectedNumbers.put(Integer.valueOf(i), two);
-			}
-
-			final BufferedReader bufferedReader = new BufferedReader(new FileReader(outputFile));
-			String line = bufferedReader.readLine();
-			while (line != null) {
-
-				final Integer number = Integer.valueOf(Integer.parseInt(line));
-				Integer val = expectedNumbers.get(number);
-				if (val == null) {
-					fail("Found unexpected number in union output: " + number);
-				} else {
-					val = Integer.valueOf(val.intValue() - 1);
-					if (val.intValue() < 0) {
-						fail(val + " occurred more than twice in union output");
-					}
-					if (val.intValue() == 0) {
-						expectedNumbers.remove(number);
-					} else {
-						expectedNumbers.put(number, val);
-					}
-				}
-
-				line = bufferedReader.readLine();
-			}
-
-			bufferedReader.close();
-
-			if (!expectedNumbers.isEmpty()) {
-				final StringBuilder str = new StringBuilder();
-				str.append("The following numbers have not been found in the union output:\n");
-				final Iterator<Map.Entry<Integer, Integer>> it = expectedNumbers.entrySet().iterator();
-				while (it.hasNext()) {
-					final Map.Entry<Integer, Integer> entry = it.next();
-					str.append(entry.getKey().toString());
-					str.append(" (");
-					str.append(entry.getValue().toString());
-					str.append("x)\n");
-				}
-
-				fail(str.toString());
-			}
-
-		} catch (JobGraphDefinitionException jgde) {
-			fail(jgde.getMessage());
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		} finally {
-
-			// Remove temporary files
-			if (inputFile1 != null) {
-				inputFile1.delete();
-			}
-			if (inputFile2 != null) {
-				inputFile2.delete();
-			}
-			if (outputFile != null) {
-				outputFile.delete();
-			}
-			if (jarFile != null) {
-				jarFile.delete();
-			}
-
-			if (jobClient != null) {
-				jobClient.close();
-			}
-		}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static JobManager startJobManager() throws Exception {
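+		// build a minimal local configuration: loopback JobManager address, a free ephemeral port, and a tiny task manager memory size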
+		Configuration cfg = new Configuration();
+		cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+		cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
+		cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
+		
+		GlobalConfiguration.includeConfiguration(cfg);
+		
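+		// the JobManager constructed below reads its settings from the global configuration merged above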
+		JobManager jm = new JobManager(ExecutionMode.LOCAL);
+		return jm;
 	}
-
-	/**
-	 * Tests the execution of a job with a large degree of parallelism. In particular, the test checks that a lack of
-	 * resources to run the job is properly detected and reported.
-	 */
-	@Test
-	public void testExecutionWithLargeDoP() {
-
-		// The degree of parallelism to be used by tasks in this job.
-		final int numberOfSubtasks = 64;
-
-		File inputFile1 = null;
-		File inputFile2 = null;
-		File outputFile = null;
-		File jarFile = new File(ServerTestUtils.getTempDir() + File.separator + "largeDoP.jar");
-		JobClient jobClient = null;
-
-		try {
-
-			inputFile1 = ServerTestUtils.createInputFile(0);
-			inputFile2 = ServerTestUtils.createInputFile(0);
-			outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
-
-			// Create required jar file
-			JarFileCreator jfc = new JarFileCreator(jarFile);
-			jfc.addClass(UnionTask.class);
-			jfc.createJarFile();
-
-			// Create job graph
-			final JobGraph jg = new JobGraph("Job with large DoP (" + numberOfSubtasks + ")");
-
-			// input vertex 1
-			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
-			i1.setInvokableClass(FileLineReader.class);
-			i1.setFilePath(new Path(inputFile1.toURI()));
-			i1.setNumberOfSubtasks(numberOfSubtasks);
-
-			// input vertex 2
-			final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
-			i2.setInvokableClass(FileLineReader.class);
-			i2.setFilePath(new Path(inputFile2.toURI()));
-			i2.setNumberOfSubtasks(numberOfSubtasks);
-
-			// forward task
-			final JobTaskVertex f1 = new JobTaskVertex("Forward 1", jg);
-			f1.setInvokableClass(DoubleTargetTask.class);
-			f1.setNumberOfSubtasks(numberOfSubtasks);
-
-			// output vertex
-			JobFileOutputVertex o1 = new JobFileOutputVertex("Output", jg);
-			o1.setInvokableClass(FileLineWriter.class);
-			o1.setFilePath(new Path(outputFile.toURI()));
-			o1.setNumberOfSubtasks(numberOfSubtasks);
-
-			i1.setVertexToShareInstancesWith(o1);
-			i2.setVertexToShareInstancesWith(o1);
-			f1.setVertexToShareInstancesWith(o1);
-
-			// connect vertices
-			i1.connectTo(f1, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-			i2.connectTo(f1, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-			f1.connectTo(o1, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-
-			// add jar
-			jg.addJar(new Path(jarFile.toURI()));
-
-			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
-			
+	
+	private static int getAvailablePort() throws IOException {
+		for (int i = 0; i < 50; i++) {
+			ServerSocket serverSocket = null;
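+			// binding to port 0 lets the operating system pick a currently free ephemeral port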
 			try {
-				
-				jobClient.submitJobAndWait();
-			} catch (JobExecutionException e) {
-				// Job execution should lead to an error due to lack of resources
-				return;
-			} catch (Exception e) {
-				e.printStackTrace();
-				fail(e.getMessage());
-			}
-
-			fail("Undetected lack of resources");
-
-		} catch (JobGraphDefinitionException jgde) {
-			fail(jgde.getMessage());
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		} finally {
-
-			// Remove temporary files
-			if (inputFile1 != null) {
-				inputFile1.delete();
-			}
-			if (inputFile2 != null) {
-				inputFile2.delete();
-			}
-			if (outputFile != null) {
-				if (outputFile.isDirectory()) {
-					final String[] files = outputFile.list();
-					final String outputDir = outputFile.getAbsolutePath();
-					for (final String file : files) {
-						new File(outputDir + File.separator + file).delete();
-					}
+				serverSocket = new ServerSocket(0);
+				int port = serverSocket.getLocalPort();
+				if (port != 0) {
+					return port;
 				}
-				outputFile.delete();
-			}
-			if (jarFile != null) {
-				jarFile.delete();
-			}
-
-			if (jobClient != null) {
-				jobClient.close();
+			} finally {
+				if (serverSocket != null) {
+					serverSocket.close();
+				}
 			}
 		}
+		
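+		// none of the attempts yielded a usable port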
+		throw new IOException("could not find a free port");
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
new file mode 100644
index 0000000..835af95
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+public class JobManagerTest {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RuntimeExceptionTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RuntimeExceptionTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RuntimeExceptionTask.java
deleted file mode 100644
index 6004bc1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RuntimeExceptionTask.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * This task throws a {@link RuntimeException} when the method <code>registerInputOutput</code> is called.
- */
-public class RuntimeExceptionTask extends AbstractInvokable {
-
-	/**
-	 * The message which is used for the test runtime exception.
-	 */
-	public static final String RUNTIME_EXCEPTION_MESSAGE = "This is a test runtime exception";
-
-
-	@Override
-	public void registerInputOutput() {
-		throw new RuntimeException(RUNTIME_EXCEPTION_MESSAGE);
-	}
-
-	@Override
-	public void invoke() {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/UnionTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/UnionTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/UnionTask.java
deleted file mode 100644
index 9d01907..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/UnionTask.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.io.network.api.UnionRecordReader;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * A simple implementation of a task using a {@link UnionRecordReader}.
- */
-public class UnionTask extends AbstractInvokable {
-
-	/**
-	 * The union record reader to be used during the tests.
-	 */
-	private UnionRecordReader<StringRecord> unionReader;
-
-	private RecordWriter<StringRecord> writer;
-	
-	
-	@Override
-	public void registerInputOutput() {
-
-		@SuppressWarnings("unchecked")
-		MutableRecordReader<StringRecord>[] recordReaders = (MutableRecordReader<StringRecord>[]) new MutableRecordReader<?>[2];
-		recordReaders[0] = new MutableRecordReader<StringRecord>(this);
-		recordReaders[1] = new MutableRecordReader<StringRecord>(this);
-		this.unionReader = new UnionRecordReader<StringRecord>(recordReaders, StringRecord.class);
-		
-		this.writer = new RecordWriter<StringRecord>(this);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		this.writer.initializeSerializers();
-
-		while (this.unionReader.hasNext()) {
-			this.writer.emit(this.unionReader.next());
-		}
-
-		this.writer.flush();
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
deleted file mode 100644
index 2b2c81d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.splitassigner;
-
-import static org.junit.Assert.*;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.junit.Test;
-
-
-public class DefaultSplitAssignerTest {
-
-	@Test
-	public void testSerialSplitAssignment() {
-		try {
-			final int NUM_SPLITS = 50;
-			
-			Set<InputSplit> splits = new HashSet<InputSplit>();
-			for (int i = 0; i < NUM_SPLITS; i++) {
-				splits.add(new GenericInputSplit(i, NUM_SPLITS));
-			}
-			
-			DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
-			InputSplit is = null;
-			while ((is = ia.getNextInputSplit("")) != null) {
-				assertTrue(splits.remove(is));
-			}
-			
-			assertTrue(splits.isEmpty());
-			assertNull(ia.getNextInputSplit(""));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testConcurrentSplitAssignment() {
-		try {
-			final int NUM_THREADS = 10;
-			final int NUM_SPLITS = 500;
-			final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
-			
-			Set<InputSplit> splits = new HashSet<InputSplit>();
-			for (int i = 0; i < NUM_SPLITS; i++) {
-				splits.add(new GenericInputSplit(i, NUM_SPLITS));
-			}
-			
-			final DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
-			
-			final AtomicInteger splitsRetrieved = new AtomicInteger(0);
-			final AtomicInteger sumOfIds = new AtomicInteger(0);
-			
-			Runnable retriever = new Runnable() {
-				
-				@Override
-				public void run() {
-					String host = "";
-					GenericInputSplit split;
-					while ((split = (GenericInputSplit) ia.getNextInputSplit(host)) != null) {
-						splitsRetrieved.incrementAndGet();
-						sumOfIds.addAndGet(split.getSplitNumber());
-					}
-				}
-			};
-			
-			// create the threads
-			Thread[] threads = new Thread[NUM_THREADS];
-			for (int i = 0; i < NUM_THREADS; i++) {
-				threads[i] = new Thread(retriever);
-				threads[i].setDaemon(true);
-			}
-			
-			// launch concurrently
-			for (int i = 0; i < NUM_THREADS; i++) {
-				threads[i].start();
-			}
-			
-			// sync
-			for (int i = 0; i < NUM_THREADS; i++) {
-				threads[i].join(5000);
-			}
-			
-			// verify
-			for (int i = 0; i < NUM_THREADS; i++) {
-				if (threads[i].isAlive()) {
-					fail("The concurrency test case is erroneous, the thread did not respond in time.");
-				}
-			}
-			
-			assertEquals(NUM_SPLITS, splitsRetrieved.get());
-			assertEquals(SUM_OF_IDS, sumOfIds.get());
-			
-			// nothing left
-			assertNull(ia.getNextInputSplit(""));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
deleted file mode 100644
index fdc1cf4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.splitassigner;
-
-import static org.junit.Assert.*;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.LocatableInputSplit;
-
-import org.junit.Test;
-
-
-public class LocatableSplitAssignerTest {
-	
-	@Test
-	public void testSerialSplitAssignmentWithNullHost() {
-		try {
-			final int NUM_SPLITS = 50;
-			final String[][] hosts = new String[][] {
-					new String[] { "localhost" },
-					new String[0],
-					null
-			};
-			
-			// load some splits
-			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
-			for (int i = 0; i < NUM_SPLITS; i++) {
-				splits.add(new LocatableInputSplit(i, hosts[i%3]));
-			}
-			
-			// get all available splits
-			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
-			InputSplit is = null;
-			while ((is = ia.getNextInputSplit(null)) != null) {
-				assertTrue(splits.remove(is));
-			}
-			
-			// check we had all
-			assertTrue(splits.isEmpty());
-			assertNull(ia.getNextInputSplit(""));
-			assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-			assertEquals(0, ia.getNumberOfLocalAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSerialSplitAssignmentAllForSameHost() {
-		try {
-			final int NUM_SPLITS = 50;
-			
-			// load some splits
-			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
-			for (int i = 0; i < NUM_SPLITS; i++) {
-				splits.add(new LocatableInputSplit(i, "testhost"));
-			}
-			
-			// get all available splits
-			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
-			InputSplit is = null;
-			while ((is = ia.getNextInputSplit("testhost")) != null) {
-				assertTrue(splits.remove(is));
-			}
-			
-			// check we had all
-			assertTrue(splits.isEmpty());
-			assertNull(ia.getNextInputSplit(""));
-			
-			assertEquals(0, ia.getNumberOfRemoteAssignments());
-			assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSerialSplitAssignmentAllForRemoteHost() {
-		try {
-			final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
-			final int NUM_SPLITS = 10 * hosts.length;
-			
-			// load some splits
-			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
-			for (int i = 0; i < NUM_SPLITS; i++) {
-				splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
-			}
-			
-			// get all available splits
-			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
-			InputSplit is = null;
-			while ((is = ia.getNextInputSplit("testhost")) != null) {
-				assertTrue(splits.remove(is));
-			}
-			
-			// check we had all
-			assertTrue(splits.isEmpty());
-			assertNull(ia.getNextInputSplit("anotherHost"));
-			
-			assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-			assertEquals(0, ia.getNumberOfLocalAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSerialSplitAssignmentMixedLocalHost() {
-		try {
-			final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
-			final int NUM_SPLITS = 10 * hosts.length;
-			
-			// load some splits
-			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
-			for (int i = 0; i < NUM_SPLITS; i++) {
-				splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
-			}
-			
-			// get all available splits
-			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
-			InputSplit is = null;
-			int i = 0;
-			while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) {
-				assertTrue(splits.remove(is));
-			}
-			
-			// check we had all
-			assertTrue(splits.isEmpty());
-			assertNull(ia.getNextInputSplit("anotherHost"));
-			
-			assertEquals(0, ia.getNumberOfRemoteAssignments());
-			assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testConcurrentSplitAssignmentNullHost() {
-		try {
-			final int NUM_THREADS = 10;
-			final int NUM_SPLITS = 500;
-			final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
-			
-			final String[][] hosts = new String[][] {
-					new String[] { "localhost" },
-					new String[0],
-					null
-			};
-			
-			// load some splits
-			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
-			for (int i = 0; i < NUM_SPLITS; i++) {
-				splits.add(new LocatableInputSplit(i, hosts[i%3]));
-			}
-			
-			final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
-			
-			final AtomicInteger splitsRetrieved = new AtomicInteger(0);
-			final AtomicInteger sumOfIds = new AtomicInteger(0);
-			
-			Runnable retriever = new Runnable() {
-				
-				@Override
-				public void run() {
-					LocatableInputSplit split;
-					while ((split = ia.getNextInputSplit(null)) != null) {
-						splitsRetrieved.incrementAndGet();
-						sumOfIds.addAndGet(split.getSplitNumber());
-					}
-				}
-			};
-			
-			// create the threads
-			Thread[] threads = new Thread[NUM_THREADS];
-			for (int i = 0; i < NUM_THREADS; i++) {
-				threads[i] = new Thread(retriever);
-				threads[i].setDaemon(true);
-			}
-			
-			// launch concurrently
-			for (int i = 0; i < NUM_THREADS; i++) {
-				threads[i].start();
-			}
-			
-			// sync
-			for (int i = 0; i < NUM_THREADS; i++) {
-				threads[i].join(5000);
-			}
-			
-			// verify
-			for (int i = 0; i < NUM_THREADS; i++) {
-				if (threads[i].isAlive()) {
-					fail("The concurrency test case is erroneous, the thread did not respond in time.");
-				}
-			}
-			
-			assertEquals(NUM_SPLITS, splitsRetrieved.get());
-			assertEquals(SUM_OF_IDS, sumOfIds.get());
-			
-			// nothing left
-			assertNull(ia.getNextInputSplit(""));
-			
-			assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-			assertEquals(0, ia.getNumberOfLocalAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testConcurrentSplitAssignmentForSingleHost() {
-		try {
-			final int NUM_THREADS = 10;
-			final int NUM_SPLITS = 500;
-			final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
-			
-			// load some splits
-			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
-			for (int i = 0; i < NUM_SPLITS; i++) {
-				splits.add(new LocatableInputSplit(i, "testhost"));
-			}
-			
-			final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
-			
-			final AtomicInteger splitsRetrieved = new AtomicInteger(0);
-			final AtomicInteger sumOfIds = new AtomicInteger(0);
-			
-			Runnable retriever = new Runnable() {
-				
-				@Override
-				public void run() {
-					LocatableInputSplit split;
-					while ((split = ia.getNextInputSplit("testhost")) != null) {
-						splitsRetrieved.incrementAndGet();
-						sumOfIds.addAndGet(split.getSplitNumber());
-					}
-				}
-			};
-			
-			// create the threads
-			Thread[] threads = new Thread[NUM_THREADS];
-			for (int i = 0; i < NUM_THREADS; i++) {
-				threads[i] = new Thread(retriever);
-				threads[i].setDaemon(true);
-			}
-			
-			// launch concurrently
-			for (int i = 0; i < NUM_THREADS; i++) {
-				threads[i].start();
-			}
-			
-			// sync
-			for (int i = 0; i < NUM_THREADS; i++) {
-				threads[i].join(5000);
-			}
-			
-			// verify
-			for (int i = 0; i < NUM_THREADS; i++) {
-				if (threads[i].isAlive()) {
-					fail("The concurrency test case is erroneous, the thread did not respond in time.");
-				}
-			}
-			
-			assertEquals(NUM_SPLITS, splitsRetrieved.get());
-			assertEquals(SUM_OF_IDS, sumOfIds.get());
-			
-			// nothing left
-			assertNull(ia.getNextInputSplit("testhost"));
-			
-			assertEquals(0, ia.getNumberOfRemoteAssignments());
-			assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testConcurrentSplitAssignmentForMultipleHosts() {
-		try {
-			final int NUM_THREADS = 10;
-			final int NUM_SPLITS = 500;
-			final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
-			
-			final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
-			
-			// load some splits
-			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
-			for (int i = 0; i < NUM_SPLITS; i++) {
-				splits.add(new LocatableInputSplit(i, hosts[i%hosts.length]));
-			}
-			
-			final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
-			
-			final AtomicInteger splitsRetrieved = new AtomicInteger(0);
-			final AtomicInteger sumOfIds = new AtomicInteger(0);
-			
-			Runnable retriever = new Runnable() {
-				
-				@Override
-				public void run() {
-					final String threadHost = hosts[(int) (Math.random() * hosts.length)];
-					
-					LocatableInputSplit split;
-					while ((split = ia.getNextInputSplit(threadHost)) != null) {
-						splitsRetrieved.incrementAndGet();
-						sumOfIds.addAndGet(split.getSplitNumber());
-					}
-				}
-			};
-			
-			// create the threads
-			Thread[] threads = new Thread[NUM_THREADS];
-			for (int i = 0; i < NUM_THREADS; i++) {
-				threads[i] = new Thread(retriever);
-				threads[i].setDaemon(true);
-			}
-			
-			// launch concurrently
-			for (int i = 0; i < NUM_THREADS; i++) {
-				threads[i].start();
-			}
-			
-			// sync
-			for (int i = 0; i < NUM_THREADS; i++) {
-				threads[i].join(5000);
-			}
-			
-			// verify
-			for (int i = 0; i < NUM_THREADS; i++) {
-				if (threads[i].isAlive()) {
-					fail("The concurrency test case is erroneous, the thread did not respond in time.");
-				}
-			}
-			
-			assertEquals(NUM_SPLITS, splitsRetrieved.get());
-			assertEquals(SUM_OF_IDS, sumOfIds.get());
-			
-			// nothing left
-			assertNull(ia.getNextInputSplit("testhost"));
-			
-			// at least a fraction of the splits must be assigned locally, no matter how badly the threads race
-			assertTrue(ia.getNumberOfLocalAssignments() >= NUM_SPLITS / hosts.length);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/NoOpInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/NoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/NoOpInvokable.java
new file mode 100644
index 0000000..c58da13
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/NoOpInvokable.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * An invokable that does nothing.
+ */
+public class NoOpInvokable extends AbstractInvokable {
+
+	@Override
+	public void registerInputOutput() {}
+
+	@Override
+	public void invoke() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
index 93e52f6..c20e5dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.util.ArrayList;
@@ -37,7 +36,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.sort.MergeIterator;
 import org.apache.flink.runtime.operators.sort.MergeMatchIterator;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index ccb5215..86c32ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.testutils;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index aed4c78..a4d0f32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -250,16 +250,6 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 	}
 
 	@Override
-	public void userThreadStarted(final Thread userThread) {
-		// Nothing to do here
-	}
-
-	@Override
-	public void userThreadFinished(final Thread userThread) {
-		// Nothing to do here
-	}
-
-	@Override
 	public InputSplitProvider getInputSplitProvider() {
 		return this.inputSplitProvider;
 	}