You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/26 11:46:40 UTC

[15/53] [abbrv] Removed RuntimeEnvironment instantiation from execution graph construction. Removed legacy job vertex classes and input/output tasks.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
index 82359f5..cbe1766 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
@@ -351,64 +351,6 @@ public class DataSinkTask<IT> extends AbstractOutputTask {
 			throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
 		}
 	}
-	
-	// ------------------------------------------------------------------------
-	//                     Degree of parallelism & checks
-	// ------------------------------------------------------------------------
-	
-
-	@Override
-	public int getMaximumNumberOfSubtasks() {
-		if (!(this.format instanceof FileOutputFormat<?>)) {
-			return -1;
-		}
-		
-		final FileOutputFormat<?> fileOutputFormat = (FileOutputFormat<?>) this.format;
-		
-		// ----------------- This code applies only to file inputs ------------------
-		
-		final Path path = fileOutputFormat.getOutputFilePath();
-		final WriteMode writeMode = fileOutputFormat.getWriteMode();
-		final OutputDirectoryMode outDirMode = fileOutputFormat.getOutputDirectoryMode();
-
-		// Prepare output path and determine max DOP		
-		try {
-			
-			int dop = getTaskConfiguration().getInteger(DEGREE_OF_PARALLELISM_KEY, -1);
-			final FileSystem fs = path.getFileSystem();
-			
-			if(dop == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
-				// output is not written in parallel and should be written to a single file.
-				
-				if(fs.isDistributedFS()) {
-					// prepare distributed output path
-					if(!fs.initOutPathDistFS(path, writeMode, false)) {
-						// output preparation failed! Cancel task.
-						throw new IOException("Output path could not be initialized.");
-					}
-				}
-				
-				return 1;
-				
-			} else {
-				// output should be written to a directory
-				
-				if(fs.isDistributedFS()) {
-					// only distributed file systems can be initialized at start-up time.
-					if(!fs.initOutPathDistFS(path, writeMode, true)) {
-						throw new IOException("Output directory could not be created.");
-					}
-				}
-				
-				return -1;
-				
-			}
-		}
-		catch (IOException e) {
-			LOG.error("Could not access the file system to detemine the status of the output.", e);
-			throw new RuntimeException("I/O Error while accessing file", e);
-		}
-	}
 
 	// ------------------------------------------------------------------------
 	//                               Utilities

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
index af176b9..f835ace 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
@@ -78,27 +78,12 @@ public class DataSourceTask<OT> extends AbstractInputTask<InputSplit> {
 	@Override
 	public void registerInputOutput()
 	{
-		if (LOG.isDebugEnabled()) {
+		initInputFormat();
+
+		if (LOG.isDebugEnabled()) {
 			LOG.debug(getLogString("Start registering input and output"));
 		}
 
-		if (this.userCodeClassLoader == null) {
-			try {
-				this.userCodeClassLoader = LibraryCacheManager.getClassLoader(getEnvironment().getJobID());
-			}
-			catch (IOException ioe) {
-				throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " + 
-							getEnvironment().getJobID(), ioe);
-			}
-		}
-		
-		// obtain task configuration (including stub parameters)
-		Configuration taskConf = getTaskConfiguration();
-		taskConf.setClassLoader(this.userCodeClassLoader);
-		this.config = new TaskConfig(taskConf);
-		
-		initInputFormat(this.userCodeClassLoader);
-		
 		try {
 			initOutputs(this.userCodeClassLoader);
 		} catch (Exception ex) {
@@ -301,17 +286,42 @@ public class DataSourceTask<OT> extends AbstractInputTask<InputSplit> {
 
 	/**
 	 * Initializes the InputFormat implementation and configuration.
-	 * 
+	 * 
 	 * @throws RuntimeException
 	 *         Throws if instance of InputFormat implementation can not be
 	 *         obtained.
 	 */
-	private void initInputFormat(ClassLoader cl) {
-		// instantiate the stub
-		@SuppressWarnings("unchecked")
-		Class<InputFormat<OT, InputSplit>> superClass = (Class<InputFormat<OT, InputSplit>>) (Class<?>) InputFormat.class;
-		this.format = RegularPactTask.instantiateUserCode(this.config, cl, superClass);
-		
+	private void initInputFormat() {
+		if (this.userCodeClassLoader == null) {
+			try {
+				this.userCodeClassLoader = LibraryCacheManager.getClassLoader(getEnvironment().getJobID());
+			}
+			catch (IOException ioe) {
+				throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " +
+						getEnvironment().getJobID(), ioe);
+			}
+		}
+
+		// obtain task configuration (including stub parameters)
+		Configuration taskConf = getTaskConfiguration();
+		taskConf.setClassLoader(this.userCodeClassLoader);
+		this.config = new TaskConfig(taskConf);
+
+		try {
+			this.format = config.<InputFormat<OT, InputSplit>>getStubWrapper(this.userCodeClassLoader)
+					.getUserCodeObject(InputFormat.class, this.userCodeClassLoader);
+
+			// check if the class is a subclass, if the check is required
+			if (!InputFormat.class.isAssignableFrom(this.format.getClass())) {
+				throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" +
+						InputFormat.class.getName() + "' as is required.");
+			}
+		}
+		catch (ClassCastException ccex) {
+			throw new RuntimeException("The stub class is not a proper subclass of " + InputFormat.class.getName(),
+					ccex);
+		}
+
 		// configure the stub. catch exceptions here extra, to report them as originating from the user code 
 		try {
 			this.format.configure(this.config.getStubParameters());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
index a43f8cc..2eb003d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
@@ -246,6 +246,10 @@ public class TaskConfig {
 	public String getTaskName() {
 		return this.config.getString(TASK_NAME, null);
 	}
+
+	public boolean hasStubWrapper() {
+		return this.config.containsKey(STUB_OBJECT);
+	}
 	
 	
 	public void setStubWrapper(UserCodeWrapper<?> wrapper) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java
index 7000667..dc2e605 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java
@@ -19,12 +19,12 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 
+import eu.stratosphere.pact.runtime.task.RegularPactTask;
 import org.junit.Test;
 
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.util.FileLineReader;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.nephele.util.SerializableArrayList;
@@ -50,7 +50,7 @@ public class TaskDeploymentDescriptorTest {
 		final int currentNumberOfSubtasks = 1;
 		final Configuration jobConfiguration = new Configuration();
 		final Configuration taskConfiguration = new Configuration();
-		final Class<? extends AbstractInvokable> invokableClass = FileLineReader.class;
+		final Class<? extends AbstractInvokable> invokableClass =  RegularPactTask.class;
 		final SerializableArrayList<GateDeploymentDescriptor> outputGates = new SerializableArrayList<GateDeploymentDescriptor>(
 			0);
 		final SerializableArrayList<GateDeploymentDescriptor> inputGates = new SerializableArrayList<GateDeploymentDescriptor>(
@@ -85,7 +85,7 @@ public class TaskDeploymentDescriptorTest {
 		final int currentNumberOfSubtasks = 1;
 		final Configuration jobConfiguration = new Configuration();
 		final Configuration taskConfiguration = new Configuration();
-		final Class<? extends AbstractInvokable> invokableClass = FileLineReader.class;
+		final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
 		final SerializableArrayList<GateDeploymentDescriptor> outputGates = new SerializableArrayList<GateDeploymentDescriptor>(
 			0);
 		final SerializableArrayList<GateDeploymentDescriptor> inputGates = new SerializableArrayList<GateDeploymentDescriptor>(
@@ -239,7 +239,7 @@ public class TaskDeploymentDescriptorTest {
 		final int currentNumberOfSubtasks = 1;
 		final Configuration jobConfiguration = new Configuration();
 		final Configuration taskConfiguration = new Configuration();
-		final Class<? extends AbstractInvokable> invokableClass = FileLineReader.class;
+		final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
 		final SerializableArrayList<GateDeploymentDescriptor> outputGates = new SerializableArrayList<GateDeploymentDescriptor>(
 			0);
 		final SerializableArrayList<GateDeploymentDescriptor> inputGates = new SerializableArrayList<GateDeploymentDescriptor>(

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossInputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossInputTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossInputTask.java
deleted file mode 100644
index 1ce23e6..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossInputTask.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.executiongraph;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-
-/**
- * This class represents the data source in the self cross unit test.
- * 
- */
-public class SelfCrossInputTask extends AbstractFileInputTask {
-
-
-	@Override
-	public void registerInputOutput() {
-
-		new RecordWriter<StringRecord>(this);
-		new RecordWriter<StringRecord>(this);
-	}
-
-
-	@Override
-	public void invoke() throws Exception {
-
-		// Nothing to do here
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java
deleted file mode 100644
index 1e2be47..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager;
-
-import java.util.Iterator;
-
-import eu.stratosphere.core.fs.FSDataInputStream;
-import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-import eu.stratosphere.runtime.fs.LineReader;
-
-public class DoubleSourceTask extends AbstractFileInputTask {
-
-	private RecordWriter<StringRecord> output1 = null;
-
-	private RecordWriter<StringRecord> output2 = null;
-
-	@Override
-	public void invoke() throws Exception {
-		this.output1.initializeSerializers();
-		this.output2.initializeSerializers();
-
-		final Iterator<FileInputSplit> splitIterator = getFileInputSplits();
-
-		while (splitIterator.hasNext()) {
-
-			final FileInputSplit split = splitIterator.next();
-
-			final long start = split.getStart();
-			final long length = split.getLength();
-
-			final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
-			final FSDataInputStream fdis = fs.open(split.getPath());
-
-			final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
-
-			byte[] line = lineReader.readLine();
-
-			while (line != null) {
-
-				// Create a string object from the data read
-				StringRecord str = new StringRecord();
-				str.set(line);
-
-				// Send out string
-				output1.emit(str);
-				output2.emit(str);
-
-				line = lineReader.readLine();
-			}
-
-			// Close the stream;
-			lineReader.close();
-		}
-
-		this.output1.flush();
-		this.output2.flush();
-	}
-
-	@Override
-	public void registerInputOutput() {
-		this.output1 = new RecordWriter<StringRecord>(this);
-		this.output2 = new RecordWriter<StringRecord>(this);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
index f0ca435..a1ce0b2 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
@@ -13,18 +13,18 @@
 
 package eu.stratosphere.nephele.jobmanager;
 
-import eu.stratosphere.core.io.StringRecord;
 import eu.stratosphere.runtime.io.api.RecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.types.Record;
 
 public class DoubleTargetTask extends AbstractTask {
 
-	private RecordReader<StringRecord> input1 = null;
+	private RecordReader<Record> input1 = null;
 
-	private RecordReader<StringRecord> input2 = null;
+	private RecordReader<Record> input2 = null;
 
-	private RecordWriter<StringRecord> output = null;
+	private RecordWriter<Record> output = null;
 
 	@Override
 	public void invoke() throws Exception {
@@ -33,13 +33,13 @@ public class DoubleTargetTask extends AbstractTask {
 
 		while (this.input1.hasNext()) {
 
-			StringRecord s = input1.next();
+			Record s = input1.next();
 			this.output.emit(s);
 		}
 
 		while (this.input2.hasNext()) {
 
-			StringRecord s = input2.next();
+			Record s = input2.next();
 			this.output.emit(s);
 		}
 
@@ -49,9 +49,9 @@ public class DoubleTargetTask extends AbstractTask {
 
 	@Override
 	public void registerInputOutput() {
-		this.input1 = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.input2 = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this);
+		this.input1 = new RecordReader<Record>(this, Record.class);
+		this.input2 = new RecordReader<Record>(this, Record.class);
+		this.output = new RecordWriter<Record>(this);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
index 96be668..377e304 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
@@ -13,15 +13,15 @@
 
 package eu.stratosphere.nephele.jobmanager;
 
-import eu.stratosphere.core.io.StringRecord;
 import eu.stratosphere.runtime.io.api.RecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.types.Record;
 
 public class ForwardTask extends AbstractTask {
 
-	private RecordReader<StringRecord> input = null;
-	private RecordWriter<StringRecord> output = null;
+	private RecordReader<Record> input = null;
+	private RecordWriter<Record> output = null;
 
 	@Override
 	public void invoke() throws Exception {
@@ -30,7 +30,7 @@ public class ForwardTask extends AbstractTask {
 
 		while (this.input.hasNext()) {
 
-			StringRecord s = input.next();
+			Record s = input.next();
 			this.output.emit(s);
 		}
 
@@ -39,7 +39,7 @@ public class ForwardTask extends AbstractTask {
 
 	@Override
 	public void registerInputOutput() {
-		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this);
+		this.input = new RecordReader<Record>(this, Record.class);
+		this.output = new RecordWriter<Record>(this);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
index 124a24d..209eff1 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
@@ -18,6 +18,7 @@ import eu.stratosphere.runtime.io.api.MutableRecordReader;
 import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.runtime.io.api.UnionRecordReader;
 import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.types.Record;
 
 /**
  * A simple implementation of a task using a {@link UnionRecordReader}.
@@ -27,21 +28,22 @@ public class UnionTask extends AbstractTask {
 	/**
 	 * The union record reader to be used during the tests.
 	 */
-	private UnionRecordReader<StringRecord> unionReader;
+	private UnionRecordReader<Record> unionReader;
 
-	private RecordWriter<StringRecord> writer;
+	private RecordWriter<Record> writer;
 	
 	
 	@Override
 	public void registerInputOutput() {
 
 		@SuppressWarnings("unchecked")
-		MutableRecordReader<StringRecord>[] recordReaders = (MutableRecordReader<StringRecord>[]) new MutableRecordReader<?>[2];
-		recordReaders[0] = new MutableRecordReader<StringRecord>(this);
-		recordReaders[1] = new MutableRecordReader<StringRecord>(this);
-		this.unionReader = new UnionRecordReader<StringRecord>(recordReaders, StringRecord.class);
+		MutableRecordReader<Record>[] recordReaders = (MutableRecordReader<Record>[]) new
+				MutableRecordReader<?>[2];
+		recordReaders[0] = new MutableRecordReader<Record>(this);
+		recordReaders[1] = new MutableRecordReader<Record>(this);
+		this.unionReader = new UnionRecordReader<Record>(recordReaders, Record.class);
 		
-		this.writer = new RecordWriter<StringRecord>(this);
+		this.writer = new RecordWriter<Record>(this);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
index c8bcddc..6a41fe9 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
@@ -95,6 +95,47 @@ public class DefaultSchedulerTest {
 
 	}
 
+	public static final class DummyInputFormat extends GenericInputFormat<IntValue> {
+
+		@Override
+		public boolean reachedEnd() throws IOException {
+			return true;
+		}
+
+		@Override
+		public IntValue nextRecord(IntValue reuse) throws IOException {
+			return null;
+		}
+	}
+
+	public static final class DummyOutputFormat implements OutputFormat<IntValue> {
+
+		@Override
+		public void configure(Configuration parameters) {
+
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {
+
+		}
+
+		@Override
+		public void writeRecord(IntValue record) throws IOException {
+
+		}
+
+		@Override
+		public void close() throws IOException {
+
+		}
+
+		@Override
+		public void initialize(Configuration configuration) {
+
+		}
+	}
+
 	/**
 	 * Constructs a sample execution graph consisting of two vertices connected by a channel of the given type.
 	 * 
@@ -108,10 +149,12 @@ public class DefaultSchedulerTest {
 
 		final JobInputVertex inputVertex = new JobInputVertex("Input 1", jobGraph);
 		inputVertex.setInputClass(InputTask.class);
+		inputVertex.setInputFormat(new DummyInputFormat());
 		inputVertex.setNumberOfSubtasks(1);
 
 		final JobOutputVertex outputVertex = new JobOutputVertex("Output 1", jobGraph);
 		outputVertex.setOutputClass(OutputTask.class);
+		outputVertex.setOutputFormat(new DummyOutputFormat());
 		outputVertex.setNumberOfSubtasks(1);
 
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java
deleted file mode 100644
index fcb4fa1..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.util;
-
-import java.util.Iterator;
-
-import eu.stratosphere.core.fs.FSDataInputStream;
-import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-import eu.stratosphere.runtime.fs.LineReader;
-
-/**
- * A file line reader reads the associated file input splits line by line and outputs the lines as string records.
- * 
- */
-public class FileLineReader extends AbstractFileInputTask {
-
-	private RecordWriter<StringRecord> output = null;
-
-	@Override
-	public void invoke() throws Exception {
-
-		output.initializeSerializers();
-
-		final Iterator<FileInputSplit> splitIterator = getFileInputSplits();
-
-		while (splitIterator.hasNext()) {
-
-			final FileInputSplit split = splitIterator.next();
-
-			long start = split.getStart();
-			long length = split.getLength();
-
-			final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
-			final FSDataInputStream fdis = fs.open(split.getPath());
-
-			final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
-
-			byte[] line = lineReader.readLine();
-
-			while (line != null) {
-
-				// Create a string object from the data read
-				StringRecord str = new StringRecord();
-				str.set(line);
-
-				// Send out string
-				output.emit(str);
-
-				line = lineReader.readLine();
-			}
-
-			// Close the stream;
-			lineReader.close();
-		}
-
-		this.output.flush();
-	}
-
-	@Override
-	public void registerInputOutput() {
-		output = new RecordWriter<StringRecord>(this);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java
deleted file mode 100644
index bc738df..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.util;
-
-import eu.stratosphere.core.fs.FSDataOutputStream;
-import eu.stratosphere.core.fs.FileStatus;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.runtime.io.api.RecordReader;
-import eu.stratosphere.nephele.template.AbstractFileOutputTask;
-
-/**
- * A file line writer reads string records its input gate and writes them to the associated output file.
- * 
- */
-public class FileLineWriter extends AbstractFileOutputTask {
-
-	/**
-	 * The record reader through which incoming string records are received.
-	 */
-	private RecordReader<StringRecord> input = null;
-
-
-	@Override
-	public void invoke() throws Exception {
-
-		Path outputPath = getFileOutputPath();
-
-		FileSystem fs = FileSystem.get(outputPath.toUri());
-		if (fs.exists(outputPath)) {
-			FileStatus status = fs.getFileStatus(outputPath);
-
-			if (status.isDir()) {
-				outputPath = new Path(outputPath.toUri().toString() + "/file_" + getIndexInSubtaskGroup() + ".txt");
-			}
-		}
-
-		final FSDataOutputStream outputStream = fs.create(outputPath, true);
-
-		while (this.input.hasNext()) {
-
-			StringRecord record = this.input.next();
-			byte[] recordByte = (record.toString() + "\r\n").getBytes();
-			outputStream.write(recordByte, 0, recordByte.length);
-		}
-
-		outputStream.close();
-
-	}
-
-
-	@Override
-	public void registerInputOutput() {
-		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-	}
-
-
-	@Override
-	public int getMaximumNumberOfSubtasks() {
-		// The default implementation always returns -1
-		return -1;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java
deleted file mode 100644
index 17c2f58..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.library;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-import java.io.File;
-import java.io.IOException;
-
-import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.util.FileLineReader;
-import eu.stratosphere.nephele.util.FileLineWriter;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.runtime.io.api.RecordReader;
-import eu.stratosphere.nephele.template.InputSplitProvider;
-
-/**
- * This class checks the functionality of the {@link eu.stratosphere.nephele.util.FileLineReader} and the {@link eu.stratosphere.nephele.util.FileLineWriter} class.
- * 
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FileLineReader.class)
-public class FileLineReadWriteTest {
-
-	@Mock
-	private Environment environment;
-
-	@Mock
-	private Configuration conf;
-
-	@Mock
-	private RecordReader<StringRecord> recordReader;
-
-	@Mock
-	private RecordWriter<StringRecord> recordWriter;
-
-	@Mock
-	private InputSplitProvider inputSplitProvider;
-
-	private File file = new File("./tmp");
-
-	/**
-	 * Set up mocks
-	 * 
-	 * @throws IOException
-	 */
-	@Before
-	public void before() throws Exception {
-
-		MockitoAnnotations.initMocks(this);
-	}
-
-	/**
-	 * remove the temporary file
-	 */
-	@After
-	public void after() {
-		this.file.delete();
-	}
-
-	/**
-	 * Tests the read and write methods
-	 * 
-	 * @throws Exception
-	 */
-	@Test
-	public void testReadWrite() throws Exception {
-
-		this.file.createNewFile();
-		FileLineWriter writer = new FileLineWriter();
-		Whitebox.setInternalState(writer, "environment", this.environment);
-		Whitebox.setInternalState(writer, "input", this.recordReader);
-		when(this.environment.getTaskConfiguration()).thenReturn(this.conf);
-
-		when(this.conf.getString("outputPath", null)).thenReturn(this.file.toURI().toString());
-		when(this.recordReader.hasNext()).thenReturn(true, true, true, false);
-		StringRecord in = new StringRecord("abc");
-		try {
-			when(this.recordReader.next()).thenReturn(in);
-		} catch (IOException e) {
-			fail();
-			e.printStackTrace();
-		} catch (InterruptedException e) {
-			fail();
-			e.printStackTrace();
-		}
-		writer.invoke();
-
-		final FileInputSplit split = new FileInputSplit(0, new Path(this.file.toURI().toString()), 0,
-			this.file.length(), null);
-		when(this.environment.getInputSplitProvider()).thenReturn(this.inputSplitProvider);
-		when(this.inputSplitProvider.getNextInputSplit()).thenReturn(split, (FileInputSplit) null);
-
-		FileLineReader reader = new FileLineReader();
-		Whitebox.setInternalState(reader, "environment", this.environment);
-		Whitebox.setInternalState(reader, "output", this.recordWriter);
-		StringRecord record = mock(StringRecord.class);
-
-		whenNew(StringRecord.class).withNoArguments().thenReturn(record);
-
-		reader.invoke();
-
-		// verify the correct bytes have been written and read
-		verify(record, times(3)).set(in.getBytes());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
index 57253d1..3de547e 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
@@ -44,4 +44,7 @@ public class DiscardingOutputFormat implements OutputFormat<Record> {
 	@Override
 	public void close() throws IOException
 	{}
+
+	@Override
+	public void initialize(Configuration configuration){}
 }