You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:35 UTC

[14/22] git commit: Removed RuntimeEnvironment instantiation from execution graph construction. Removed legacy job vertex classes and input/output tasks.

Removed RuntimeEnvironment instantiation from execution graph construction. Removed legacy job vertex classes and input/output tasks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ea79186b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ea79186b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ea79186b

Branch: refs/heads/master
Commit: ea79186b7ef787991fa1c4dbfa29f26c7aefd804
Parents: 429493d
Author: Till Rohrmann <ti...@mailbox.tu-berlin.de>
Authored: Wed Mar 26 02:58:15 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jun 22 21:07:20 2014 +0200

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java |  33 +--
 .../api/common/io/FileOutputFormat.java         |  38 ++++
 .../api/common/io/OutputFormat.java             |   7 +
 .../configuration/Configuration.java            |  12 ++
 .../api/java/io/PrintingOutputFormat.java       |   3 +
 .../nephele/execution/RuntimeEnvironment.java   |  56 ++----
 .../nephele/executiongraph/ExecutionGraph.java  |  86 ++------
 .../executiongraph/ExecutionGroupVertex.java    |  53 ++---
 .../jobgraph/AbstractJobInputVertex.java        |   7 +
 .../nephele/jobgraph/AbstractJobVertex.java     |  62 +-----
 .../nephele/jobgraph/JobFileInputVertex.java    | 195 ------------------
 .../nephele/jobgraph/JobFileOutputVertex.java   | 198 ------------------
 .../nephele/jobgraph/JobGenericInputVertex.java | 168 ----------------
 .../jobgraph/JobGenericOutputVertex.java        | 182 -----------------
 .../nephele/jobgraph/JobInputVertex.java        |  90 ++++++++-
 .../nephele/jobgraph/JobOutputVertex.java       |  56 ++++++
 .../nephele/jobgraph/JobTaskVertex.java         |  17 --
 .../splitassigner/InputSplitManager.java        |  13 +-
 .../LocatableInputSplitAssigner.java            |  14 +-
 .../file/FileInputSplitAssigner.java            |  14 +-
 .../nephele/template/AbstractFileInputTask.java | 201 -------------------
 .../template/AbstractFileOutputTask.java        |  46 -----
 .../template/AbstractGenericInputTask.java      |  39 ----
 .../nephele/template/AbstractInputTask.java     |  23 ---
 .../nephele/template/AbstractInvokable.java     |  33 ---
 .../nephele/template/GenericInputTask.java      |  39 ----
 .../pact/runtime/task/DataSinkTask.java         |  58 ------
 .../pact/runtime/task/DataSourceTask.java       |  60 +++---
 .../pact/runtime/task/util/TaskConfig.java      |   4 +
 .../TaskDeploymentDescriptorTest.java           |   8 +-
 .../executiongraph/SelfCrossInputTask.java      |  41 ----
 .../nephele/jobmanager/DoubleSourceTask.java    |  81 --------
 .../nephele/jobmanager/DoubleTargetTask.java    |  18 +-
 .../nephele/jobmanager/ForwardTask.java         |  12 +-
 .../nephele/jobmanager/UnionTask.java           |  16 +-
 .../scheduler/queue/DefaultSchedulerTest.java   |  43 ++++
 .../nephele/util/FileLineReader.java            |  80 --------
 .../nephele/util/FileLineWriter.java            |  75 -------
 .../io/library/FileLineReadWriteTest.java       | 136 -------------
 .../recordJobs/util/DiscardingOutputFormat.java |   3 +
 40 files changed, 412 insertions(+), 1908 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index b4c7560..200ef7c 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -20,7 +20,14 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+
+import eu.stratosphere.api.common.io.InputFormat;
+import eu.stratosphere.api.common.io.OutputFormat;
+import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
+import eu.stratosphere.core.io.InputSplit;
+import eu.stratosphere.nephele.template.AbstractInputTask;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import eu.stratosphere.api.common.aggregators.AggregatorRegistry;
 import eu.stratosphere.api.common.aggregators.AggregatorWithName;
@@ -805,31 +812,31 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 
 	private JobInputVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException {
 		final JobInputVertex vertex = new JobInputVertex(node.getNodeName(), this.jobGraph);
-		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
-		
+
 		// set task class
 		@SuppressWarnings("unchecked")
-		final Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>) (Class<?>) DataSourceTask.class;
+		final Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>) (Class<?>) DataSourceTask
+				.class;
 		vertex.setInputClass(clazz);
 
 		// set user code
-		config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
-		config.setStubParameters(node.getPactContract().getParameters());
-		
-		config.setOutputSerializer(node.getSerializer());
+		vertex.setInputFormat((UserCodeWrapper<? extends InputFormat<?, InputSplit>>)node.getPactContract()
+				.getUserCodeWrapper());
+		vertex.setInputFormatParameters(node.getPactContract().getParameters());
+		vertex.setOutputSerializer(node.getSerializer());
 		return vertex;
 	}
 
 	private AbstractJobOutputVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
 		final JobOutputVertex vertex = new JobOutputVertex(node.getNodeName(), this.jobGraph);
-		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
-		
+
 		vertex.setOutputClass(DataSinkTask.class);
 		vertex.getConfiguration().setInteger(DataSinkTask.DEGREE_OF_PARALLELISM_KEY, node.getDegreeOfParallelism());
-		
+
 		// set user code
-		config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
-		config.setStubParameters(node.getPactContract().getParameters());
+		vertex.setOutputFormat((UserCodeWrapper<? extends OutputFormat<?>>)node.getPactContract().getUserCodeWrapper
+				());
+		vertex.setOutputFormatParameters(node.getPactContract().getParameters());
 		
 		return vertex;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
index b04ced9..7733c71 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
@@ -437,4 +437,42 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
 			super(targetConfig);
 		}
 	}
+
+	@Override
+	public void initialize(Configuration configuration){
+		final Path path = this.getOutputFilePath();
+		final WriteMode writeMode = this.getWriteMode();
+		final OutputDirectoryMode outDirMode = this.getOutputDirectoryMode();
+
+		// Prepare output path and determine max DOP
+		try {
+			final FileSystem fs = path.getFileSystem();
+
+			int dop = configuration.getInteger(DEGREE_OF_PARALLELISM_KEY, -1);
+			if(dop == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
+				// output is not written in parallel and should be written to a single file.
+
+				if(fs.isDistributedFS()) {
+					// prepare distributed output path
+					if(!fs.initOutPathDistFS(path, writeMode, false)) {
+						// output preparation failed! Cancel task.
+						throw new IOException("Output path could not be initialized.");
+					}
+				}
+			} else {
+				// output should be written to a directory
+
+				if(fs.isDistributedFS()) {
+					// only distributed file systems can be initialized at start-up time.
+					if(!fs.initOutPathDistFS(path, writeMode, true)) {
+						throw new IOException("Output directory could not be created.");
+					}
+				}
+			}
+		}
+		catch (IOException e) {
+			LOG.error("Could not access the file system to detemine the status of the output.", e);
+			throw new RuntimeException("I/O Error while accessing file", e);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
index c32be78..3b66902 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
@@ -79,5 +79,12 @@ public interface OutputFormat<IT> extends Serializable {
 	 * @throws IOException Thrown, if the input could not be closed properly.
 	 */
 	void close() throws IOException;
+
+	/**
+	 * Method which is called on the master node prior to execution. It can be used to set up the output format.
+	 *
+	 * @param configuration The task configuration
+	 */
+	void initialize(Configuration configuration);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java b/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
index 46cadc3..0271b59 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
@@ -405,6 +405,18 @@ public class Configuration implements IOReadableWritable {
 			}
 		}
 	}
+
+	/**
+	 * Checks whether there is an entry with key
+	 *
+	 * @param key key of entry
+	 * @return true if entry with key is stored in the configuration, otherwise false
+	 */
+	public boolean containsKey(String key){
+		synchronized (this.confData){
+			return this.confData.containsKey(key);
+		}
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
index 5c09439..d1736d4 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
@@ -95,4 +95,7 @@ public class PrintingOutputFormat<T> implements OutputFormat<T> {
 	public String toString() {
 		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
 	}
+
+	@Override
+	public void initialize(Configuration configuration){}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
index 4e07694..70718a9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
@@ -83,6 +83,12 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	private final List<InputGate<? extends IOReadableWritable>> inputGates = new CopyOnWriteArrayList<InputGate<? extends IOReadableWritable>>();
 
 	/**
+	 * Queue of unbound output gate IDs which are required for deserializing an environment in the course of an RPC
+	 * call.
+	 */
+	private final Queue<GateID> unboundOutputGateIDs = new ArrayDeque<GateID>();
+
+	/**
 	 * Queue of unbound input gate IDs which are required for deserializing an environment in the course of an RPC
 	 * call.
 	 */
@@ -165,46 +171,18 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	private volatile boolean canceled;
 
 	/**
-	 * Creates a new runtime environment object which contains the runtime information for the encapsulated Nephele
-	 * task.
-	 *
-	 * @param jobID             the ID of the original Nephele job
-	 * @param taskName          the name of task running in this environment
-	 * @param invokableClass    invokableClass the class that should be instantiated as a Nephele task
-	 * @param taskConfiguration the configuration object which was attached to the original JobVertex
-	 * @param jobConfiguration  the configuration object which was attached to the original JobGraph
-	 * @throws Exception thrown if an error occurs while instantiating the invokable class
-	 */
-	public RuntimeEnvironment(final JobID jobID, final String taskName,
-							final Class<? extends AbstractInvokable> invokableClass, final Configuration taskConfiguration,
-							final Configuration jobConfiguration)
-		throws Exception
-	{
-		this.jobID = jobID;
-		this.taskName = taskName;
-		this.invokableClass = invokableClass;
-		this.taskConfiguration = taskConfiguration;
-		this.jobConfiguration = jobConfiguration;
-		this.indexInSubtaskGroup = 0;
-		this.currentNumberOfSubtasks = 0;
-		this.memoryManager = null;
-		this.ioManager = null;
-		this.inputSplitProvider = null;
-		this.cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
-		
-		this.invokable = this.invokableClass.newInstance();
-		this.invokable.setEnvironment(this);
-		this.invokable.registerInputOutput();
-	}
-
-	/**
 	 * Constructs a runtime environment from a task deployment description.
-	 *
-	 * @param tdd                the task deployment description
-	 * @param memoryManager      the task manager's memory manager component
-	 * @param ioManager          the task manager's I/O manager component
-	 * @param inputSplitProvider the input split provider for this environment
-	 * @throws Exception thrown if an error occurs while instantiating the invokable class
+	 * 
+	 * @param tdd
+	 *        the task deployment description
+	 * @param memoryManager
+	 *        the task manager's memory manager component
+	 * @param ioManager
+	 *        the task manager's I/O manager component
+	 * @param inputSplitProvider
+	 *        the input split provider for this environment
+	 * @throws Exception
+	 *         thrown if an error occurs while instantiating the invokable class
 	 */
 	public RuntimeEnvironment(final TaskDeploymentDescriptor tdd,
 							final MemoryManager memoryManager, final IOManager ioManager,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
index c5059f9..93e0a25 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
@@ -399,17 +399,6 @@ public class ExecutionGraph implements ExecutionListener {
 			final ExecutionVertex sev = entry.getValue();
 			final ExecutionGroupVertex sgv = sev.getGroupVertex();
 
-			// First compare number of output gates
-			if (sjv.getNumberOfForwardConnections() != sgv.getEnvironment().getNumberOfOutputGates()) {
-				throw new GraphConversionException("Job and execution vertex " + sjv.getName()
-					+ " have different number of outputs");
-			}
-
-			if (sjv.getNumberOfBackwardConnections() != sgv.getEnvironment().getNumberOfInputGates()) {
-				throw new GraphConversionException("Job and execution vertex " + sjv.getName()
-					+ " have different number of inputs");
-			}
-
 			// First, build the group edges
 			for (int i = 0; i < sjv.getNumberOfForwardConnections(); ++i) {
 				final JobEdge edge = sjv.getForwardConnection(i);
@@ -488,16 +477,13 @@ public class ExecutionGraph implements ExecutionListener {
 
 			final InputSplit[] inputSplits;
 
-			// let the task code compute the input splits
-			if (groupVertex.getEnvironment().getInvokable() instanceof AbstractInputTask) {
-				try {
-					inputSplits = ((AbstractInputTask<?>) groupVertex.getEnvironment().getInvokable())
-						.computeInputSplits(jobVertex.getNumberOfSubtasks());
-				} catch (Exception e) {
-					throw new GraphConversionException("Cannot compute input splits for " + groupVertex.getName(), e);
-				}
-			} else {
-				throw new GraphConversionException("JobInputVertex contained a task class which was not an input task.");
+			final Class<? extends InputSplit> inputSplitType = jobInputVertex.getInputSplitType();
+
+			try{
+				inputSplits = jobInputVertex.getInputSplits(jobVertex.getNumberOfSubtasks());
+			}catch(Exception e) {
+				throw new GraphConversionException("Cannot compute input splits for " + groupVertex.getName() + ": "
+						+ StringUtils.stringifyException(e));
 			}
 
 			if (inputSplits == null) {
@@ -507,13 +493,19 @@ public class ExecutionGraph implements ExecutionListener {
 					+ " input splits");
 			}
 
-			// assign input splits
+			// assign input splits and type
 			groupVertex.setInputSplits(inputSplits);
+			groupVertex.setInputSplitType(inputSplitType);
 		}
-		// TODO: This is a quick workaround, problem can be solved in a more generic way
-		if (jobVertex instanceof JobFileOutputVertex) {
-			final JobFileOutputVertex jbov = (JobFileOutputVertex) jobVertex;
-			jobVertex.getConfiguration().setString("outputPath", jbov.getFilePath().toString());
+
+		if(jobVertex instanceof JobOutputVertex){
+			final JobOutputVertex jobOutputVertex = (JobOutputVertex) jobVertex;
+
+			final OutputFormat<?> outputFormat = jobOutputVertex.getOutputFormat();
+
+			if(outputFormat != null){
+				outputFormat.initialize(groupVertex.getConfiguration());
+			}
 		}
 
 		// Add group vertex to initial execution stage
@@ -796,48 +788,6 @@ public class ExecutionGraph implements ExecutionListener {
 	}
 
 	/**
-	 * Retrieves the maximum parallel degree of the job represented by this execution graph
-	 */
-	public int getMaxNumberSubtasks() {
-		int maxDegree = 0;
-		final Iterator<ExecutionStage> stageIterator = this.stages.iterator();
-
-		while(stageIterator.hasNext()){
-			final ExecutionStage stage = stageIterator.next();
-
-			int maxPerStageDegree = stage.getMaxNumberSubtasks();
-
-			if(maxPerStageDegree > maxDegree){
-				maxDegree = maxPerStageDegree;
-			}
-		}
-
-		return maxDegree;
-	}
-
-	/**
-	 * Retrieves the number of required slots to run this execution graph
-	 * @return
-	 */
-	public int getRequiredSlots(){
-		int maxRequiredSlots = 0;
-
-		final Iterator<ExecutionStage> stageIterator = this.stages.iterator();
-
-		while(stageIterator.hasNext()){
-			final ExecutionStage stage = stageIterator.next();
-
-			int requiredSlots = stage.getRequiredSlots();
-
-			if(requiredSlots > maxRequiredSlots){
-				maxRequiredSlots = requiredSlots;
-			}
-		}
-
-		return maxRequiredSlots;
-	}
-
-	/**
 	 * Returns the stage which is currently executed.
 	 * 
 	 * @return the currently executed stage or <code>null</code> if the job execution is already completed

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
index c865609..91e9e53 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
@@ -114,6 +114,11 @@ public final class ExecutionGroupVertex {
 	private volatile InputSplit[] inputSplits = null;
 
 	/**
+	 * Input split type
+	 */
+	private volatile Class<? extends InputSplit> inputSplitType = null;
+
+	/**
 	 * The execution stage this vertex belongs to.
 	 */
 	private volatile ExecutionStage executionStage = null;
@@ -129,11 +134,6 @@ public final class ExecutionGroupVertex {
 	private final Class<? extends AbstractInvokable> invokableClass;
 
 	/**
-	 * The environment created to execute the vertex's task.
-	 */
-	private final RuntimeEnvironment environment;
-
-	/**
 	 * Constructs a new group vertex.
 	 * 
 	 * @param name
@@ -177,9 +177,6 @@ public final class ExecutionGroupVertex {
 		this.executionSignature = signature;
 
 		this.invokableClass = invokableClass;
-
-		this.environment = new RuntimeEnvironment(executionGraph.getJobID(), name, invokableClass, configuration,
-			executionGraph.getJobConfiguration());
 	}
 
 	/**
@@ -192,16 +189,6 @@ public final class ExecutionGroupVertex {
 	}
 
 	/**
-	 * Returns the environment of the instantiated {@link AbstractInvokable} object.
-	 * 
-	 * @return the environment of the instantiated {@link AbstractInvokable} object
-	 */
-	public RuntimeEnvironment getEnvironment() {
-
-		return this.environment;
-	}
-
-	/**
 	 * Sets the execution stage this group vertex is associated with.
 	 * 
 	 * @param executionStage
@@ -407,20 +394,6 @@ public final class ExecutionGroupVertex {
 			}
 		}
 
-		// Make sure the value of newNumber is valid
-		// TODO: Move these checks to some other place
-		/*
-		 * if (this.getMinimumNumberOfGroupMember() < 1) {
-		 * throw new GraphConversionException("The minimum number of members is below 1 for group vertex "
-		 * + this.getName());
-		 * }
-		 * if ((this.getMaximumNumberOfGroupMembers() != -1)
-		 * && (this.getMaximumNumberOfGroupMembers() < this.getMinimumNumberOfGroupMember())) {
-		 * throw new GraphConversionException(
-		 * "The maximum number of members is smaller than the minimum for group vertex " + this.getName());
-		 * }
-		 */
-
 		final ExecutionVertex originalVertex = this.getGroupMember(0);
 		int currentNumberOfExecutionVertices = this.getCurrentNumberOfGroupMembers();
 
@@ -453,6 +426,14 @@ public final class ExecutionGroupVertex {
 	}
 
 	/**
+	 * Sets the input split type class
+	 *
+	 * @param inputSplitType Input split type class
+	 */
+	public void setInputSplitType(final Class<? extends InputSplit> inputSplitType) { this.inputSplitType =
+			inputSplitType; }
+
+	/**
 	 * Returns the input splits assigned to this group vertex.
 	 * 
 	 * @return the input splits, possibly <code>null</code> if the group vertex does not represent an input vertex
@@ -462,6 +443,14 @@ public final class ExecutionGroupVertex {
 		return this.inputSplits;
 	}
 
+	/**
+	 * Returns the input split type class
+	 *
+	 * @return the input split type class, possibly <code>null</code> if the group vertex does not represent an input
+	 * vertex
+	 */
+	public Class<? extends InputSplit> getInputSplitType() { return this.inputSplitType; }
+
 	public ExecutionGroupEdge getForwardEdge(int index) {
 
 		if (index < 0) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
index 958ed9d..22b4d7c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
@@ -13,6 +13,10 @@
 
 package eu.stratosphere.nephele.jobgraph;
 
+import eu.stratosphere.core.io.InputSplit;
+
+import java.io.IOException;
+
 /**
  * An abstract base class for input vertices in Nephele.
  * 
@@ -34,4 +38,7 @@ public abstract class AbstractJobInputVertex extends AbstractJobVertex {
 
 		jobGraph.addVertex(this);
 	}
+
+	public abstract Class<? extends InputSplit> getInputSplitType();
+	public abstract InputSplit[] getInputSplits(int minNumSplits) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
index d64c622..7cec46a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
@@ -573,65 +573,15 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	}
 
 	/**
-	 * Performs task specific checks if the
-	 * respective task has been configured properly.
-	 * 
-	 * @param invokable
-	 *        an instance of the task this vertex represents
+	 * Performs check whether the vertex has been properly configured
+	 *
+	 * @param configuration
+	 *        configuration of this vertex
 	 * @throws IllegalConfigurationException
 	 *         thrown if the respective tasks is not configured properly
 	 */
-	public void checkConfiguration(final AbstractInvokable invokable) throws IllegalConfigurationException {
-
-		if (invokable == null) {
-			throw new IllegalArgumentException("Argument invokable is null");
-		}
-
-		// see if the task itself has a valid configuration
-		// because this is user code running on the master, we embed it in a catch-all block
-		try {
-			invokable.checkConfiguration();
-		} catch (IllegalConfigurationException icex) {
-			throw icex; // simply forward
-		} catch (Throwable t) {
-			throw new IllegalConfigurationException("Checking the invokable's configuration caused an error: "
-				+ StringUtils.stringifyException(t));
-		}
-	}
-
-	/**
-	 * Returns the minimum number of subtasks the respective task
-	 * must be split into at runtime.
-	 * 
-	 * @param invokable
-	 *        an instance of the task this vertex represents
-	 * @return the minimum number of subtasks the respective task must be split into at runtime
-	 */
-	public int getMinimumNumberOfSubtasks(final AbstractInvokable invokable) {
-
-		if (invokable == null) {
-			throw new IllegalArgumentException("Argument invokable is null");
-		}
-
-		return invokable.getMinimumNumberOfSubtasks();
-	}
-
-	/**
-	 * Returns the maximum number of subtasks the respective task
-	 * can be split into at runtime.
-	 * 
-	 * @param invokable
-	 *        an instance of the task this vertex represents
-	 * @return the maximum number of subtasks the respective task can be split into at runtime, <code>-1</code> for
-	 *         infinity
-	 */
-	public int getMaximumNumberOfSubtasks(final AbstractInvokable invokable) {
-
-		if (invokable == null) {
-			throw new IllegalArgumentException("Argument invokable is null");
-		}
-
-		return invokable.getMaximumNumberOfSubtasks();
+	public void checkConfiguration(final Configuration configuration) throws IllegalConfigurationException {
+		//default configuration check
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileInputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileInputVertex.java
deleted file mode 100644
index 65685ee..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileInputVertex.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobgraph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.configuration.IllegalConfigurationException;
-import eu.stratosphere.core.fs.FileStatus;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * A JobFileInputVertex is a specific subtype of a {@link AbstractJobInputVertex} and is designed
- * for Nephele tasks which read data from a local or distributed file system. As every job input vertex
- * A JobFileInputVertex must not have any further input.
- * 
- */
-public final class JobFileInputVertex extends AbstractJobInputVertex {
-
-	/**
-	 * The path pointing to the input file/directory.
-	 */
-	private Path path = null;
-
-	/**
-	 * Creates a new job file input vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name of the new job file input vertex
-	 * @param id
-	 *        the ID of this vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobFileInputVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
-		super(name, id, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file input vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name of the new job file input vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobFileInputVertex(final String name, final JobGraph jobGraph) {
-		super(name, null, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file input vertex.
-	 * 
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobFileInputVertex(final JobGraph jobGraph) {
-		super(null, null, jobGraph);
-	}
-
-	/**
-	 * Sets the path of the file the job file input vertex's task should read from.
-	 * 
-	 * @param path
-	 *        the path of the file the job file input vertex's task should read from
-	 */
-	public void setFilePath(final Path path) {
-		this.path = path;
-	}
-
-	/**
-	 * Returns the path of the file the job file input vertex's task should read from.
-	 * 
-	 * @return the path of the file the job file input vertex's task should read from or <code>null</code> if no path
-	 *         has yet been set
-	 */
-	public Path getFilePath() {
-		return this.path;
-	}
-
-	/**
-	 * Sets the class of the vertex's input task.
-	 * 
-	 * @param inputClass
-	 *        the class of the vertex's input task.
-	 */
-	public void setFileInputClass(final Class<? extends AbstractFileInputTask> inputClass) {
-		this.invokableClass = inputClass;
-	}
-
-	/**
-	 * Returns the class of the vertex's input task.
-	 * 
-	 * @return the class of the vertex's input task or <code>null</code> if no task has yet been set
-	 */
-	@SuppressWarnings("unchecked")
-	public Class<? extends AbstractFileInputTask> getFileInputClass() {
-		return (Class<? extends AbstractFileInputTask>) this.invokableClass;
-	}
-
-
-	@Override
-	public void read(final DataInput in) throws IOException {
-		super.read(in);
-
-		// Read path of the input file
-		final boolean isNotNull = in.readBoolean();
-		if (isNotNull) {
-			this.path = new Path();
-			this.path.read(in);
-		}
-	}
-
-
-	@Override
-	public void write(final DataOutput out) throws IOException {
-		super.write(out);
-
-		// Write out the path of the input file
-		if (this.path == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			this.path.write(out);
-		}
-
-	}
-
-
-	@Override
-	public void checkConfiguration(final AbstractInvokable invokable) throws IllegalConfigurationException {
-
-		// Check if the user has specified a path
-		if (this.path == null) {
-			throw new IllegalConfigurationException(this.getName() + " does not specify an input path");
-		}
-
-		// Check if the path is valid
-		try {
-			final FileSystem fs = this.path.getFileSystem();
-			final FileStatus f = fs.getFileStatus(this.path);
-			if (f == null) {
-				throw new IOException(this.path.toString() + " led to a null object");
-			}
-		} catch (IOException e) {
-			throw new IllegalConfigurationException("Cannot access file or directory: "
-				+ StringUtils.stringifyException(e));
-		}
-
-		// register the path in the configuration
-		invokable.getTaskConfiguration()
-			.setString(AbstractFileInputTask.INPUT_PATH_CONFIG_KEY, this.path.toString());
-
-		// Finally, see if the task itself has a valid configuration
-		super.checkConfiguration(invokable);
-	}
-
-
-	@Override
-	public int getMaximumNumberOfSubtasks(final AbstractInvokable invokable) {
-
-		int numberOfBlocks = -1;
-
-		if (this.path == null) {
-			return -1;
-		}
-
-		try {
-			final FileSystem fs = this.path.getFileSystem();
-			final FileStatus f = fs.getFileStatus(this.path);
-			numberOfBlocks = fs.getNumberOfBlocks(f);
-
-		} catch (IOException e) {
-			return -1;
-		}
-
-		return (int) Math.min(numberOfBlocks, invokable.getMaximumNumberOfSubtasks());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileOutputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileOutputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileOutputVertex.java
deleted file mode 100644
index 645041a..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileOutputVertex.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobgraph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import eu.stratosphere.configuration.IllegalConfigurationException;
-import eu.stratosphere.core.fs.FileStatus;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.template.AbstractFileOutputTask;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-
-/**
- * A JobFileOutputVertex is a specific subtype of a {@link AbstractJobOutputVertex} and is designed
- * for Nephele tasks which write data to a local or distributed file system. As every job output vertex
- * A JobFileOutputVertex must not have any further output.
- * 
- */
-public class JobFileOutputVertex extends AbstractJobOutputVertex {
-
-	/**
-	 * The path pointing to the output file/directory.
-	 */
-	private Path path = null;
-
-	/**
-	 * Creates a new job file output vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name of the new job file output vertex
-	 * @param id
-	 *        the ID of this vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobFileOutputVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
-		super(name, id, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file output vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name of the new job file output vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobFileOutputVertex(final String name, final JobGraph jobGraph) {
-		super(name, null, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file input vertex.
-	 * 
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobFileOutputVertex(final JobGraph jobGraph) {
-		super(null, null, jobGraph);
-	}
-
-	/**
-	 * Sets the path of the file the job file input vertex's task should write to.
-	 * 
-	 * @param path
-	 *        the path of the file the job file input vertex's task should write to
-	 */
-	public void setFilePath(final Path path) {
-		this.path = path;
-	}
-
-	/**
-	 * Returns the path of the file the job file output vertex's task should write to.
-	 * 
-	 * @return the path of the file the job file output vertex's task should write to or <code>null</code> if no path
-	 *         has yet been set
-	 */
-
-	public Path getFilePath() {
-		return this.path;
-	}
-
-	/**
-	 * Sets the class of the vertex's output task.
-	 * 
-	 * @param outputClass
-	 *        the class of the vertex's output task.
-	 */
-	public void setFileOutputClass(final Class<? extends AbstractFileOutputTask> outputClass) {
-		this.invokableClass = outputClass;
-	}
-
-	/**
-	 * Returns the class of the vertex's output task.
-	 * 
-	 * @return the class of the vertex's output task or <code>null</code> if no task has yet been set
-	 */
-	@SuppressWarnings("unchecked")
-	public Class<? extends AbstractFileOutputTask> getFileOutputClass() {
-		return (Class<? extends AbstractFileOutputTask>) this.invokableClass;
-	}
-
-
-	@Override
-	public void read(final DataInput in) throws IOException {
-		super.read(in);
-
-		// Read path of the input file
-		boolean isNotNull = in.readBoolean();
-		if (isNotNull) {
-			this.path = new Path();
-			this.path.read(in);
-		}
-	}
-
-
-	@Override
-	public void write(final DataOutput out) throws IOException {
-		super.write(out);
-
-		// Write out the path of the input file
-		if (this.path == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			this.path.write(out);
-		}
-	}
-
-
-	@Override
-	public void checkConfiguration(final AbstractInvokable invokable) throws IllegalConfigurationException {
-
-		// Check if the user has specified a path
-		if (this.path == null) {
-			throw new IllegalConfigurationException(this.getName() + " does not specify an output path");
-		}
-
-		super.checkConfiguration(invokable);
-	}
-
-
-	@Override
-	public int getMaximumNumberOfSubtasks(final AbstractInvokable invokable) {
-
-		if (this.path == null) {
-			return 0;
-		}
-
-		// Check if the path is valid
-		try {
-			final FileSystem fs = path.getFileSystem();
-
-			try {
-				final FileStatus f = fs.getFileStatus(path);
-
-				if (f == null) {
-					return 1;
-				}
-
-				// If the path points to a directory we allow an infinity number of subtasks
-				if (f.isDir()) {
-					return -1;
-				}
-			} catch (FileNotFoundException fnfex) {
-				// The exception is thrown if the requested file/directory does not exist.
-				// if the degree of parallelism is > 1, we create a directory for this path
-				if (getNumberOfSubtasks() > 1) {
-					fs.mkdirs(path);
-					return -1;
-				} else {
-					// a none existing file and a degree of parallelism that is one
-					return 1;
-				}
-			}
-		} catch (IOException e) {
-			// any other kind of I/O exception: we assume only a degree of one here
-			return 1;
-		}
-
-		return 1;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericInputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericInputVertex.java
deleted file mode 100644
index 658ea0d..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericInputVertex.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobgraph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.configuration.IllegalConfigurationException;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.util.StringUtils;
-
-public class JobGenericInputVertex extends JobInputVertex
-{
-	/**
-	 * Class of input task.
-	 */
-	protected Class<? extends AbstractInputTask<?>> inputClass = null;
-
-	/**
-	 * Creates a new job input vertex with the specified name.
-	 * 
-	 * @param name The name of the new job file input vertex.
-	 * @param id The ID of this vertex.
-	 * @param jobGraph The job graph this vertex belongs to.
-	 */
-	public JobGenericInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
-		super(name, id, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file input vertex with the specified name.
-	 * 
-	 * @param name The name of the new job file input vertex.
-	 * @param jobGraph The job graph this vertex belongs to.
-	 */
-	public JobGenericInputVertex(String name, JobGraph jobGraph) {
-		super(name, null, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file input vertex.
-	 * 
-	 * @param jobGraph The job graph this vertex belongs to.
-	 */
-	public JobGenericInputVertex(JobGraph jobGraph) {
-		super(null, null, jobGraph);
-	}
-
-	/**
-	 * Sets the class of the vertex's input task.
-	 * 
-	 * @param inputClass The class of the vertex's input task.
-	 */
-	public void setInputClass(Class<? extends AbstractInputTask<?>> inputClass) {
-		this.inputClass = inputClass;
-	}
-
-	/**
-	 * Returns the class of the vertex's input task.
-	 * 
-	 * @return the class of the vertex's input task or <code>null</code> if no task has yet been set
-	 */
-	public Class<? extends AbstractInputTask<?>> getInputClass() {
-		return this.inputClass;
-	}
-
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		super.read(in);
-
-		// Read class
-		boolean isNotNull = in.readBoolean();
-		if (isNotNull) {
-			// Read the name of the class and try to instantiate the class object
-			final ClassLoader cl = LibraryCacheManager.getClassLoader(this.getJobGraph().getJobID());
-			if (cl == null) {
-				throw new IOException("Cannot find class loader for vertex " + getID());
-			}
-
-			// Read the name of the expected class
-			final String className = StringRecord.readString(in);
-
-			try {
-				this.inputClass = (Class<? extends AbstractInputTask<?>>) Class.forName(className, true, cl).asSubclass(AbstractInputTask.class);
-			}
-			catch (ClassNotFoundException cnfe) {
-				throw new IOException("Class " + className + " not found in one of the supplied jar files: "
-					+ StringUtils.stringifyException(cnfe));
-			}
-			catch (ClassCastException ccex) {
-				throw new IOException("Class " + className + " is not a subclass of "
-					+ AbstractInputTask.class.getName() + ": " + StringUtils.stringifyException(ccex));
-			}
-		}
-	}
-
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		super.write(out);
-
-		// Write out the name of the class
-		if (this.inputClass == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			StringRecord.writeString(out, this.inputClass.getName());
-		}
-	}
-
-
-	@Override
-	public void checkConfiguration(AbstractInvokable invokable) throws IllegalConfigurationException
-	{
-		// see if the task itself has a valid configuration
-		// because this is user code running on the master, we embed it in a catch-all block
-		try {
-			invokable.checkConfiguration();
-		}
-		catch (IllegalConfigurationException icex) {
-			throw icex; // simply forward
-		}
-		catch (Throwable t) {
-			throw new IllegalConfigurationException("Checking the invokable's configuration caused an error: " 
-				+ StringUtils.stringifyException(t));
-		}
-	}
-
-
-	@Override
-	public Class<? extends AbstractInvokable> getInvokableClass() {
-
-		return this.inputClass;
-	}
-
-
-	@Override
-	public int getMaximumNumberOfSubtasks(AbstractInvokable invokable)
-	{
-		return invokable.getMaximumNumberOfSubtasks();
-	}
-
-
-	@Override
-	public int getMinimumNumberOfSubtasks(AbstractInvokable invokable) {
-
-		return invokable.getMinimumNumberOfSubtasks();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericOutputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericOutputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericOutputVertex.java
deleted file mode 100644
index a5b0665..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericOutputVertex.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobgraph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.configuration.IllegalConfigurationException;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * A JobGenericOutputVertex is a specific subtype of a {@link JobOutputVertex} and is designed
- * for Nephele tasks which sink data in a not further specified way. As every job output vertex,
- * a JobGenericOutputVertex must not have any further output.
- * 
- */
-public class JobGenericOutputVertex extends JobOutputVertex {
-
-	/**
-	 * The class of the output task.
-	 */
-	protected Class<? extends AbstractOutputTask> outputClass = null;
-
-
-	/**
-	 * Creates a new job file output vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name of the new job file output vertex
-	 * @param id
-	 *        the ID of this vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobGenericOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
-		super(name, id, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file output vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name of the new job file output vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobGenericOutputVertex(String name, JobGraph jobGraph) {
-		super(name, null, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file input vertex.
-	 * 
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobGenericOutputVertex(JobGraph jobGraph) {
-		super(null, null, jobGraph);
-	}
-
-	/**
-	 * Sets the class of the vertex's output task.
-	 * 
-	 * @param outputClass The class of the vertex's output task.
-	 */
-	public void setOutputClass(Class<? extends AbstractOutputTask> outputClass) {
-		this.outputClass = outputClass;
-	}
-
-	/**
-	 * Returns the class of the vertex's output task.
-	 * 
-	 * @return The class of the vertex's output task or <code>null</code> if no task has yet been set.
-	 */
-	public Class<? extends AbstractOutputTask> getOutputClass() {
-		return this.outputClass;
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException {
-		super.read(in);
-
-		// Read class
-		boolean isNotNull = in.readBoolean();
-		if (isNotNull) {
-
-			// Read the name of the class and try to instantiate the class object
-			final ClassLoader cl = LibraryCacheManager.getClassLoader(this.getJobGraph().getJobID());
-			if (cl == null) {
-				throw new IOException("Cannot find class loader for vertex " + getID());
-			}
-
-			// Read the name of the expected class
-			final String className = StringRecord.readString(in);
-
-			try {
-				this.outputClass = Class.forName(className, true, cl).asSubclass(AbstractOutputTask.class);
-			}
-			catch (ClassNotFoundException cnfe) {
-				throw new IOException("Class " + className + " not found in one of the supplied jar files: "
-					+ StringUtils.stringifyException(cnfe));
-			}
-			catch (ClassCastException ccex) {
-				throw new IOException("Class " + className + " is not a subclass of "
-					+ AbstractOutputTask.class.getName() + ": " + StringUtils.stringifyException(ccex));
-			}
-		}
-	}
-
-
-	@Override
-	public void write(DataOutput out) throws IOException {
-		super.write(out);
-
-		// Write out the name of the class
-		if (this.outputClass == null) {
-			out.writeBoolean(false);
-		}
-		else {
-			out.writeBoolean(true);
-			StringRecord.writeString(out, this.outputClass.getName());
-		}
-	}
-
-
-	@Override
-	public void checkConfiguration(AbstractInvokable invokable) throws IllegalConfigurationException
-	{
-		// see if the task itself has a valid configuration
-		// because this is user code running on the master, we embed it in a catch-all block
-		try {
-			invokable.checkConfiguration();
-		}
-		catch (IllegalConfigurationException icex) {
-			throw icex; // simply forward
-		}
-		catch (Throwable t) {
-			throw new IllegalConfigurationException("Checking the invokable's configuration caused an error: " 
-				+ StringUtils.stringifyException(t));
-		}
-	}
-
-
-	@Override
-	public Class<? extends AbstractInvokable> getInvokableClass() {
-
-		return this.outputClass;
-	}
-
-
-	@Override
-	public int getMaximumNumberOfSubtasks(AbstractInvokable invokable)
-	{
-		// Delegate call to invokable
-		return invokable.getMaximumNumberOfSubtasks();
-	}
-
-
-	@Override
-	public int getMinimumNumberOfSubtasks(AbstractInvokable invokable)
-	{
-		// Delegate call to invokable
-		return invokable.getMinimumNumberOfSubtasks();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
index a22d7ca..9e5f6c7 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
@@ -13,9 +13,21 @@
 
 package eu.stratosphere.nephele.jobgraph;
 
+import eu.stratosphere.api.common.io.InputFormat;
+import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
+import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.io.InputSplit;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
 import eu.stratosphere.nephele.template.AbstractInputTask;
+import eu.stratosphere.pact.runtime.task.util.TaskConfig;
+
+import java.io.DataInput;
+import java.io.IOException;
 
 public class JobInputVertex extends AbstractJobInputVertex {
+	private volatile InputFormat<?, ? extends InputSplit> inputFormat = null;
 
 	/**
 	 * Creates a new job input vertex with the specified name.
@@ -55,7 +67,7 @@ public class JobInputVertex extends AbstractJobInputVertex {
 
 	/**
 	 * Sets the class of the vertex's input task.
-	 * 
+	 *
 	 * @param inputClass
 	 *        The class of the vertex's input task.
 	 */
@@ -72,4 +84,144 @@ public class JobInputVertex extends AbstractJobInputVertex {
 	public Class<? extends AbstractInputTask<?>> getInputClass() {
 		return (Class<? extends AbstractInputTask<?>>) this.invokableClass;
 	}
+
+	/**
+	 * Sets the input format of this vertex from a user code wrapper. The wrapper is registered as
+	 * stub wrapper in the vertex's task configuration and the wrapped format object is kept in
+	 * this vertex.
+	 *
+	 * @param inputFormatWrapper
+	 *        the wrapper containing the input format
+	 */
+	public void setInputFormat(UserCodeWrapper<? extends InputFormat<?, ? extends InputSplit>> inputFormatWrapper) {
+		TaskConfig config = new TaskConfig(this.getConfiguration());
+		config.setStubWrapper(inputFormatWrapper);
+
+		inputFormat = inputFormatWrapper.getUserCodeObject();
+	}
+
+	/**
+	 * Sets the input format of this vertex. The format is wrapped into a {@link UserCodeObjectWrapper}
+	 * which is registered as stub wrapper in the vertex's task configuration.
+	 *
+	 * @param inputFormat
+	 *        the input format of this vertex
+	 */
+	public void setInputFormat(InputFormat<?, ? extends InputSplit> inputFormat) {
+		this.inputFormat = inputFormat;
+
+		UserCodeWrapper<? extends InputFormat<?, ? extends InputSplit>> wrapper = new
+				UserCodeObjectWrapper<InputFormat<?, ? extends InputSplit>>(inputFormat);
+		TaskConfig config = new TaskConfig(this.getConfiguration());
+		config.setStubWrapper(wrapper);
+	}
+
+	/**
+	 * Sets the parameters of the input format. The parameters are stored as stub parameters in the
+	 * vertex's task configuration and the input format is configured with them.
+	 *
+	 * @param inputFormatParameters
+	 *        the parameters to configure the input format with
+	 * @throws RuntimeException
+	 *         if no input format has been set for this vertex
+	 */
+	public void setInputFormatParameters(Configuration inputFormatParameters){
+		// check the precondition first, so that a failed call leaves the task configuration untouched
+		if(inputFormat == null){
+			throw new RuntimeException("There is no input format set in job vertex: " + this.getID());
+		}
+
+		TaskConfig config = new TaskConfig(this.getConfiguration());
+		config.setStubParameters(inputFormatParameters);
+
+		inputFormat.configure(inputFormatParameters);
+	}
+
+	/**
+	 * Sets the factory of the output serializer in the vertex's task configuration.
+	 *
+	 * @param factory
+	 *        the serializer factory to set
+	 */
+	public void setOutputSerializer(TypeSerializerFactory<?> factory){
+		TaskConfig config = new TaskConfig(this.getConfiguration());
+		config.setOutputSerializer(factory);
+	}
+
+
+	/**
+	 * Reads the vertex from the given input and restores the input format from the task
+	 * configuration, using the class loader of the job this vertex belongs to.
+	 *
+	 * @param input
+	 *        the data input to read the vertex from
+	 * @throws IOException
+	 *         if reading the vertex from the input fails
+	 * @throws RuntimeException
+	 *         if the class loader for the job cannot be obtained
+	 */
+	@Override
+	public void read(final DataInput input) throws IOException{
+		super.read(input);
+
+		// load input format wrapper from the config
+		ClassLoader cl = null;
+
+		try{
+			cl = LibraryCacheManager.getClassLoader(this.getJobGraph().getJobID());
+		}
+		catch (IOException ioe) {
+			throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " +
+					this.getJobGraph().getJobID(), ioe);
+		}
+
+		final Configuration config = this.getConfiguration();
+		config.setClassLoader(cl);
+		final TaskConfig taskConfig = new TaskConfig(config);
+
+		// a vertex without a registered input format keeps inputFormat == null; the accessors below
+		// report that case with a descriptive exception
+		if(taskConfig.hasStubWrapper()){
+			inputFormat = taskConfig.<InputFormat<?, InputSplit>>getStubWrapper(cl).getUserCodeObject(InputFormat.class,
+					cl);
+
+			inputFormat.configure(taskConfig.getStubParameters());
+		}
+	}
+
+	/**
+	 * Returns the type of input splits reported by the input format of this vertex.
+	 *
+	 * @return the input split type reported by the input format
+	 * @throws RuntimeException
+	 *         if no input format has been set for this vertex
+	 */
+	@Override
+	public Class<? extends InputSplit> getInputSplitType() {
+		if(inputFormat == null){
+			throw new RuntimeException("No input format has been set for job vertex: "+ this.getID());
+		}
+
+		return inputFormat.getInputSplitType();
+	}
+
+	/**
+	 * Lets the input format of this vertex create the input splits.
+	 *
+	 * @param minNumSplits
+	 *        the minimum number of splits, passed on to the input format
+	 * @return the input splits created by the input format
+	 * @throws IOException
+	 *         propagated from the input format's split creation
+	 * @throws RuntimeException
+	 *         if no input format has been set for this vertex
+	 */
+	@Override
+	public InputSplit[] getInputSplits(int minNumSplits) throws IOException {
+		if(inputFormat == null){
+			throw new RuntimeException("No input format has been set for job vertex: "+ this.getID());
+		}
+
+		return inputFormat.createInputSplits(minNumSplits);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
index 31452c3..154e639 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
@@ -13,7 +13,16 @@
 
 package eu.stratosphere.nephele.jobgraph;
 
+import eu.stratosphere.api.common.io.OutputFormat;
+import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
 import eu.stratosphere.nephele.template.AbstractOutputTask;
+import eu.stratosphere.pact.runtime.task.util.TaskConfig;
+
+import java.io.DataInput;
+import java.io.IOException;
 
 /**
  * A JobOutputVertex is a specific subtype of a {@link AbstractJobOutputVertex} and is designed
@@ -22,6 +31,7 @@ import eu.stratosphere.nephele.template.AbstractOutputTask;
  * 
  */
 public class JobOutputVertex extends AbstractJobOutputVertex {
+	private volatile OutputFormat<?> outputFormat = null;
 
 	/**
 	 * Creates a new job file output vertex with the specified name.
@@ -78,4 +88,50 @@ public class JobOutputVertex extends AbstractJobOutputVertex {
 	public Class<? extends AbstractOutputTask> getOutputClass() {
 		return (Class<? extends AbstractOutputTask>) this.invokableClass;
 	}
+
+	/**
+	 * Sets the output format of this vertex from a user code wrapper. The wrapper is registered as
+	 * the stub wrapper in a {@link TaskConfig} created on this vertex's configuration, and the
+	 * wrapped user code object is kept in this vertex.
+	 *
+	 * @param outputFormatWrapper
+	 *        the wrapper holding the output format
+	 */
+	public void setOutputFormat(UserCodeWrapper<? extends OutputFormat<?>> outputFormatWrapper){
+		// Register the wrapper first, then remember the unwrapped format.
+		new TaskConfig(this.getConfiguration()).setStubWrapper(outputFormatWrapper);
+		this.outputFormat = outputFormatWrapper.getUserCodeObject();
+	}
+
+	/**
+	 * Sets the output format of this vertex from a plain output format object. The object is kept
+	 * in this vertex and, wrapped in a {@link UserCodeObjectWrapper}, registered as the stub wrapper
+	 * in a {@link TaskConfig} created on this vertex's configuration.
+	 *
+	 * @param outputFormat
+	 *        the output format to use for this vertex
+	 */
+	public void setOutputFormat(OutputFormat<?> outputFormat){
+		this.outputFormat = outputFormat;
+
+		final UserCodeWrapper<? extends OutputFormat<?>> userCode =
+				new UserCodeObjectWrapper<OutputFormat<?>>(outputFormat);
+		new TaskConfig(this.getConfiguration()).setStubWrapper(userCode);
+	}
+
+	/**
+	 * Stores the given parameters as stub parameters in a {@link TaskConfig} created on this
+	 * vertex's configuration and configures the output format of this vertex with them.
+	 *
+	 * @param parameters
+	 *        the parameters for the output format
+	 * @throws RuntimeException
+	 *         if no output format has been set for this vertex
+	 */
+	public void setOutputFormatParameters(Configuration parameters){
+		// Fail fast with a descriptive message, and before the configuration is modified,
+		// instead of running into a bare NullPointerException after the parameters were written.
+		if(outputFormat == null){
+			throw new RuntimeException("No output format has been set for job vertex: "+ this.getID());
+		}
+
+		TaskConfig config = new TaskConfig(this.getConfiguration());
+		config.setStubParameters(parameters);
+
+		outputFormat.configure(parameters);
+	}
+
+	/**
+	 * Reads this vertex from the given input and afterwards restores the output format from the
+	 * vertex configuration. The class loader obtained from the {@link LibraryCacheManager} for the
+	 * job is set on the configuration; if the task configuration holds a stub wrapper, the output
+	 * format is taken from it and configured with the stored stub parameters.
+	 *
+	 * @param input
+	 *        the input to read the vertex from
+	 * @throws IOException
+	 *         declared for the superclass read
+	 * @throws RuntimeException
+	 *         if the class loader for the job cannot be obtained
+	 */
+	@Override
+	public void read(final DataInput input) throws IOException{
+		super.read(input);
+
+		final ClassLoader userCodeLoader;
+		try{
+			userCodeLoader = LibraryCacheManager.getClassLoader(this.getJobGraph().getJobID());
+		}
+		catch (IOException ioe) {
+			throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " +
+					this.getJobGraph().getJobID(), ioe);
+		}
+
+		final Configuration config = this.getConfiguration();
+		config.setClassLoader(userCodeLoader);
+		final TaskConfig taskConfig = new TaskConfig(config);
+
+		// Without a stub wrapper there is no output format to restore.
+		if(!taskConfig.hasStubWrapper()){
+			return;
+		}
+
+		outputFormat = taskConfig.<OutputFormat<?> >getStubWrapper(userCodeLoader)
+				.getUserCodeObject(OutputFormat.class, userCodeLoader);
+		outputFormat.configure(taskConfig.getStubParameters());
+	}
+
+	/**
+	 * Returns the output format held by this vertex.
+	 *
+	 * @return the output format, or <code>null</code> if none has been set or restored so far
+	 */
+	public OutputFormat<?> getOutputFormat() {
+		return this.outputFormat;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
index 61eb66c..8672aeb 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
@@ -13,7 +13,6 @@
 
 package eu.stratosphere.nephele.jobgraph;
 
-import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.nephele.template.AbstractTask;
 
 /**
@@ -84,20 +83,4 @@ public class JobTaskVertex extends AbstractJobVertex {
 	public Class<? extends AbstractTask> getTaskClass() {
 		return (Class<? extends AbstractTask>) this.invokableClass;
 	}
-
-
-	@Override
-	public int getMaximumNumberOfSubtasks(final AbstractInvokable invokable) {
-
-		// Delegate call to invokable
-		return invokable.getMaximumNumberOfSubtasks();
-	}
-
-
-	@Override
-	public int getMinimumNumberOfSubtasks(final AbstractInvokable invokable) {
-
-		// Delegate call to invokable
-		return invokable.getMinimumNumberOfSubtasks();
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
index bbef991..790aca9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
@@ -102,18 +102,7 @@ public final class InputSplitManager {
 				continue;
 			}
 
-			final AbstractInvokable invokable = groupVertex.getEnvironment().getInvokable();
-			if (!(invokable instanceof AbstractInputTask)) {
-				LOG.error(groupVertex.getName() + " has " + inputSplits.length
-					+ " input splits, but is not of typt AbstractInputTask, ignoring...");
-				continue;
-			}
-
-			@SuppressWarnings("unchecked")
-			final AbstractInputTask<? extends InputSplit> inputTask = (AbstractInputTask<? extends InputSplit>) invokable;
-			final Class<? extends InputSplit> splitType = inputTask.getInputSplitType();
-
-			final InputSplitAssigner assigner = getAssignerByType(splitType, true);
+			final InputSplitAssigner assigner = getAssignerByType(groupVertex.getInputSplitType(), true);
 			// Add entry to cache for fast retrieval during the job execution
 			this.assignerCache.put(groupVertex, assigner);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
index 3717fbf..1e6929d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
@@ -49,18 +49,8 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner {
 	@Override
 	public void registerGroupVertex(final ExecutionGroupVertex groupVertex) {
 
-		// Do some sanity checks first
-		final AbstractInvokable invokable = groupVertex.getEnvironment().getInvokable();
-
-		// if (!(invokable instanceof AbstractFileInputTask)) {
-		// LOG.error(groupVertex.getName() + " is not an input vertex, ignoring vertex...");
-		// return;
-		// }
-
-		@SuppressWarnings("unchecked")
-		final AbstractInputTask<? extends InputSplit> inputTask = (AbstractInputTask<? extends InputSplit>) invokable;
-		if (!LocatableInputSplit.class.isAssignableFrom(inputTask.getInputSplitType())) {
-			LOG.error(groupVertex.getName() + " produces input splits of type " + inputTask.getInputSplitType()
+		if (!LocatableInputSplit.class.isAssignableFrom(groupVertex.getInputSplitType())) {
+			LOG.error(groupVertex.getName() + " produces input splits of type " + groupVertex.getInputSplitType()
 				+ " and cannot be handled by this split assigner");
 			return;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
index 7894334..048562c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
@@ -50,18 +50,8 @@ public final class FileInputSplitAssigner implements InputSplitAssigner {
 	@Override
 	public void registerGroupVertex(final ExecutionGroupVertex groupVertex) {
 
-		// Do some sanity checks first
-		final AbstractInvokable invokable = groupVertex.getEnvironment().getInvokable();
-
-		// if (!(invokable instanceof AbstractFileInputTask)) {
-		// LOG.error(groupVertex.getName() + " is not an input vertex, ignoring vertex...");
-		// return;
-		// }
-
-		@SuppressWarnings("unchecked")
-		final AbstractInputTask<? extends InputSplit> inputTask = (AbstractInputTask<? extends InputSplit>) invokable;
-		if (!FileInputSplit.class.equals(inputTask.getInputSplitType())) {
-			LOG.error(groupVertex.getName() + " produces input splits of type " + inputTask.getInputSplitType()
+		if (!FileInputSplit.class.equals(groupVertex.getInputSplitType())) {
+			LOG.error(groupVertex.getName() + " produces input splits of type " + groupVertex.getInputSplitType()
 				+ " and cannot be handled by this split assigner");
 			return;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileInputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileInputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileInputTask.java
deleted file mode 100644
index d16e757..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileInputTask.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.template;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import eu.stratosphere.core.fs.BlockLocation;
-import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.core.fs.FileStatus;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-
-/**
- * Specialized subtype of {@link AbstractInputTask} for tasks which are supposed to generate input from
- * a file. In addition to {@link AbstractInputTask} this class includes a method to query file splits
- * which should be read during the task's execution.
- * 
- */
-public abstract class AbstractFileInputTask extends AbstractInputTask<FileInputSplit> {
-
-	public static final String INPUT_PATH_CONFIG_KEY = "input.path";
-
-	/**
-	 * The fraction that the last split may be larger than the others.
-	 */
-	private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Returns an iterator to a (possible empty) list of file input splits which is expected to be consumed by this
-	 * instance of the {@link AbstractFileInputTask}.
-	 * 
-	 * @return an iterator to a (possible empty) list of file input splits.
-	 */
-	public Iterator<FileInputSplit> getFileInputSplits() {
-
-		return new InputSplitIterator<FileInputSplit>(getEnvironment().getInputSplitProvider());
-	}
-
-
-	@Override
-	public FileInputSplit[] computeInputSplits(final int minNumSplits) throws IOException {
-
-		final String pathURI = getTaskConfiguration().getString(INPUT_PATH_CONFIG_KEY, null);
-		if (pathURI == null) {
-			throw new IOException("The path to the file was not found in the runtime configuration.");
-		}
-
-		final Path path;
-		try {
-			path = new Path(pathURI);
-		} catch (Exception iaex) {
-			throw new IOException("Invalid file path specifier: ", iaex);
-		}
-
-		final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>();
-
-		// get all the files that are involved in the splits
-		final List<FileStatus> files = new ArrayList<FileStatus>();
-		long totalLength = 0;
-
-		final FileSystem fs = path.getFileSystem();
-		final FileStatus pathFile = fs.getFileStatus(path);
-
-		if (pathFile.isDir()) {
-			// input is directory. list all contained files
-			final FileStatus[] dir = fs.listStatus(path);
-			for (int i = 0; i < dir.length; i++) {
-				if (!dir[i].isDir()) {
-					files.add(dir[i]);
-					totalLength += dir[i].getLen();
-				}
-			}
-
-		} else {
-			files.add(pathFile);
-			totalLength += pathFile.getLen();
-		}
-
-		final long minSplitSize = 1;
-		final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE : (totalLength / minNumSplits +
-					(totalLength % minNumSplits == 0 ? 0 : 1));
-
-		// now that we have the files, generate the splits
-		int splitNum = 0;
-		for (final FileStatus file : files) {
-
-			final long len = file.getLen();
-			final long blockSize = file.getBlockSize();
-
-			final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
-			final long halfSplit = splitSize >>> 1;
-
-			final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
-
-			if (len > 0) {
-
-				// get the block locations and make sure they are in order with respect to their offset
-				final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
-				Arrays.sort(blocks);
-
-				long bytesUnassigned = len;
-				long position = 0;
-
-				int blockIndex = 0;
-
-				while (bytesUnassigned > maxBytesForLastSplit) {
-					// get the block containing the majority of the data
-					blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
-					// create a new split
-					final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position, splitSize,
-						blocks[blockIndex]
-							.getHosts());
-					inputSplits.add(fis);
-
-					// adjust the positions
-					position += splitSize;
-					bytesUnassigned -= splitSize;
-				}
-
-				// assign the last split
-				if (bytesUnassigned > 0) {
-					blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
-					final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position,
-						bytesUnassigned,
-						blocks[blockIndex].getHosts());
-					inputSplits.add(fis);
-				}
-			} else {
-				// special case with a file of zero bytes size
-				final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
-				String[] hosts;
-				if (blocks.length > 0) {
-					hosts = blocks[0].getHosts();
-				} else {
-					hosts = new String[0];
-				}
-				final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts);
-				inputSplits.add(fis);
-			}
-		}
-
-		return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
-	}
-
-	/**
-	 * Retrieves the index of the <tt>BlockLocation</tt> that contains the part of the file described by the given
-	 * offset.
-	 * 
-	 * @param blocks
-	 *        The different blocks of the file. Must be ordered by their offset.
-	 * @param offset
-	 *        The offset of the position in the file.
-	 * @param startIndex
-	 *        The earliest index to look at.
-	 * @return The index of the block containing the given position.
-	 */
-	private final int getBlockIndexForPosition(final BlockLocation[] blocks, final long offset,
-			final long halfSplitSize, final int startIndex) {
-		
-		// go over all indexes after the startIndex
-		for (int i = startIndex; i < blocks.length; i++) {
-			long blockStart = blocks[i].getOffset();
-			long blockEnd = blockStart + blocks[i].getLength();
-
-			if (offset >= blockStart && offset < blockEnd) {
-				// got the block where the split starts
-				// check if the next block contains more than this one does
-				if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
-					return i + 1;
-				} else {
-					return i;
-				}
-			}
-		}
-		throw new IllegalArgumentException("The given offset is not contained in the any block.");
-	}
-
-
-	@Override
-	public Class<FileInputSplit> getInputSplitType() {
-
-		return FileInputSplit.class;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileOutputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileOutputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileOutputTask.java
deleted file mode 100644
index 5f231c1..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileOutputTask.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.template;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.Path;
-
-/**
- * Specialized subtype of {@link AbstractOutputTask} for tasks which are supposed to write output to
- * a file.
- * 
- */
-public abstract class AbstractFileOutputTask extends AbstractOutputTask {
-
-	/**
-	 * Returns the output path which has been assigned to the original {@link JobFileOutputVertex}.
-	 * 
-	 * @return the output path which has been assigned to the original {@link JobFileOutputVertex} or <code>null</code>
-	 *         if the path cannot be retrieved
-	 */
-	public Path getFileOutputPath() {
-
-		// TODO: This is a quick workaround, problem can be solved in a more generic way
-		final Configuration conf = getEnvironment().getTaskConfiguration();
-
-		final String outputPath = conf.getString("outputPath", null);
-
-		if (outputPath != null) {
-			return new Path(outputPath);
-		}
-
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractGenericInputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractGenericInputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractGenericInputTask.java
deleted file mode 100644
index cf6d916..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractGenericInputTask.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.template;
-
-import eu.stratosphere.core.io.GenericInputSplit;
-
-/**
- * An input task that processes generic input splits (partitions).
- */
-public abstract class AbstractGenericInputTask extends AbstractInputTask<GenericInputSplit> {
-
-
-	@Override
-	public GenericInputSplit[] computeInputSplits(final int requestedMinNumber) throws Exception {
-		GenericInputSplit[] splits = new GenericInputSplit[requestedMinNumber];
-		for (int i = 0; i < requestedMinNumber; i++) {
-			splits[i] = new GenericInputSplit(i,requestedMinNumber);
-		}
-		return splits;
-	}
-
-
-	@Override
-	public Class<GenericInputSplit> getInputSplitType() {
-
-		return GenericInputSplit.class;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java
index 76c9377..88e4fcb 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java
@@ -27,29 +27,6 @@ import eu.stratosphere.core.io.InputSplit;
 public abstract class AbstractInputTask<T extends InputSplit> extends AbstractInvokable {
 
 	/**
-	 * This method computes the different splits of the input that can be processed in parallel. It needs
-	 * to be implemented by classes that describe input tasks.
-	 * <p>
-	 * Note that this method does not return the input splits for the task instance only, but it computes all splits for
-	 * all parallel instances. Those computed splits are then assigned to the individual task instances by the Job
-	 * Manager. To obtain the input splits for the current task instance, use the {@link #getTaskInputSplits()} method.
-	 * 
-	 * @param requestedMinNumber
-	 *        The minimum number of splits to create. This is a hint by the system how many splits
-	 *        should be generated at least (typically because there are that many parallel task
-	 *        instances), but it is no hard constraint
-	 * @return The input splits for the input to be processed by all instances of this input task
-	 */
-	public abstract T[] computeInputSplits(int requestedMinNumber) throws Exception;
-
-	/**
-	 * Returns the type of input splits that is generated by this input task.
-	 * 
-	 * @return the type of input splits that is generated by this input task
-	 */
-	public abstract Class<T> getInputSplitType();
-
-	/**
 	 * Returns an iterator to a (possible empty) list of input splits which is expected to be consumed by this
 	 * instance of the {@link AbstractInputTask}.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java
index a37f592..79390f8 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java
@@ -62,39 +62,6 @@ public abstract class AbstractInvokable {
 		return this.environment;
 	}
 
-	/**
-	 * Overwrite this method to implement task specific checks if the
-	 * respective task has been configured properly.
-	 * 
-	 * @throws IllegalConfigurationException
-	 *         thrown if the respective tasks is not configured properly
-	 */
-	public void checkConfiguration() throws IllegalConfigurationException {
-		// The default implementation does nothing
-	}
-
-	/**
-	 * Overwrite this method to provide the minimum number of subtasks the respective task
-	 * must be split into at runtime.
-	 * 
-	 * @return the minimum number of subtasks the respective task must be split into at runtime
-	 */
-	public int getMinimumNumberOfSubtasks() {
-		// The default implementation always returns 1
-		return 1;
-	}
-
-	/**
-	 * Overwrite this method to provide the maximum number of subtasks the respective task
-	 * can be split into at runtime.
-	 * 
-	 * @return the maximum number of subtasks the respective task can be split into at runtime, <code>-1</code> for
-	 *         infinity
-	 */
-	public int getMaximumNumberOfSubtasks() {
-		// The default implementation always returns -1
-		return -1;
-	}
 
 	/**
 	 * Returns the current number of subtasks the respective task is split into.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/GenericInputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/GenericInputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/GenericInputTask.java
deleted file mode 100644
index c2cbbc1..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/GenericInputTask.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.template;
-
-import eu.stratosphere.core.io.GenericInputSplit;
-
-/**
- * An input task that processes generic input splits (partitions).
- */
-public abstract class GenericInputTask extends AbstractInputTask<GenericInputSplit> {
-
-
-	@Override
-	public GenericInputSplit[] computeInputSplits(final int requestedMinNumber) throws Exception {
-		GenericInputSplit[] splits = new GenericInputSplit[requestedMinNumber];
-		for (int i = 0; i < requestedMinNumber; i++) {
-			splits[i] = new GenericInputSplit(i, requestedMinNumber);
-		}
-		return splits;
-	}
-
-
-	@Override
-	public Class<GenericInputSplit> getInputSplitType() {
-
-		return GenericInputSplit.class;
-	}
-}