Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:26 UTC

[02/63] [abbrv] Refactor job graph construction to incremental attachment based

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 8b891a9..78c9288 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.util;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -30,7 +29,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator;
 import org.apache.flink.runtime.operators.sort.MergeMatchIterator;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
new file mode 100644
index 0000000..3f87040
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+final class OneShotLatch {
+	
+	private final Object lock = new Object();
+	
+	private boolean triggered;
+	
+	public void trigger() {
+		synchronized (lock) {
+			triggered = true;
+			lock.notifyAll();
+		}
+	}
+	
+	public void await() throws InterruptedException {
+		synchronized (lock) {
+			while (!triggered) {
+				lock.wait();
+			}
+		}
+	}
+}
\ No newline at end of file
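
The latch above is a minimal building block for ordering events between a test thread and a worker thread; TaskTest later in this commit uses two such latches (toRunning, afterCanceling) for exactly that. A sketch of the typical usage pattern (illustrative only; the worker body is a placeholder, and await() declares InterruptedException, so the surrounding test method must as well):

    final OneShotLatch started = new OneShotLatch();

    Thread worker = new Thread() {
        @Override
        public void run() {
            started.trigger(); // signal: the worker has reached this point
            // ... code under test runs here ...
        }
    };
    worker.start();

    started.await(); // blocks until trigger() has been called
    // from here on, the worker is known to have started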

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
new file mode 100644
index 0000000..91dc5b3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.junit.Test;
+
+public class TaskExecutionStateTest {
+
+	@Test
+	public void testEqualsHashCode() {
+		try {
+			final JobID jid = new JobID();
+			final ExecutionAttemptID executionId = new ExecutionAttemptID();
+			final ExecutionState2 state = ExecutionState2.RUNNING;
+			final String description = "some test description";
+			
+			TaskExecutionState s1 = new TaskExecutionState(jid, executionId, state, description);
+			TaskExecutionState s2 = new TaskExecutionState(jid, executionId, state, description);
+			
+			assertEquals(s1.hashCode(), s2.hashCode());
+			assertEquals(s1, s2);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerialization() {
+		try {
+			final JobID jid = new JobID();
+			final ExecutionAttemptID executionId = new ExecutionAttemptID();
+			final ExecutionState2 state = ExecutionState2.DEPLOYING;
+			final String description = "foo bar";
+			
+			TaskExecutionState original = new TaskExecutionState(jid, executionId, state, description);
+			
+			TaskExecutionState writableCopy = CommonTestUtils.createCopyWritable(original);
+			TaskExecutionState javaSerCopy = CommonTestUtils.createCopySerializable(original);
+			
+			assertEquals(original, writableCopy);
+			assertEquals(original, javaSerCopy);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}
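
The equals/hashCode test above implies that TaskExecutionState defines equals() and hashCode() over its constructor arguments. A minimal sketch of an implementation consistent with those assertions (hypothetical; the actual class is not part of this diff, and the field names below are assumed for illustration):

    @Override
    public int hashCode() {
        // field names assumed for illustration
        return jobID.hashCode() ^ executionId.hashCode() ^ executionState.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof TaskExecutionState) {
            TaskExecutionState other = (TaskExecutionState) obj;
            return this.jobID.equals(other.jobID)
                && this.executionId.equals(other.executionId)
                && this.executionState == other.executionState
                && (this.description == null
                    ? other.description == null
                    : this.description.equals(other.description));
        }
        return false;
    }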

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
new file mode 100644
index 0000000..e7639e3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.ExecutionMode;
+import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.io.network.channels.ChannelID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.types.IntegerRecord;
+import org.apache.flink.util.LogUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+
+public class TaskManagerTest {
+
+	@BeforeClass
+	public static void reduceLogLevel() {
+		LogUtils.initializeDefaultTestConsoleLogger();
+	}
+	
+	@Test
+	public void testSetupTaskManager() {
+		try {
+			JobManager jobManager = getJobManagerMockBase();
+			
+			TaskManager tm = createTaskManager(jobManager);
+
+			JobID jid = new JobID();
+			JobVertexID vid = new JobVertexID();
+			ExecutionAttemptID eid = new ExecutionAttemptID();
+			
+			TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
+					new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
+					Collections.<GateDeploymentDescriptor>emptyList(), 
+					Collections.<GateDeploymentDescriptor>emptyList(),
+					new String[0], 0);
+			
+			LibraryCacheManager.register(jid, new String[0]);
+			
+			TaskOperationResult result = tm.submitTask(tdd);
+			assertTrue(result.isSuccess());
+			assertEquals(eid, result.getExecutionId());
+			assertEquals(vid, result.getVertexId());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testJobSubmissionAndCanceling() {
+		try {
+			JobManager jobManager = getJobManagerMockBase();
+			
+			TaskManager tm = createTaskManager(jobManager);
+
+			JobID jid1 = new JobID();
+			JobID jid2 = new JobID();
+			
+			JobVertexID vid1 = new JobVertexID();
+			JobVertexID vid2 = new JobVertexID();
+			
+			ExecutionAttemptID eid1 = new ExecutionAttemptID();
+			ExecutionAttemptID eid2 = new ExecutionAttemptID();
+			
+			TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5,
+					new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
+					Collections.<GateDeploymentDescriptor>emptyList(), 
+					Collections.<GateDeploymentDescriptor>emptyList(),
+					new String[0], 0);
+			
+			TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7,
+					new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
+					Collections.<GateDeploymentDescriptor>emptyList(), 
+					Collections.<GateDeploymentDescriptor>emptyList(),
+					new String[0], 0);
+			
+			LibraryCacheManager.register(jid1, new String[0]);
+			LibraryCacheManager.register(jid2, new String[0]);
+			assertNotNull(LibraryCacheManager.getClassLoader(jid1));
+			assertNotNull(LibraryCacheManager.getClassLoader(jid2));
+			
+			TaskOperationResult result1 = tm.submitTask(tdd1);
+			TaskOperationResult result2 = tm.submitTask(tdd2);
+			
+			assertTrue(result1.isSuccess());
+			assertTrue(result2.isSuccess());
+			assertEquals(eid1, result1.getExecutionId());
+			assertEquals(eid2, result2.getExecutionId());
+			assertEquals(vid1, result1.getVertexId());
+			assertEquals(vid2, result2.getVertexId());
+			
+			Map<ExecutionAttemptID, Task> tasks = tm.getAllRunningTasks();
+			assertEquals(2, tasks.size());
+			
+			Task t1 = tasks.get(eid1);
+			Task t2 = tasks.get(eid2);
+			assertNotNull(t1);
+			assertNotNull(t2);
+			
+			assertEquals(ExecutionState2.RUNNING, t1.getExecutionState());
+			assertEquals(ExecutionState2.RUNNING, t2.getExecutionState());
+			
+			// cancel one task
+			assertTrue(tm.cancelTask(vid1, 1, eid1).isSuccess());
+			t1.getEnvironment().getExecutingThread().join();
+			assertEquals(ExecutionState2.CANCELED, t1.getExecutionState());
+			
+			tasks = tm.getAllRunningTasks();
+			assertEquals(1, tasks.size());
+			
+			// try to cancel a non existing task
+			assertFalse(tm.cancelTask(vid1, 1, eid1).isSuccess());
+			
+			// cancel the second task
+			assertTrue(tm.cancelTask(vid2, 2, eid2).isSuccess());
+			t2.getEnvironment().getExecutingThread().join();
+			assertEquals(ExecutionState2.CANCELED, t2.getExecutionState());
+			
+			tasks = tm.getAllRunningTasks();
+			assertEquals(0, tasks.size());
+			
+			// the class loaders should be de-registered
+			assertNull(LibraryCacheManager.getClassLoader(jid1));
+			assertNull(LibraryCacheManager.getClassLoader(jid2));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testGateChannelEdgeMismatch() {
+		try {
+			JobManager jobManager = getJobManagerMockBase();
+			
+			TaskManager tm = createTaskManager(jobManager);
+
+			JobID jid = new JobID();
+			
+			JobVertexID vid1 = new JobVertexID();
+			JobVertexID vid2 = new JobVertexID();
+			
+			ExecutionAttemptID eid1 = new ExecutionAttemptID();
+			ExecutionAttemptID eid2 = new ExecutionAttemptID();
+			
+			TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
+					new Configuration(), new Configuration(), Sender.class.getName(),
+					Collections.<GateDeploymentDescriptor>emptyList(),
+					Collections.<GateDeploymentDescriptor>emptyList(),
+					new String[0], 0);
+			
+			TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
+					new Configuration(), new Configuration(), Receiver.class.getName(),
+					Collections.<GateDeploymentDescriptor>emptyList(),
+					Collections.<GateDeploymentDescriptor>emptyList(),
+					new String[0], 0);
+			
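+			// register the job twice (for the two tasks) at the lib cache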
+			LibraryCacheManager.register(jid, new String[0]);
+			LibraryCacheManager.register(jid, new String[0]);
+			assertNotNull(LibraryCacheManager.getClassLoader(jid));
+			
+			assertFalse(tm.submitTask(tdd1).isSuccess());
+			assertFalse(tm.submitTask(tdd2).isSuccess());
+			
+			Map<ExecutionAttemptID, Task> tasks = tm.getAllRunningTasks();
+			assertEquals(0, tasks.size());
+			
+			// the class loaders should be de-registered
+			assertNull(LibraryCacheManager.getClassLoader(jid));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testRunJobWithForwardChannel() {
+		try {
+			JobID jid = new JobID();
+			
+			JobVertexID vid1 = new JobVertexID();
+			JobVertexID vid2 = new JobVertexID();
+			
+			ExecutionAttemptID eid1 = new ExecutionAttemptID();
+			ExecutionAttemptID eid2 = new ExecutionAttemptID();
+			
+			ChannelID senderId = new ChannelID();
+			ChannelID receiverId = new ChannelID();
+			
+			JobManager jobManager = getJobManagerMockBase();
+			when(jobManager.lookupConnectionInfo(Matchers.any(InstanceConnectionInfo.class), Matchers.eq(jid), Matchers.eq(senderId)))
+				.thenReturn(ConnectionInfoLookupResponse.createReceiverFoundAndReady(receiverId));
+			
+			TaskManager tm = createTaskManager(jobManager);
+			
+			ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor(senderId, receiverId);
+			
+			TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
+					new Configuration(), new Configuration(), Sender.class.getName(),
+					Collections.singletonList(new GateDeploymentDescriptor(Collections.singletonList(cdd))), 
+					Collections.<GateDeploymentDescriptor>emptyList(),
+					new String[0], 0);
+			
+			TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
+					new Configuration(), new Configuration(), Receiver.class.getName(),
+					Collections.<GateDeploymentDescriptor>emptyList(),
+					Collections.singletonList(new GateDeploymentDescriptor(Collections.singletonList(cdd))),
+					new String[0], 0);
+			
+			// register the job twice (for two tasks) at the lib cache
+			LibraryCacheManager.register(jid, new String[0]);
+			LibraryCacheManager.register(jid, new String[0]);
+			assertNotNull(LibraryCacheManager.getClassLoader(jid));
+			
+			// deploy the receiver before the sender, so the target is online when the sender requests the connection info
+			TaskOperationResult result2 = tm.submitTask(tdd2);
+			TaskOperationResult result1 = tm.submitTask(tdd1);
+			
+			assertTrue(result1.isSuccess());
+			assertTrue(result2.isSuccess());
+			assertEquals(eid1, result1.getExecutionId());
+			assertEquals(eid2, result2.getExecutionId());
+			assertEquals(vid1, result1.getVertexId());
+			assertEquals(vid2, result2.getVertexId());
+			
+			Map<ExecutionAttemptID, Task> tasks = tm.getAllRunningTasks();
+			
+			Task t1 = tasks.get(eid1);
+			Task t2 = tasks.get(eid2);
+			
+			// wait until the tasks are done
+			if (t1 != null) {
+				t1.getEnvironment().getExecutingThread().join();
+			}
+			if (t2 != null) {
+				t2.getEnvironment().getExecutingThread().join();
+			}
+			
+			assertEquals(ExecutionState2.FINISHED, t1.getExecutionState());
+			assertEquals(ExecutionState2.FINISHED, t2.getExecutionState());
+			
+			tasks = tm.getAllRunningTasks();
+			assertEquals(0, tasks.size());
+			
+			// the class loaders should be de-registered
+			assertNull(LibraryCacheManager.getClassLoader(jid));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static JobManager getJobManagerMockBase() {
+		JobManager jm = mock(JobManager.class);
+		
+		final InstanceID iid = new InstanceID();
+		
+		when(jm.registerTaskManager(Matchers.any(InstanceConnectionInfo.class), Matchers.any(HardwareDescription.class), Matchers.anyInt()))
+			.thenReturn(iid);
+		
+		when(jm.sendHeartbeat(iid)).thenReturn(true);
+		
+		return jm;
+	}
+	
+	public static TaskManager createTaskManager(JobManager jm) throws Exception {
+		InetAddress localhost = InetAddress.getLoopbackAddress();
+		InetSocketAddress jmMockAddress = new InetSocketAddress(localhost, 55443);
+		
+		Configuration cfg = new Configuration();
+		cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
+		GlobalConfiguration.includeConfiguration(cfg);
+		
+		return new TaskManager(ExecutionMode.LOCAL, jm, jm, jm, jm, jmMockAddress, localhost);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class TestInvokableCorrect extends AbstractInvokable {
+
+		@Override
+		public void registerInputOutput() {}
+
+		@Override
+		public void invoke() {}
+	}
+	
+	public static final class TestInvokableBlockingCancelable extends AbstractInvokable {
+
+		@Override
+		public void registerInputOutput() {}
+
+		@Override
+		public void invoke() throws Exception {
+			Object o = new Object();
+			synchronized (o) {
+				o.wait();
+			}
+		}
+	}
+	
+	public static final class Sender extends AbstractInvokable {
+
+		private RecordWriter<IntegerRecord> writer;
+		
+		@Override
+		public void registerInputOutput() {
+			writer = new RecordWriter<IntegerRecord>(this);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			writer.initializeSerializers();
+			writer.emit(new IntegerRecord(42));
+			writer.emit(new IntegerRecord(1337));
+			writer.flush();
+		}
+	}
+	
+	public static final class Receiver extends AbstractInvokable {
+
+		private RecordReader<IntegerRecord> reader;
+		
+		@Override
+		public void registerInputOutput() {
+			reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			IntegerRecord i1 = reader.next();
+			IntegerRecord i2 = reader.next();
+			IntegerRecord i3 = reader.next();
+			
+			if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
+				throw new Exception("Wrong Data Received");
+			}
+		}
+	}
+}
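
A note on TestInvokableBlockingCancelable above: it blocks indefinitely on Object.wait(), so canceling can only complete if the TaskManager interrupts the task's executing thread, making wait() throw InterruptedException and ending invoke() (an assumption about the cancellation mechanism; the TaskManager implementation is not part of this diff). The unblocking mechanism itself is plain Java:

    // Self-contained demonstration: interrupting a thread blocked in wait()
    // unblocks it with an InterruptedException.
    public class InterruptUnblocksWait {
        public static void main(String[] args) throws Exception {
            Thread blocked = new Thread() {
                @Override
                public void run() {
                    Object o = new Object();
                    synchronized (o) {
                        try {
                            o.wait(); // same blocking pattern as the test invokable
                        } catch (InterruptedException e) {
                            // a cancel request would surface here and end invoke()
                        }
                    }
                }
            };
            blocked.start();
            Thread.sleep(100);   // give the thread time to reach wait()
            blocked.interrupt(); // corresponds to canceling the task
            blocked.join();      // returns promptly: wait() was unblocked
        }
    }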

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
new file mode 100644
index 0000000..d225483
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.RuntimeEnvironment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.protocols.AccumulatorProtocol;
+import org.apache.flink.util.ExceptionUtils;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+public class TaskTest {
+
+	@Test
+	public void testTaskStates() {
+		try {
+			final JobID jid = new JobID();
+			final JobVertexID vid = new JobVertexID();
+			final ExecutionAttemptID eid = new ExecutionAttemptID();
+			
+			final TaskManager taskManager = mock(TaskManager.class);
+			final RuntimeEnvironment env = mock(RuntimeEnvironment.class);
+			
+			Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
+			task.setEnvironment(env);
+			
+			assertEquals(ExecutionState2.DEPLOYING, task.getExecutionState());
+			
+			// cancel
+			task.cancelExecution();
+			assertEquals(ExecutionState2.CANCELED, task.getExecutionState());
+			
+			// cannot go into running or finished state
+			
+			assertFalse(task.startExecution());
+			assertEquals(ExecutionState2.CANCELED, task.getExecutionState());
+			
+			assertFalse(task.markAsFinished());
+			assertEquals(ExecutionState2.CANCELED, task.getExecutionState());
+			
+			task.markFailed(new Exception("test"));
+			assertEquals(ExecutionState2.CANCELED, task.getExecutionState());
+			
+			verify(taskManager, times(1)).notifyExecutionStateChange(jid, eid, ExecutionState2.CANCELED, null);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTaskStartFinish() {
+		try {
+			final JobID jid = new JobID();
+			final JobVertexID vid = new JobVertexID();
+			final ExecutionAttemptID eid = new ExecutionAttemptID();
+			
+			final TaskManager taskManager = mock(TaskManager.class);
+			
+			final Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
+			
+			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+			
+			Thread operation = new Thread() {
+				public void run() {
+					try {
+						assertTrue(task.markAsFinished());
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
+				}
+			};
+			
+			final RuntimeEnvironment env = mock(RuntimeEnvironment.class);
+			when(env.getExecutingThread()).thenReturn(operation);
+			
+			assertEquals(ExecutionState2.DEPLOYING, task.getExecutionState());
+			
+			// start the execution
+			task.setEnvironment(env);
+			task.startExecution();
+			
+			// wait for the execution to be finished
+			operation.join();
+			
+			if (error.get() != null) {
+				ExceptionUtils.rethrow(error.get());
+			}
+			
+			assertEquals(ExecutionState2.FINISHED, task.getExecutionState());
+			
+			verify(taskManager).notifyExecutionStateChange(jid, eid, ExecutionState2.FINISHED, null);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTaskFailsInRunning() {
+		try {
+			final JobID jid = new JobID();
+			final JobVertexID vid = new JobVertexID();
+			final ExecutionAttemptID eid = new ExecutionAttemptID();
+			
+			final TaskManager taskManager = mock(TaskManager.class);
+			
+			final Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
+			
+			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+			
+			Thread operation = new Thread() {
+				public void run() {
+					try {
+						task.markFailed(new Exception("test exception message"));
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
+				}
+			};
+			
+			final RuntimeEnvironment env = mock(RuntimeEnvironment.class);
+			when(env.getExecutingThread()).thenReturn(operation);
+			
+			assertEquals(ExecutionState2.DEPLOYING, task.getExecutionState());
+			
+			// start the execution
+			task.setEnvironment(env);
+			task.startExecution();
+			
+			// wait for the execution to be finished
+			operation.join();
+			
+			if (error.get() != null) {
+				ExceptionUtils.rethrow(error.get());
+			}
+			
+			// make sure the final state is correct and the task manager knows the changes
+			assertEquals(ExecutionState2.FAILED, task.getExecutionState());
+			verify(taskManager).notifyExecutionStateChange(Matchers.eq(jid), Matchers.eq(eid), Matchers.eq(ExecutionState2.FAILED), Matchers.anyString());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTaskCanceledInRunning() {
+		try {
+			final JobID jid = new JobID();
+			final JobVertexID vid = new JobVertexID();
+			final ExecutionAttemptID eid = new ExecutionAttemptID();
+			
+			final TaskManager taskManager = mock(TaskManager.class);
+			
+			final Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
+			
+			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+			
+			// latches to create a deterministic order of events
+			final OneShotLatch toRunning = new OneShotLatch();
+			final OneShotLatch afterCanceling = new OneShotLatch();
+			
+			Thread operation = new Thread() {
+				public void run() {
+					try {
+						toRunning.trigger();
+						afterCanceling.await();
+						assertFalse(task.markAsFinished());
+						task.cancelingDone();
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
+				}
+			};
+			
+			final RuntimeEnvironment env = mock(RuntimeEnvironment.class);
+			when(env.getExecutingThread()).thenReturn(operation);
+			
+			assertEquals(ExecutionState2.DEPLOYING, task.getExecutionState());
+			
+			// start the execution
+			task.setEnvironment(env);
+			task.startExecution();
+			
+			toRunning.await();
+			task.cancelExecution();
+			afterCanceling.trigger();
+			
+			// wait for the execution to be finished
+			operation.join();
+			
+			if (error.get() != null) {
+				ExceptionUtils.rethrow(error.get());
+			}
+			
+			// make sure the final state is correct and the task manager knows the changes
+			assertEquals(ExecutionState2.CANCELED, task.getExecutionState());
+			verify(taskManager).notifyExecutionStateChange(jid, eid, ExecutionState2.CANCELED, null);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTaskWithEnvironment() {
+		try {
+			final JobID jid = new JobID();
+			final JobVertexID vid = new JobVertexID();
+			final ExecutionAttemptID eid = new ExecutionAttemptID();
+			
+			final TaskManager taskManager = mock(TaskManager.class);
+			
+			TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
+					new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
+					Collections.<GateDeploymentDescriptor>emptyList(), 
+					Collections.<GateDeploymentDescriptor>emptyList(),
+					new String[0], 0);
+			
+			Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
+			
+			RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, getClass().getClassLoader(),
+					mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
+					mock(AccumulatorProtocol.class));
+			
+			task.setEnvironment(env);
+			
+			assertEquals(ExecutionState2.DEPLOYING, task.getExecutionState());
+			
+			task.startExecution();
+			task.getEnvironment().getExecutingThread().join();
+			
+			assertEquals(ExecutionState2.FINISHED, task.getExecutionState());
+			
+			verify(taskManager).notifyExecutionStateChange(jid, eid, ExecutionState2.FINISHED, null);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTaskWithEnvironmentAndException() {
+		try {
+			final JobID jid = new JobID();
+			final JobVertexID vid = new JobVertexID();
+			final ExecutionAttemptID eid = new ExecutionAttemptID();
+			
+			final TaskManager taskManager = mock(TaskManager.class);
+			
+			TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
+					new Configuration(), new Configuration(), TestInvokableWithException.class.getName(),
+					Collections.<GateDeploymentDescriptor>emptyList(), 
+					Collections.<GateDeploymentDescriptor>emptyList(),
+					new String[0], 0);
+			
+			Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
+			
+			RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, getClass().getClassLoader(),
+					mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
+					mock(AccumulatorProtocol.class));
+			
+			task.setEnvironment(env);
+			
+			assertEquals(ExecutionState2.DEPLOYING, task.getExecutionState());
+			
+			task.startExecution();
+			task.getEnvironment().getExecutingThread().join();
+			
+			assertEquals(ExecutionState2.FAILED, task.getExecutionState());
+			
+			verify(taskManager).notifyExecutionStateChange(Matchers.eq(jid), Matchers.eq(eid), Matchers.eq(ExecutionState2.FAILED), Matchers.anyString());
+			verify(taskManager, times(0)).notifyExecutionStateChange(jid, eid, ExecutionState2.CANCELING, null);
+			verify(taskManager, times(0)).notifyExecutionStateChange(jid, eid, ExecutionState2.CANCELED, null);
+			verify(taskManager, times(0)).notifyExecutionStateChange(jid, eid, ExecutionState2.FINISHED, null);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class TestInvokableCorrect extends AbstractInvokable {
+
+		@Override
+		public void registerInputOutput() {}
+
+		@Override
+		public void invoke() {}
+	}
+	
+	public static final class TestInvokableWithException extends AbstractInvokable {
+
+		@Override
+		public void registerInputOutput() {}
+
+		@Override
+		public void invoke() throws Exception {
+			throw new Exception("test exception");
+		}
+	}
+}
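
Taken together, the tests above pin down the task life cycle: DEPLOYING is the initial state, startExecution() leads to RUNNING, and the terminal states are sticky (markAsFinished() and markFailed() have no effect once the task is CANCELED). A compact reconstruction of the transitions the assertions imply (hypothetical summary; ExecutionState2 and the real Task class are not shown in this diff):

    final class TaskStateSketch {

        enum State { DEPLOYING, RUNNING, CANCELING, FINISHED, CANCELED, FAILED }

        // Transitions permitted by the assertions in TaskTest.
        static boolean isLegalTransition(State from, State to) {
            switch (from) {
                case DEPLOYING:
                    // cancelExecution() moves a not-yet-running task straight to CANCELED
                    return to == State.RUNNING || to == State.CANCELED;
                case RUNNING:
                    return to == State.FINISHED || to == State.CANCELING || to == State.FAILED;
                case CANCELING:
                    return to == State.CANCELED;
                default:
                    // FINISHED, CANCELED and FAILED are terminal
                    return false;
            }
        }
    }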

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/DoubleSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/DoubleSourceTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/DoubleSourceTask.java
deleted file mode 100644
index c98536e..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/DoubleSourceTask.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.testutils.tasks;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.fs.LineReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-
-public class DoubleSourceTask extends AbstractInvokable {
-
-	private RecordWriter<StringRecord> output1 = null;
-
-	private RecordWriter<StringRecord> output2 = null;
-
-	@Override
-	public void invoke() throws Exception {
-		this.output1.initializeSerializers();
-		this.output2.initializeSerializers();
-
-		final Iterator<FileInputSplit> splitIterator = getInputSplits();
-
-		while (splitIterator.hasNext()) {
-
-			final FileInputSplit split = splitIterator.next();
-
-			final long start = split.getStart();
-			final long length = split.getLength();
-
-			final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
-			final FSDataInputStream fdis = fs.open(split.getPath());
-
-			final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
-
-			byte[] line = lineReader.readLine();
-
-			while (line != null) {
-
-				// Create a string object from the data read
-				StringRecord str = new StringRecord();
-				str.set(line);
-
-				// Send out string
-				output1.emit(str);
-				output2.emit(str);
-
-				line = lineReader.readLine();
-			}
-
-			// Close the stream;
-			lineReader.close();
-		}
-
-		this.output1.flush();
-		this.output2.flush();
-	}
-
-	@Override
-	public void registerInputOutput() {
-		this.output1 = new RecordWriter<StringRecord>(this);
-		this.output2 = new RecordWriter<StringRecord>(this);
-	}
-
-	private Iterator<FileInputSplit> getInputSplits() {
-
-		final InputSplitProvider provider = getEnvironment().getInputSplitProvider();
-
-		return new Iterator<FileInputSplit>() {
-
-			private FileInputSplit nextSplit;
-			
-			private boolean exhausted;
-
-			@Override
-			public boolean hasNext() {
-				if (exhausted) {
-					return false;
-				}
-				
-				if (nextSplit != null) {
-					return true;
-				}
-				
-				FileInputSplit split = (FileInputSplit) provider.getNextInputSplit();
-				
-				if (split != null) {
-					this.nextSplit = split;
-					return true;
-				}
-				else {
-					exhausted = true;
-					return false;
-				}
-			}
-
-			@Override
-			public FileInputSplit next() {
-				if (this.nextSplit == null && !hasNext()) {
-					throw new NoSuchElementException();
-				}
-
-				final FileInputSplit tmp = this.nextSplit;
-				this.nextSplit = null;
-				return tmp;
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		};
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineReader.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineReader.java
deleted file mode 100644
index 500f9a1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineReader.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.testutils.tasks;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.fs.LineReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-
-/**
- * A file line reader reads the associated file input splits line by line and outputs the lines as string records.
- * 
- */
-public class FileLineReader extends AbstractInvokable {
-
-	private RecordWriter<StringRecord> output = null;
-
-	@Override
-	public void invoke() throws Exception {
-
-		output.initializeSerializers();
-
-		final Iterator<FileInputSplit> splitIterator = getInputSplits();
-
-		while (splitIterator.hasNext()) {
-
-			final FileInputSplit split = splitIterator.next();
-
-			long start = split.getStart();
-			long length = split.getLength();
-
-			final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
-			final FSDataInputStream fdis = fs.open(split.getPath());
-
-			final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
-
-			byte[] line = lineReader.readLine();
-
-			while (line != null) {
-
-				// Create a string object from the data read
-				StringRecord str = new StringRecord();
-				str.set(line);
-
-				// Send out string
-				output.emit(str);
-
-				line = lineReader.readLine();
-			}
-
-			// Close the stream;
-			lineReader.close();
-		}
-
-		this.output.flush();
-	}
-
-	@Override
-	public void registerInputOutput() {
-		output = new RecordWriter<StringRecord>(this);
-	}
-	
-	private Iterator<FileInputSplit> getInputSplits() {
-
-		final InputSplitProvider provider = getEnvironment().getInputSplitProvider();
-
-		return new Iterator<FileInputSplit>() {
-
-			private FileInputSplit nextSplit;
-			
-			private boolean exhausted;
-
-			@Override
-			public boolean hasNext() {
-				if (exhausted) {
-					return false;
-				}
-				
-				if (nextSplit != null) {
-					return true;
-				}
-				
-				FileInputSplit split = (FileInputSplit) provider.getNextInputSplit();
-				
-				if (split != null) {
-					this.nextSplit = split;
-					return true;
-				}
-				else {
-					exhausted = true;
-					return false;
-				}
-			}
-
-			@Override
-			public FileInputSplit next() {
-				if (this.nextSplit == null && !hasNext()) {
-					throw new NoSuchElementException();
-				}
-
-				final FileInputSplit tmp = this.nextSplit;
-				this.nextSplit = null;
-				return tmp;
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		};
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineWriter.java
deleted file mode 100644
index 529ae06..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/FileLineWriter.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.testutils.tasks;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * A file line writer reads string records from its input gate and writes them to the associated output file.
- * 
- */
-public class FileLineWriter extends AbstractInvokable {
-	/**
-	 * The record reader through which incoming string records are received.
-	 */
-	private RecordReader<StringRecord> input = null;
-
-
-	@Override
-	public void invoke() throws Exception {
-
-		final Configuration conf = getEnvironment().getTaskConfiguration();
-		final String outputPathString = conf.getString(JobFileOutputVertex.PATH_PROPERTY, null);
-		
-		Path outputPath = new Path(outputPathString);
-
-		FileSystem fs = FileSystem.get(outputPath.toUri());
-		if (fs.exists(outputPath)) {
-			FileStatus status = fs.getFileStatus(outputPath);
-
-			if (status.isDir()) {
-				outputPath = new Path(outputPath.toUri().toString() + "/file_" + getIndexInSubtaskGroup() + ".txt");
-			}
-		}
-
-		final FSDataOutputStream outputStream = fs.create(outputPath, true);
-
-		while (this.input.hasNext()) {
-
-			StringRecord record = this.input.next();
-			byte[] recordByte = (record.toString() + "\r\n").getBytes();
-			outputStream.write(recordByte, 0, recordByte.length);
-		}
-
-		outputStream.close();
-
-	}
-
-	@Override
-	public void registerInputOutput() {
-		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileInputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileInputVertex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileInputVertex.java
deleted file mode 100644
index 944e8f1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileInputVertex.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.testutils.tasks;
-
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.jobgraph.AbstractJobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-
-public final class JobFileInputVertex extends AbstractJobInputVertex {
-
-	/**
-	 * The fraction that the last split may be larger than the others.
-	 */
-	private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
-	
-	/**
-	 * The path pointing to the input file/directory.
-	 */
-	private Path path;
-
-
-	public JobFileInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
-		super(name, id, jobGraph);
-	}
-	
-	/**
-	 * Creates a new job file input vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name of the new job file input vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobFileInputVertex(String name, JobGraph jobGraph) {
-		this(name, null, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file input vertex.
-	 * 
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobFileInputVertex(JobGraph jobGraph) {
-		this(null, jobGraph);
-	}
-
-	/**
-	 * Sets the path of the file the job file input vertex's task should read from.
-	 * 
-	 * @param path
-	 *        the path of the file the job file input vertex's task should read from
-	 */
-	public void setFilePath(final Path path) {
-		this.path = path;
-	}
-
-	/**
-	 * Returns the path of the file the job file input vertex's task should read from.
-	 * 
-	 * @return the path of the file the job file input vertex's task should read from or <code>null</code> if no path
-	 *         has yet been set
-	 */
-	public Path getFilePath() {
-		return this.path;
-	}
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-		super.read(in);
-
-		// Read path of the input file
-		final boolean isNotNull = in.readBoolean();
-		if (isNotNull) {
-			this.path = new Path();
-			this.path.read(in);
-		}
-	}
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-		super.write(out);
-
-		// Write out the path of the input file
-		if (this.path == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			this.path.write(out);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-
-	@Override
-	public InputSplit[] getInputSplits(int minNumSplits) throws Exception {
-		final Path path = this.path;
-		final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>();
-
-		// get all the files that are involved in the splits
-		final List<FileStatus> files = new ArrayList<FileStatus>();
-		long totalLength = 0;
-
-		final FileSystem fs = path.getFileSystem();
-		final FileStatus pathFile = fs.getFileStatus(path);
-
-		if (pathFile.isDir()) {
-			// input is directory. list all contained files
-			final FileStatus[] dir = fs.listStatus(path);
-			for (int i = 0; i < dir.length; i++) {
-				if (!dir[i].isDir()) {
-					files.add(dir[i]);
-					totalLength += dir[i].getLen();
-				}
-			}
-
-		} else {
-			files.add(pathFile);
-			totalLength += pathFile.getLen();
-		}
-
-		final long minSplitSize = 1;
-		final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE : (totalLength / minNumSplits +
-					(totalLength % minNumSplits == 0 ? 0 : 1));
-
-		// now that we have the files, generate the splits
-		int splitNum = 0;
-		for (final FileStatus file : files) {
-
-			final long len = file.getLen();
-			final long blockSize = file.getBlockSize();
-
-			final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
-			final long halfSplit = splitSize >>> 1;
-
-			final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
-
-			if (len > 0) {
-
-				// get the block locations and make sure they are in order with respect to their offset
-				final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
-				Arrays.sort(blocks);
-
-				long bytesUnassigned = len;
-				long position = 0;
-
-				int blockIndex = 0;
-
-				while (bytesUnassigned > maxBytesForLastSplit) {
-					// get the block containing the majority of the data
-					blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
-					// create a new split
-					final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position, splitSize,
-						blocks[blockIndex]
-							.getHosts());
-					inputSplits.add(fis);
-
-					// adjust the positions
-					position += splitSize;
-					bytesUnassigned -= splitSize;
-				}
-
-				// assign the last split
-				if (bytesUnassigned > 0) {
-					blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
-					final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position,
-						bytesUnassigned,
-						blocks[blockIndex].getHosts());
-					inputSplits.add(fis);
-				}
-			} else {
-				// special case with a file of zero bytes size
-				final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
-				String[] hosts;
-				if (blocks.length > 0) {
-					hosts = blocks[0].getHosts();
-				} else {
-					hosts = new String[0];
-				}
-				final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts);
-				inputSplits.add(fis);
-			}
-		}
-
-		return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
-	}
-
-	/**
-	 * Retrieves the index of the <tt>BlockLocation</tt> that contains the part of the file described by the given
-	 * offset.
-	 * 
-	 * @param blocks
-	 *        The different blocks of the file. Must be ordered by their offset.
-	 * @param offset
-	 *        The offset of the position in the file.
-	 * @param startIndex
-	 *        The earliest index to look at.
-	 * @return The index of the block containing the given position.
-	 */
-	private final int getBlockIndexForPosition(final BlockLocation[] blocks, final long offset,
-			final long halfSplitSize, final int startIndex) {
-		
-		// go over all indexes after the startIndex
-		for (int i = startIndex; i < blocks.length; i++) {
-			long blockStart = blocks[i].getOffset();
-			long blockEnd = blockStart + blocks[i].getLength();
-
-			if (offset >= blockStart && offset < blockEnd) {
-				// got the block where the split starts
-				// check if the next block contains more than this one does
-				if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
-					return i + 1;
-				} else {
-					return i;
-				}
-			}
-		}
-		throw new IllegalArgumentException("The given offset is not contained in any block.");
-	}
-
-
-	@Override
-	public Class<FileInputSplit> getInputSplitType() {
-		return FileInputSplit.class;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileOutputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileOutputVertex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileOutputVertex.java
deleted file mode 100644
index 6f8ce85..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/tasks/JobFileOutputVertex.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.testutils.tasks;
-
-import java.io.IOException;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.jobgraph.AbstractJobOutputVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-
-public class JobFileOutputVertex extends AbstractJobOutputVertex {
-
-	public static final String PATH_PROPERTY = "outputPath";
-	
-	/**
-	 * The path pointing to the output file/directory.
-	 */
-	private Path path;
-
-
-	public JobFileOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
-		super(name, id, jobGraph);
-	}
-	
-	/**
-	 * Creates a new job file output vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name of the new job file output vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobFileOutputVertex(String name, JobGraph jobGraph) {
-		this(name, null, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file input vertex.
-	 * 
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobFileOutputVertex(JobGraph jobGraph) {
-		this(null, jobGraph);
-	}
-
-	/**
-	 * Sets the path of the file the job file input vertex's task should write to.
-	 * 
-	 * @param path
-	 *        the path of the file the job file input vertex's task should write to
-	 */
-	public void setFilePath(Path path) {
-		this.path = path;
-		getConfiguration().setString(PATH_PROPERTY, path.toString());
-	}
-
-	/**
-	 * Returns the path of the file the job file output vertex's task should write to.
-	 * 
-	 * @return the path of the file the job file output vertex's task should write to or <code>null</code> if no path
-	 *         has yet been set
-	 */
-	public Path getFilePath() {
-		return this.path;
-	}
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-		super.read(in);
-
-		// Read path of the input file
-		boolean isNotNull = in.readBoolean();
-		if (isNotNull) {
-			this.path = new Path();
-			this.path.read(in);
-		}
-	}
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-		super.write(out);
-
-		// Write out the path of the input file
-		if (this.path == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			this.path.write(out);
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java
new file mode 100644
index 0000000..f55f1cf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DelegatingConfigurationTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.util;
+
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.util.TaskConfig.DelegatingConfiguration;
+import org.junit.Test;
+
+
+public class DelegatingConfigurationTest {
+
+	/**
+	 * http://stackoverflow.com/questions/22225663/checking-in-a-unit-test-whether-all-methods-are-delegated
+	 */
+	@Test
+	public void testIfDelegatesImplementAllMethods() throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+
+		Comparator<Method> methodComparator = new Comparator<Method>() {
+			@Override
+			public int compare(Method o1, Method o2) {
+				String o1Str = o1.getName() + typeParamToString(o1.getParameterTypes());
+				String o2Str = o2.getName() + typeParamToString(o2.getParameterTypes());
+				return o1Str.compareTo(o2Str);
+			}
+
+			private String typeParamToString(Class<?>[] classes) {
+				StringBuilder str = new StringBuilder();
+				for (Class<?> t : classes) {
+					str.append(t.toString());
+				}
+				return str.toString();
+			}
+		};
+		
+		// For each method in the Configuration class...
+		Method[] confMethods = Configuration.class.getDeclaredMethods();
+		Method[] delegateMethods = DelegatingConfiguration.class.getDeclaredMethods();
+		Arrays.sort(confMethods, methodComparator);
+		Arrays.sort(delegateMethods, methodComparator);
+		for (Method configurationMethod : confMethods) {
+			boolean hasMethod = false;
+			if (!Modifier.isPublic(configurationMethod.getModifiers())) {
+				continue;
+			}
+			// Find a method in the wrapper class with the same name and parameter types
+			candidates: for (Method wrapperMethod : delegateMethods) {
+				if (configurationMethod.getName().equals(wrapperMethod.getName())) {
+
+					// Compare the parameter lists; on any mismatch, try the next candidate
+					Class<?>[] wrapperMethodParams = wrapperMethod.getParameterTypes();
+					Class<?>[] configMethodParams = configurationMethod.getParameterTypes();
+					if (wrapperMethodParams.length != configMethodParams.length) {
+						continue candidates;
+					}
+					for (int i = 0; i < wrapperMethodParams.length; i++) {
+						if (wrapperMethodParams[i] != configMethodParams[i]) {
+							continue candidates;
+						}
+					}
+					hasMethod = true;
+					break;
+				}
+			}
+			assertTrue("Method '" + configurationMethod.getName() + "' has not been wrapped correctly in the DelegatingConfiguration wrapper", hasMethod);
+		}
+	}
+}
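The test above relies on a general reflection pattern: for every public method of the delegation target, look for a wrapper method with the same name and parameter types. A minimal, self-contained sketch of that check, with hypothetical Base and Wrapper classes standing in for Configuration and DelegatingConfiguration:

    import java.lang.reflect.Method;
    import java.lang.reflect.Modifier;
    import java.util.Arrays;

    public class DelegationCheck {

        static class Base {
            public void get() {}
            public void set(int value) {}
        }

        static class Wrapper {
            public void get() {}
            public void set(int value) {}
        }

        public static void main(String[] args) {
            for (Method m : Base.class.getDeclaredMethods()) {
                if (!Modifier.isPublic(m.getModifiers())) {
                    continue;
                }
                boolean wrapped = false;
                for (Method w : Wrapper.class.getDeclaredMethods()) {
                    // A wrapper method matches if both name and parameter types agree
                    if (w.getName().equals(m.getName())
                            && Arrays.equals(w.getParameterTypes(), m.getParameterTypes())) {
                        wrapped = true;
                        break;
                    }
                }
                if (!wrapped) {
                    throw new AssertionError("Method not delegated: " + m.getName());
                }
            }
            System.out.println("All public methods are delegated.");
        }
    }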

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestDelegatingConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestDelegatingConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestDelegatingConfiguration.java
deleted file mode 100644
index 5fdf433..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestDelegatingConfiguration.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.util;
-
-import static org.junit.Assert.assertTrue;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.Comparator;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.util.TaskConfig.DelegatingConfiguration;
-import org.junit.Test;
-
-
-public class TestDelegatingConfiguration {
-
-	/**
-	 * http://stackoverflow.com/questions/22225663/checking-in-a-unit-test-whether-all-methods-are-delegated
-	 */
-	@Test
-	public void testIfDelegatesImplementAllMethods() throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
-
-		Comparator<Method> methodComparator = new Comparator<Method>() {
-			@Override
-			public int compare(Method o1, Method o2) {
-				String o1Str = o1.getName() + typeParamToString(o1.getParameterTypes());
-				String o2Str = o2.getName() + typeParamToString(o2.getParameterTypes());
-				return o1Str.compareTo( o2Str ); 
-			}
-
-			private String typeParamToString(Class<?>[] classes) {
-				String str = "";
-				for(Object t : classes) {
-					str += t.toString();
-				}
-				return str;
-			}
-		};
-		
-		// For each method in the Configuration class...
-		Method[] confMethods = Configuration.class.getDeclaredMethods();
-		Method[] delegateMethods = DelegatingConfiguration.class.getDeclaredMethods();
-		Arrays.sort(confMethods, methodComparator);
-		Arrays.sort(delegateMethods, methodComparator);
-		match : for (Method configurationMethod : confMethods) {
-			boolean hasMethod = false;
-			if(!Modifier.isPublic(configurationMethod.getModifiers()) ) {
-				continue;
-			}
-			// Find matching method in wrapper class and call it
-			mismatch: for (Method wrapperMethod : delegateMethods) {
-				if (configurationMethod.getName().equals(wrapperMethod.getName())) {
-					
-					// Get parameters for method
-					Class<?>[] wrapperMethodParams = wrapperMethod.getParameterTypes();
-					Class<?>[] configMethodParams = configurationMethod.getParameterTypes();
-					if(wrapperMethodParams.length != configMethodParams.length) {
-						System.err.println("Length");
-						break mismatch;
-					}
-					for(int i = 0; i < wrapperMethodParams.length; i++) {
-						if(wrapperMethodParams[i] != configMethodParams[i]) {
-							break mismatch;
-						}
-					}
-					hasMethod = true;
-					break match;
-				}
-			}
-			assertTrue("Foo method '" + configurationMethod.getName() + "' has not been wrapped correctly in DelegatingConfiguration wrapper", hasMethod);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
index a409222..33112af 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
@@ -227,9 +227,9 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 	// -------------------------------------------------------------------------------------------------------------
 
 	@SuppressWarnings("unchecked")
-	private static InputFormatInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+	private static InputFormatVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		CsvInputFormat pointsInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class, LongValue.class, LongValue.class);
-		InputFormatInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "Input[Points]", jobGraph, numSubTasks);
+		InputFormatVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "Input[Points]", jobGraph, numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
@@ -241,9 +241,9 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 	}
 
 	@SuppressWarnings("unchecked")
-	private static InputFormatInputVertex createModelsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+	private static InputFormatVertex createModelsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		CsvInputFormat modelsInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class, LongValue.class, LongValue.class);
-		InputFormatInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, pointsPath, "Input[Models]", jobGraph, numSubTasks);
+		InputFormatVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, pointsPath, "Input[Models]", jobGraph, numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration());
@@ -278,8 +278,8 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 		return pointsInput;
 	}
 
-	private static OutputFormatOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
-		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
+	private static OutputFormatVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(output.getConfiguration());
@@ -308,10 +308,10 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 		JobGraph jobGraph = new JobGraph("Distance Builder");
 
 		// -- vertices ---------------------------------------------------------------------------------------------
-		InputFormatInputVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
-		InputFormatInputVertex models = createModelsInput(jobGraph, centersPath, numSubTasks, serializer);
+		InputFormatVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
+		InputFormatVertex models = createModelsInput(jobGraph, centersPath, numSubTasks, serializer);
 		JobTaskVertex mapper = createMapper(jobGraph, numSubTasks, serializer);
-		OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
 
 		// -- edges ------------------------------------------------------------------------------------------------
 		JobGraphUtils.connect(points, mapper, ChannelType.NETWORK, DistributionPattern.POINTWISE);

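The renames in this hunk (InputFormatInputVertex to InputFormatVertex, OutputFormatOutputVertex to OutputFormatVertex) leave the construction pattern itself unchanged. A condensed sketch of that pattern, assuming the JobGraphUtils signatures visible in the hunk above (inputPath and numSubTasks are assumed variables, and the task vertex in between is elided):

    // Sketch only; uses the helper calls shown in the surrounding diff.
    JobGraph jobGraph = new JobGraph("Example Job");

    InputFormatVertex input = JobGraphUtils.createInput(
            new CsvInputFormat(' ', LongValue.class), inputPath, "Input", jobGraph, numSubTasks);
    OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);

    // Each vertex is configured through a TaskConfig view of its configuration
    TaskConfig inputConfig = new TaskConfig(input.getConfiguration());
    inputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);

    JobGraphUtils.connect(input, output, ChannelType.NETWORK, DistributionPattern.POINTWISE);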
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
index 4d46b16..678a7e5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -32,12 +32,9 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
 import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
+import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobTaskVertex;
-import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
-import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.GroupReduceDriver;
@@ -96,10 +93,10 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 	// Job vertex builder methods
 	// -------------------------------------------------------------------------------------------------------------
 
-	private static InputFormatInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+	private static InputFormatVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		@SuppressWarnings("unchecked")
 		CsvInputFormat pointsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class);
-		InputFormatInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "[Points]", jobGraph, numSubTasks);
+		InputFormatVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "[Points]", jobGraph, numSubTasks);
 		{
 			TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
 			taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
@@ -117,10 +114,10 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		return pointsInput;
 	}
 
-	private static InputFormatInputVertex createCentersInput(JobGraph jobGraph, String centersPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+	private static InputFormatVertex createCentersInput(JobGraph jobGraph, String centersPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		@SuppressWarnings("unchecked")
 		CsvInputFormat modelsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class);
-		InputFormatInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, centersPath, "[Models]", jobGraph, numSubTasks);
+		InputFormatVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, centersPath, "[Models]", jobGraph, numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration());
@@ -139,9 +136,9 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		return modelsInput;
 	}
 
-	private static OutputFormatOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+	private static OutputFormatVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		
-		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
+		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(output.getConfiguration());
@@ -250,13 +247,13 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		tailConfig.setOutputSerializer(outputSerializer);
 		
 		// the udf
-		tailConfig.setStubWrapper(new UserCodeObjectWrapper<WrappingReduceFunction>(new WrappingReduceFunction(new RecomputeClusterCenter())));
+		tailConfig.setStubWrapper(new UserCodeObjectWrapper<RecomputeClusterCenter>(new RecomputeClusterCenter()));
 		
 		return tail;
 	}
 	
-	private static SimpleOutputVertex createSync(JobGraph jobGraph, int numIterations, int dop) {
-		SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, dop);
+	private static OutputFormatVertex createSync(JobGraph jobGraph, int numIterations, int dop) {
+		OutputFormatVertex sync = JobGraphUtils.createSync(jobGraph, dop);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(numIterations);
 		syncConfig.setIterationId(ITERATION_ID);
@@ -277,19 +274,19 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		JobGraph jobGraph = new JobGraph("KMeans Iterative");
 
 		// -- vertices ---------------------------------------------------------------------------------------------
-		InputFormatInputVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
-		InputFormatInputVertex centers = createCentersInput(jobGraph, centersPath, numSubTasks, serializer);
+		InputFormatVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
+		InputFormatVertex centers = createCentersInput(jobGraph, centersPath, numSubTasks, serializer);
 		
 		JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer);
 		JobTaskVertex mapper = createMapper(jobGraph, numSubTasks, serializer, serializer, serializer, int0Comparator);
 		
 		JobTaskVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer);
 		
-		SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
+		OutputFormatVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
 		
-		SimpleOutputVertex sync = createSync(jobGraph, numIterations, numSubTasks);
+		OutputFormatVertex sync = createSync(jobGraph, numIterations, numSubTasks);
 		
-		OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
 
 		// -- edges ------------------------------------------------------------------------------------------------
 		JobGraphUtils.connect(points, mapper, ChannelType.NETWORK, DistributionPattern.POINTWISE);

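The iteration sync vertex is likewise retyped to a plain OutputFormatVertex while its configuration calls stay the same. A short sketch of the pattern, assuming the signatures shown in the hunk above (jobGraph, dop, numIterations, and ITERATION_ID come from the surrounding test):

    // Sketch: sync vertex creation and iteration settings, per the diff above.
    OutputFormatVertex sync = JobGraphUtils.createSync(jobGraph, dop);
    TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
    syncConfig.setNumberOfIterations(numIterations);
    syncConfig.setIterationId(ITERATION_ID);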
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 8da4e5c..dad2370 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -174,12 +174,12 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 	// Invariant vertices across all variants
 	// -----------------------------------------------------------------------------------------------------------------
 
-	private static InputFormatInputVertex createVerticesInput(JobGraph jobGraph, String verticesPath, int numSubTasks,
+	private static InputFormatVertex createVerticesInput(JobGraph jobGraph, String verticesPath, int numSubTasks,
 			TypeSerializerFactory<?> serializer,
 			TypeComparatorFactory<?> comparator) {
 		@SuppressWarnings("unchecked")
 		CsvInputFormat verticesInFormat = new CsvInputFormat(' ', LongValue.class);
-		InputFormatInputVertex verticesInput = JobGraphUtils.createInput(verticesInFormat, verticesPath, "VerticesInput",
+		InputFormatVertex verticesInput = JobGraphUtils.createInput(verticesInFormat, verticesPath, "VerticesInput",
 			jobGraph, numSubTasks);
 		TaskConfig verticesInputConfig = new TaskConfig(verticesInput.getConfiguration());
 		{
@@ -327,9 +327,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		return intermediate;
 	}
 
-	private static OutputFormatOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks,
+	private static OutputFormatVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks,
 			TypeSerializerFactory<?> serializer) {
-		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks);
+		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		{
 
@@ -352,14 +352,14 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		return output;
 	}
 
-	private static SimpleOutputVertex createFakeTail(JobGraph jobGraph, int numSubTasks) {
-		SimpleOutputVertex fakeTailOutput =
+	private static OutputFormatVertex createFakeTail(JobGraph jobGraph, int numSubTasks) {
+		OutputFormatVertex fakeTailOutput =
 			JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
 		return fakeTailOutput;
 	}
 
-	private static SimpleOutputVertex createSync(JobGraph jobGraph, int numSubTasks, int maxIterations) {
-		SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
+	private static OutputFormatVertex createSync(JobGraph jobGraph, int numSubTasks, int maxIterations) {
+		OutputFormatVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(maxIterations);
 		syncConfig.setIterationId(ITERATION_ID);
@@ -389,16 +389,16 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)");
 
 		// -- invariant vertices -----------------------------------------------------------------------------------
-		InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
-		InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
+		InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
+		InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
 		JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
 
 		JobTaskVertex intermediate = createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 
-		OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		SimpleOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
-		SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
+		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+		OutputFormatVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
+		OutputFormatVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
 
 		// --------------- the tail (solution set join) ---------------
 		JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
@@ -473,8 +473,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)");
 
 		// input
-		InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
-		InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
+		InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
+		InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
 
 		// head
 		JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
@@ -486,10 +486,10 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 
 		// output and auxiliaries
-		OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		SimpleOutputVertex ssFakeTail = createFakeTail(jobGraph, numSubTasks);
-		SimpleOutputVertex wsFakeTail = createFakeTail(jobGraph, numSubTasks);
-		SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
+		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+		OutputFormatVertex ssFakeTail = createFakeTail(jobGraph, numSubTasks);
+		OutputFormatVertex wsFakeTail = createFakeTail(jobGraph, numSubTasks);
+		OutputFormatVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
 
 		// ------------------ the intermediate (ss join) ----------------------
 		JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
@@ -624,8 +624,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Workset Update, Solution Set Tail)");
 
 		// input
-		InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
-		InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
+		InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
+		InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
 
 		// head
 		JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
@@ -637,9 +637,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 
 		// output and auxiliaries
-		OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		SimpleOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
-		SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
+		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+		OutputFormatVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
+		OutputFormatVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
 
 		// ------------------ the intermediate (ws update) ----------------------
 		JobTaskVertex wsUpdateIntermediate =
@@ -750,8 +750,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Solution Set Update, Workset Tail)");
 
 		// input
-		InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
-		InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
+		InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
+		InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
 
 		// head
 		JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
@@ -761,9 +761,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 
 		// output and auxiliaries
-		OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		SimpleOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
-		SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
+		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+		OutputFormatVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
+		OutputFormatVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
 
 		// ------------------ the intermediate (ss update) ----------------------
 		JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,