You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:59 UTC

[24/30] Offer buffer-oriented API for I/O (#25)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionJob.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionJob.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionJob.java
deleted file mode 100644
index d9c09d3..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionJob.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.union;
-
-import java.io.File;
-import java.io.IOException;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.client.JobClient;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
-import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.util.JarFileCreator;
-
-public class UnionJob {
-
-	public static void main(final String[] args) {
-
-		// Create graph and define vertices
-		final JobGraph unionGraph = new JobGraph("Union Job");
-
-		final JobFileInputVertex input1 = new JobFileInputVertex("Input 1", unionGraph);
-		input1.setFileInputClass(ProducerTask.class);
-		input1.setFilePath(new Path("file:///tmp/"));
-
-		final JobFileInputVertex input2 = new JobFileInputVertex("Input 2", unionGraph);
-		input2.setFileInputClass(ProducerTask.class);
-		input2.setFilePath(new Path("file:///tmp/"));
-
-		final JobTaskVertex union = new JobTaskVertex("Union", unionGraph);
-		union.setTaskClass(UnionTask.class);
-
-		final JobFileOutputVertex output = new JobFileOutputVertex("Output", unionGraph);
-		output.setFileOutputClass(ConsumerTask.class);
-		output.setFilePath(new Path("file:///tmp/"));
-
-		// Create edges between vertices
-		try {
-			input1.connectTo(union, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
-			input2.connectTo(union, ChannelType.NETWORK,  DistributionPattern.POINTWISE);
-			union.connectTo(output, ChannelType.INMEMORY);
-		} catch (JobGraphDefinitionException e) {
-			e.printStackTrace();
-			return;
-		}
-
-		// Create jar file and attach it
-		final File jarFile = new File("/tmp/unionJob.jar");
-		final JarFileCreator jarFileCreator = new JarFileCreator(jarFile);
-		jarFileCreator.addClass(ProducerTask.class);
-		jarFileCreator.addClass(UnionTask.class);
-		jarFileCreator.addClass(ConsumerTask.class);
-		
-		try {
-			jarFileCreator.createJarFile();
-		} catch (IOException ioe) {
-
-			ioe.printStackTrace();
-
-			if (jarFile.exists()) {
-				jarFile.delete();
-			}
-
-			return;
-		}
-		
-		//Define instance sharing
-		input1.setVertexToShareInstancesWith(output);
-		input2.setVertexToShareInstancesWith(output);
-		union.setVertexToShareInstancesWith(output);
-
-		unionGraph.addJar(new Path("file://" + jarFile.getAbsolutePath()));
-
-		final Configuration conf = new Configuration();
-		conf.setString("jobmanager.rpc.address", "localhost");
-		
-		try {
-			final JobClient jobClient = new JobClient(unionGraph, conf);
-			jobClient.submitJobAndWait();
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-
-		if (jarFile.exists()) {
-			jarFile.delete();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionTask.java
deleted file mode 100644
index d99eb42..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.union;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.MutableRecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.io.UnionRecordReader;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-public class UnionTask extends AbstractTask {
-
-	private UnionRecordReader<StringRecord> input;
-
-	private RecordWriter<StringRecord> output;
-
-	@Override
-	public void registerInputOutput() {
-		@SuppressWarnings("unchecked")
-		MutableRecordReader<StringRecord>[] recordReaders = (MutableRecordReader<StringRecord>[]) new MutableRecordReader<?>[2];
-		recordReaders[0] = new MutableRecordReader<StringRecord>(this);
-		recordReaders[1] = new MutableRecordReader<StringRecord>(this);
-
-		this.input = new UnionRecordReader<StringRecord>(recordReaders, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		while (this.input.hasNext()) {
-			this.output.emit(this.input.next());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
new file mode 100644
index 0000000..6a09e89
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
@@ -0,0 +1,25 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.nephele.execution;
+
+
+import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException;
+
+/**
+ * Thrown to trigger cancellation of the executing task. Intended to cause a canceled status rather than a failed status.
+ */
+public class CancelTaskException extends RuntimeException {
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
index a606dcf..e0bcc70 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
@@ -1,5 +1,5 @@
 /***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
@@ -13,30 +13,25 @@
 
 package eu.stratosphere.nephele.execution;
 
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.FutureTask;
-
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.ChannelSelector;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.RecordDeserializerFactory;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.gates.InputGate;
+import eu.stratosphere.runtime.io.gates.OutputGate;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
 import eu.stratosphere.nephele.template.InputSplitProvider;
 
 /**
  * The user code of every Nephele task runs inside an <code>Environment</code> object. The environment provides
  * important services to the task. It keeps track of setting up the communication channels and provides access to input
  * splits, memory manager, etc.
- * 
  */
 public interface Environment {
 	/**
@@ -49,9 +44,9 @@ public interface Environment {
 	JobID getJobID();
 
 	/**
-	 * Returns the task configuration object which was attached to the original {@link JobVertex}.
+	 * Returns the task configuration object which was attached to the original JobVertex.
 	 * 
-	 * @return the task configuration object which was attached to the original {@link JobVertex}
+	 * @return the task configuration object which was attached to the original JobVertex.
 	 */
 	Configuration getTaskConfiguration();
 
@@ -128,13 +123,6 @@ public interface Environment {
 	GateID getNextUnboundInputGateID();
 
 	/**
-	 * Returns the next unbound output gate ID or <code>null</code> if no such ID exists
-	 * 
-	 * @return the next unbound output gate ID or <code>null</code> if no such ID exists
-	 */
-	GateID getNextUnboundOutputGateID();
-
-	/**
 	 * Returns the number of output gates registered with this environment.
 	 * 
 	 * @return the number of output gates registered with this environment
@@ -163,46 +151,16 @@ public interface Environment {
 	int getNumberOfInputChannels();
 
 	/**
-	 * Creates an output gate.
-	 * 
-	 * @param gateID
-	 * @param outputClass
-	 * @param selector
-	 * @param isBroadcast
-	 * @param <T>
-	 *        The type of the record consumed by the output gate.
-	 * @return The created output gate.
+	 * Creates a new OutputGate and registers it with the Environment.
+	 *
+	 * @return the newly created output gate
 	 */
-	<T extends IOReadableWritable> OutputGate<T> createOutputGate(GateID gateID, Class<T> outputClass,
-															ChannelSelector<T> selector, boolean isBroadcast);
+	OutputGate createAndRegisterOutputGate();
 
 	/**
-	 * Creates an input gate.
-	 * 
-	 * @param gateID
-	 * @param deserializer
-	 * @param distributionPattern
-	 * @param <T>
-	 *        The type of the record read from the input gate.
-	 * @return The created input gate.
+	 * Creates a new InputGate and registers it with the Environment.
 	 */
-	<T extends IOReadableWritable> InputGate<T> createInputGate(GateID gateID, RecordDeserializerFactory<T> deserializerFactory);
-
-	/**
-	 * Registers an output gate with this environment.
-	 * 
-	 * @param outputGate
-	 *        the output gate to be registered
-	 */
-	void registerOutputGate(OutputGate<? extends IOReadableWritable> outputGate);
-
-	/**
-	 * Registers an input gate with this environment.
-	 * 
-	 * @param inputGate
-	 *        the input gate to be registered
-	 */
-	void registerInputGate(InputGate<? extends IOReadableWritable> inputGate);
+	<T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate();
 
 	/**
 	 * Returns the IDs of all output channels connected to this environment.
@@ -255,5 +213,13 @@ public interface Environment {
 	 */
 	AccumulatorProtocol getAccumulatorProtocolProxy();
 
-	Map<String, FutureTask<Path>> getCopyTask();
+	/**
+	 * Returns the buffer provider for this environment.
+	 * <p>
+	 * The returned buffer provider is used by the output side of the network stack.
+	 *
+	 * @return Buffer provider for the output side of the network stack
+	 * @see eu.stratosphere.runtime.io.api.RecordWriter
+	 */
+	BufferProvider getOutputBufferProvider();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
index 7d7cebb..55f036a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
@@ -97,29 +97,9 @@ public final class ExecutionStateTransition {
 			unexpectedStateChange = false;
 		}
 
-		// This transition can appear if a task cannot be deployed at the assigned task manager.
-		else if (oldState == ExecutionState.STARTING && newState == ExecutionState.FAILED) {
-			unexpectedStateChange = false;
-		}
-
 		// -------------- error cases --------------
-		else if (newState == FAILED) {
-			// any state may fail
-			unexpectedStateChange = false;
-		}
-		
-		// This is a regular transition as a result of a cancel operation.
-		else if (oldState == ExecutionState.RUNNING && newState == ExecutionState.CANCELING) {
-			unexpectedStateChange = false;
-		}
-
-		// This is a regular transition as a result of a cancel operation.
-		else if (oldState == ExecutionState.FINISHING && newState == ExecutionState.CANCELING) {
-			unexpectedStateChange = false;
-		}
-
-		// This is a regular transition as a result of a cancel operation.
-		else if (oldState == ExecutionState.CANCELING && newState == ExecutionState.CANCELED) {
+		else if (newState == FAILED || newState == CANCELED || newState == CANCELING) {
+			// any state may fail or cancel itself
 			unexpectedStateChange = false;
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
index a8ca11c..59787d2 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
@@ -36,21 +36,25 @@ import eu.stratosphere.nephele.deployment.ChannelDeploymentDescriptor;
 import eu.stratosphere.nephele.deployment.GateDeploymentDescriptor;
 import eu.stratosphere.nephele.deployment.TaskDeploymentDescriptor;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.io.ChannelSelector;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.RecordDeserializerFactory;
-import eu.stratosphere.nephele.io.RuntimeInputGate;
-import eu.stratosphere.nephele.io.RuntimeOutputGate;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
 import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.nephele.template.InputSplitProvider;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.OutputChannel;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.gates.InputGate;
+import eu.stratosphere.runtime.io.gates.OutputGate;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
+import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPool;
+import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
 import eu.stratosphere.util.StringUtils;
 
 /**
@@ -60,7 +64,7 @@ import eu.stratosphere.util.StringUtils;
  * <p>
  * This class is thread-safe.
  */
-public class RuntimeEnvironment implements Environment, Runnable {
+public class RuntimeEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner, Runnable {
 
 	/**
 	 * The log object used for debugging.
@@ -75,7 +79,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	/**
 	 * List of output gates created by the task.
 	 */
-	private final List<OutputGate<? extends IOReadableWritable>> outputGates = new CopyOnWriteArrayList<OutputGate<? extends IOReadableWritable>>();
+	private final List<OutputGate> outputGates = new CopyOnWriteArrayList<OutputGate>();
 
 	/**
 	 * List of input gates created by the task.
@@ -83,12 +87,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	private final List<InputGate<? extends IOReadableWritable>> inputGates = new CopyOnWriteArrayList<InputGate<? extends IOReadableWritable>>();
 
 	/**
-	 * Queue of unbound output gate IDs which are required for deserializing an environment in the course of an RPC
-	 * call.
-	 */
-	private final Queue<GateID> unboundOutputGateIDs = new ArrayDeque<GateID>();
-
-	/**
 	 * Queue of unbound input gate IDs which are required for deserializing an environment in the course of an RPC
 	 * call.
 	 */
@@ -145,7 +143,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	private volatile ExecutionObserver executionObserver = null;
 	
 	/**
-	 * The RPC procy to report accumulators to JobManager
+	 * The RPC proxy to report accumulators to JobManager
 	 */
 	private AccumulatorProtocol accumulatorProtocolProxy = null;
 
@@ -164,7 +162,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	 */
 	private final String taskName;
 
-	private Map<String,FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
+	private LocalBufferPool outputBufferPool;
 
 	/**
 	 * Creates a new runtime environment object which contains the runtime information for the encapsulated Nephele
@@ -177,7 +175,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	 * @param invokableClass
 	 *        invokableClass the class that should be instantiated as a Nephele task
 	 * @param taskConfiguration
-	 *        the configuration object which was attached to the original {@link JobVertex}
+	 *        the configuration object which was attached to the original JobVertex
 	 * @param jobConfiguration
 	 *        the configuration object which was attached to the original {@link JobGraph}
 	 * @throws Exception
@@ -240,60 +238,16 @@ public class RuntimeEnvironment implements Environment, Runnable {
 		this.invokable.setEnvironment(this);
 		this.invokable.registerInputOutput();
 
-		if (!this.unboundOutputGateIDs.isEmpty() && LOG.isErrorEnabled()) {
-			LOG.error("Inconsistency: " + this.unboundOutputGateIDs.size() + " unbound output gate IDs left");
-		}
+		int numOutputGates = tdd.getNumberOfOutputGateDescriptors();
 
-		if (!this.unboundInputGateIDs.isEmpty() && LOG.isErrorEnabled()) {
-			LOG.error("Inconsistency: " + this.unboundInputGateIDs.size() + " unbound output gate IDs left");
+		for (int i = 0; i < numOutputGates; ++i) {
+			this.outputGates.get(i).initializeChannels(tdd.getOutputGateDescriptor(i));
 		}
 
-		final int noogdd = tdd.getNumberOfOutputGateDescriptors();
-		for (int i = 0; i < noogdd; ++i) {
-			final GateDeploymentDescriptor gdd = tdd.getOutputGateDescriptor(i);
-			final OutputGate og = this.outputGates.get(i);
-			final ChannelType channelType = gdd.getChannelType();
-			og.setChannelType(channelType);
+		int numInputGates = tdd.getNumberOfInputGateDescriptors();
 
-			final int nocdd = gdd.getNumberOfChannelDescriptors();
-			for (int j = 0; j < nocdd; ++j) {
-
-				final ChannelDeploymentDescriptor cdd = gdd.getChannelDescriptor(j);
-				switch (channelType) {
-				case NETWORK:
-					og.createNetworkOutputChannel(og, cdd.getOutputChannelID(), cdd.getInputChannelID());
-					break;
-				case INMEMORY:
-					og.createInMemoryOutputChannel(og, cdd.getOutputChannelID(), cdd.getInputChannelID());
-					break;
-				default:
-					throw new IllegalStateException("Unknown channel type");
-				}
-			}
-		}
-
-		final int noigdd = tdd.getNumberOfInputGateDescriptors();
-		for (int i = 0; i < noigdd; ++i) {
-			final GateDeploymentDescriptor gdd = tdd.getInputGateDescriptor(i);
-			final InputGate ig = this.inputGates.get(i);
-			final ChannelType channelType = gdd.getChannelType();
-			ig.setChannelType(channelType);
-
-			final int nicdd = gdd.getNumberOfChannelDescriptors();
-			for (int j = 0; j < nicdd; ++j) {
-
-				final ChannelDeploymentDescriptor cdd = gdd.getChannelDescriptor(j);
-				switch (channelType) {
-				case NETWORK:
-					ig.createNetworkInputChannel(ig, cdd.getInputChannelID(), cdd.getOutputChannelID());
-					break;
-				case INMEMORY:
-					ig.createInMemoryInputChannel(ig, cdd.getInputChannelID(), cdd.getOutputChannelID());
-					break;
-				default:
-					throw new IllegalStateException("Unknown channel type");
-				}
-			}
+		for(int i = 0; i < numInputGates; i++){
+			this.inputGates.get(i).initializeChannels(tdd.getInputGateDescriptor(i));
 		}
 	}
 
@@ -306,29 +260,26 @@ public class RuntimeEnvironment implements Environment, Runnable {
 		return this.invokable;
 	}
 
-
 	@Override
 	public JobID getJobID() {
 		return this.jobID;
 	}
 
-
 	@Override
 	public GateID getNextUnboundInputGateID() {
-
 		return this.unboundInputGateIDs.poll();
 	}
 
+	@Override
+	public OutputGate createAndRegisterOutputGate() {
+		OutputGate gate = new OutputGate(getJobID(), new GateID(),  getNumberOfOutputGates());
+		this.outputGates.add(gate);
 
-	public GateID getNextUnboundOutputGateID() {
-
-		return this.unboundOutputGateIDs.poll();
+		return gate;
 	}
 
-
 	@Override
 	public void run() {
-
 		if (invokable == null) {
 			LOG.fatal("ExecutionEnvironment has no Invokable set");
 		}
@@ -343,9 +294,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
 		}
 
 		try {
-
-			// Activate input channels
-			// activateInputChannels();
 			ClassLoader cl = LibraryCacheManager.getClassLoader(jobID);
 			Thread.currentThread().setContextClassLoader(cl);
 			this.invokable.invoke();
@@ -354,9 +302,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
 			if (this.executionObserver.isCanceled()) {
 				throw new InterruptedException();
 			}
-
 		} catch (Throwable t) {
-
 			if (!this.executionObserver.isCanceled()) {
 
 				// Perform clean up when the task failed and has been not canceled by the user
@@ -370,7 +316,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
 			// Release all resources that may currently be allocated by the individual channels
 			releaseAllChannelResources();
 
-			if (this.executionObserver.isCanceled()) {
+			if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
 				changeExecutionState(ExecutionState.CANCELED, null);
 			} else {
 				changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));
@@ -399,7 +345,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
 			// Release all resources that may currently be allocated by the individual channels
 			releaseAllChannelResources();
 
-			if (this.executionObserver.isCanceled()) {
+			if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
 				changeExecutionState(ExecutionState.CANCELED, null);
 			} else {
 				changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));
@@ -415,62 +361,35 @@ public class RuntimeEnvironment implements Environment, Runnable {
 		changeExecutionState(ExecutionState.FINISHED, null);
 	}
 
-
 	@Override
-	public <T extends IOReadableWritable> OutputGate<T> createOutputGate(final GateID gateID, Class<T> outputClass,
-			final ChannelSelector<T> selector, final boolean isBroadcast) {
-		final RuntimeOutputGate<T> rog = new RuntimeOutputGate<T>(getJobID(), gateID, outputClass,
-															getNumberOfOutputGates(), selector, isBroadcast);
-		return rog;
-	}
+	public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate() {
+		InputGate<T> gate = new InputGate<T>(getJobID(), new GateID(), getNumberOfInputGates());
+		this.inputGates.add(gate);
 
-
-	@Override
-	public <T extends IOReadableWritable> InputGate<T> createInputGate(final GateID gateID,
-										final RecordDeserializerFactory<T> deserializerFactory) {
-		final RuntimeInputGate<T> rig = new RuntimeInputGate<T>(getJobID(), gateID, deserializerFactory,
-			getNumberOfInputGates());
-		return rig;
-	}
-
-	@Override
-	public void registerOutputGate(OutputGate<? extends IOReadableWritable> outputGate) {
-
-		this.outputGates.add(outputGate);
+		return gate;
 	}
 
-	@Override
-	public void registerInputGate(InputGate<? extends IOReadableWritable> inputGate) {
-		this.inputGates.add(inputGate);
-	}
-
-
 	public int getNumberOfOutputGates() {
 		return this.outputGates.size();
 	}
 
-
 	@Override
 	public int getNumberOfInputGates() {
 		return this.inputGates.size();
 	}
 
-
 	@Override
 	public int getNumberOfOutputChannels() {
-
 		int numberOfOutputChannels = 0;
 		for (int i = 0; i < this.outputGates.size(); ++i) {
-			numberOfOutputChannels += this.outputGates.get(i).getNumberOfOutputChannels();
+			numberOfOutputChannels += this.outputGates.get(i).getNumChannels();
 		}
 
 		return numberOfOutputChannels;
 	}
 
-
 	@Override
 	public int getNumberOfInputChannels() {
-
 		int numberOfInputChannels = 0;
 		for (int i = 0; i < this.inputGates.size(); ++i) {
 			numberOfInputChannels += this.inputGates.get(i).getNumberOfInputChannels();
@@ -497,13 +416,13 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	/**
 	 * Returns the registered output gate with index <code>pos</code>.
 	 * 
-	 * @param pos
+	 * @param index
 	 *        the index of the output gate to return
 	 * @return the output gate at index <code>pos</code> or <code>null</code> if no such index exists
 	 */
-	public OutputGate<? extends IOReadableWritable> getOutputGate(final int pos) {
-		if (pos < this.outputGates.size()) {
-			return this.outputGates.get(pos);
+	public OutputGate getOutputGate(int index) {
+		if (index < this.outputGates.size()) {
+			return this.outputGates.get(index);
 		}
 
 		return null;
@@ -515,7 +434,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	 * @return the thread which is assigned to execute the user code
 	 */
 	public Thread getExecutingThread() {
-
 		synchronized (this) {
 
 			if (this.executingThread == null) {
@@ -538,29 +456,14 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	 * @throws InterruptedException
 	 *         thrown if the thread waiting for the channels to be closed is interrupted
 	 */
-	private void waitForOutputChannelsToBeClosed() throws IOException, InterruptedException {
-
-		// Wait for disconnection of all output gates
-		while (true) {
-
-			// Make sure, we leave this method with an InterruptedException when the task has been canceled
-			if (this.executionObserver.isCanceled()) {
-				throw new InterruptedException();
-			}
-
-			boolean allClosed = true;
-			for (int i = 0; i < getNumberOfOutputGates(); i++) {
-				final OutputGate<? extends IOReadableWritable> og = this.outputGates.get(i);
-				if (!og.isClosed()) {
-					allClosed = false;
-				}
-			}
+	private void waitForOutputChannelsToBeClosed() throws InterruptedException {
+		// Skip waiting for the output gates if the task has been canceled
+		if (this.executionObserver.isCanceled()) {
+			return;
+		}
 
-			if (allClosed) {
-				break;
-			} else {
-				Thread.sleep(SLEEPINTERVAL);
-			}
+		for (OutputGate og : this.outputGates) {
+			og.waitForGateToBeClosed();
 		}
 	}
 
@@ -573,7 +476,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	 *         thrown if the thread waiting for the channels to be closed is interrupted
 	 */
 	private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
-
 		// Wait for disconnection of all output gates
 		while (true) {
 
@@ -602,7 +504,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	 * Closes all input gates which are not already closed.
 	 */
 	private void closeInputGates() throws IOException, InterruptedException {
-
 		for (int i = 0; i < this.inputGates.size(); i++) {
 			final InputGate<? extends IOReadableWritable> eig = this.inputGates.get(i);
 			// Important: close must be called on each input gate exactly once
@@ -615,61 +516,49 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	 * Requests all output gates to be closed.
 	 */
 	private void requestAllOutputGatesToClose() throws IOException, InterruptedException {
-
 		for (int i = 0; i < this.outputGates.size(); i++) {
 			this.outputGates.get(i).requestClose();
 		}
 	}
 
-
 	@Override
 	public IOManager getIOManager() {
 		return this.ioManager;
 	}
 
-
 	@Override
 	public MemoryManager getMemoryManager() {
 		return this.memoryManager;
 	}
 
-
 	@Override
 	public Configuration getTaskConfiguration() {
 		return this.taskConfiguration;
 	}
 
-
 	@Override
 	public Configuration getJobConfiguration() {
 		return this.jobConfiguration;
 	}
 
-
 	@Override
 	public int getCurrentNumberOfSubtasks() {
-
 		return this.currentNumberOfSubtasks;
 	}
 
-
 	@Override
 	public int getIndexInSubtaskGroup() {
-
 		return this.indexInSubtaskGroup;
 	}
 
 	private void changeExecutionState(final ExecutionState newExecutionState, final String optionalMessage) {
-
 		if (this.executionObserver != null) {
 			this.executionObserver.executionStateChanged(newExecutionState, optionalMessage);
 		}
 	}
 
-
 	@Override
 	public String getTaskName() {
-
 		return this.taskName;
 	}
 
@@ -679,9 +568,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	 * @return the name of the task with its index in the subtask group and the total number of subtasks
 	 */
 	public String getTaskNameWithIndex() {
-
-		return this.taskName + " (" + (getIndexInSubtaskGroup() + 1) + "/"
-			+ getCurrentNumberOfSubtasks() + ")";
+		return String.format("%s (%d/%d)", this.taskName, getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks());
 	}
 
 	/**
@@ -694,25 +581,20 @@ public class RuntimeEnvironment implements Environment, Runnable {
 		this.executionObserver = executionObserver;
 	}
 
-
 	@Override
 	public InputSplitProvider getInputSplitProvider() {
 		return this.inputSplitProvider;
 	}
 
-
 	@Override
 	public void userThreadStarted(final Thread userThread) {
-
 		if (this.executionObserver != null) {
 			this.executionObserver.userThreadStarted(userThread);
 		}
 	}
 
-
 	@Override
 	public void userThreadFinished(final Thread userThread) {
-
 		if (this.executionObserver != null) {
 			this.executionObserver.userThreadFinished(userThread);
 		}
@@ -723,7 +605,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	 * method should only be called after the respected task has stopped running.
 	 */
 	private void releaseAllChannelResources() {
-
 		for (int i = 0; i < this.inputGates.size(); i++) {
 			this.inputGates.get(i).releaseAllChannelResources();
 		}
@@ -733,28 +614,21 @@ public class RuntimeEnvironment implements Environment, Runnable {
 		}
 	}
 
-
 	@Override
 	public Set<ChannelID> getOutputChannelIDs() {
+		Set<ChannelID> ids= new HashSet<ChannelID>();
 
-		final Set<ChannelID> outputChannelIDs = new HashSet<ChannelID>();
-
-		final Iterator<OutputGate<? extends IOReadableWritable>> gateIterator = this.outputGates.iterator();
-		while (gateIterator.hasNext()) {
-
-			final OutputGate<? extends IOReadableWritable> outputGate = gateIterator.next();
-			for (int i = 0; i < outputGate.getNumberOfOutputChannels(); ++i) {
-				outputChannelIDs.add(outputGate.getOutputChannel(i).getID());
+		for (OutputGate gate : this.outputGates) {
+			for (OutputChannel channel : gate.channels()) {
+				ids.add(channel.getID());
 			}
 		}
 
-		return Collections.unmodifiableSet(outputChannelIDs);
+		return Collections.unmodifiableSet(ids);
 	}
 
-
 	@Override
 	public Set<ChannelID> getInputChannelIDs() {
-
 		final Set<ChannelID> inputChannelIDs = new HashSet<ChannelID>();
 
 		final Iterator<InputGate<? extends IOReadableWritable>> gateIterator = this.inputGates.iterator();
@@ -769,10 +643,8 @@ public class RuntimeEnvironment implements Environment, Runnable {
 		return Collections.unmodifiableSet(inputChannelIDs);
 	}
 
-
 	@Override
 	public Set<GateID> getInputGateIDs() {
-
 		final Set<GateID> inputGateIDs = new HashSet<GateID>();
 
 		final Iterator<InputGate<? extends IOReadableWritable>> gateIterator = this.inputGates.iterator();
@@ -783,13 +655,11 @@ public class RuntimeEnvironment implements Environment, Runnable {
 		return Collections.unmodifiableSet(inputGateIDs);
 	}
 
-
 	@Override
 	public Set<GateID> getOutputGateIDs() {
-
 		final Set<GateID> outputGateIDs = new HashSet<GateID>();
 
-		final Iterator<OutputGate<? extends IOReadableWritable>> gateIterator = this.outputGates.iterator();
+		final Iterator<OutputGate> gateIterator = this.outputGates.iterator();
 		while (gateIterator.hasNext()) {
 			outputGateIDs.add(gateIterator.next().getGateID());
 		}
@@ -800,11 +670,10 @@ public class RuntimeEnvironment implements Environment, Runnable {
 
 	@Override
 	public Set<ChannelID> getOutputChannelIDsOfGate(final GateID gateID) {
-
-		OutputGate<? extends IOReadableWritable> outputGate = null;
-		final Iterator<OutputGate<? extends IOReadableWritable>> gateIterator = this.outputGates.iterator();
+		OutputGate outputGate = null;
+		final Iterator<OutputGate> gateIterator = this.outputGates.iterator();
 		while (gateIterator.hasNext()) {
-			final OutputGate<? extends IOReadableWritable> candidateGate = gateIterator.next();
+			final OutputGate candidateGate = gateIterator.next();
 			if (candidateGate.getGateID().equals(gateID)) {
 				outputGate = candidateGate;
 				break;
@@ -817,8 +686,8 @@ public class RuntimeEnvironment implements Environment, Runnable {
 
 		final Set<ChannelID> outputChannelIDs = new HashSet<ChannelID>();
 
-		for (int i = 0; i < outputGate.getNumberOfOutputChannels(); ++i) {
-			outputChannelIDs.add(outputGate.getOutputChannel(i).getID());
+		for (int i = 0; i < outputGate.getNumChannels(); ++i) {
+			outputChannelIDs.add(outputGate.getChannel(i).getID());
 		}
 
 		return Collections.unmodifiableSet(outputChannelIDs);
@@ -827,7 +696,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
 
 	@Override
 	public Set<ChannelID> getInputChannelIDsOfGate(final GateID gateID) {
-
 		InputGate<? extends IOReadableWritable> inputGate = null;
 		final Iterator<InputGate<? extends IOReadableWritable>> gateIterator = this.inputGates.iterator();
 		while (gateIterator.hasNext()) {
@@ -850,18 +718,86 @@ public class RuntimeEnvironment implements Environment, Runnable {
 
 		return Collections.unmodifiableSet(inputChannelIDs);
 	}
+
+	public List<OutputGate> outputGates() {
+		return this.outputGates;
+	}
+
+	public List<InputGate<? extends IOReadableWritable>> inputGates() {
+		return this.inputGates;
+	}
 	
 	@Override
 	public AccumulatorProtocol getAccumulatorProtocolProxy() {
 		return accumulatorProtocolProxy;
 	}
 
-	public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) {
-		this.cacheCopyTasks.put(name, copyTask);
+	@Override
+	public BufferProvider getOutputBufferProvider() {
+		return this;
 	}
+	
+	// -----------------------------------------------------------------------------------------------------------------
+	//                                            BufferProvider methods
+	// -----------------------------------------------------------------------------------------------------------------
+	
 	@Override
-	public Map<String, FutureTask<Path>> getCopyTask() {
-		return this.cacheCopyTasks;
+	public Buffer requestBuffer(int minBufferSize) throws IOException {
+		return this.outputBufferPool.requestBuffer(minBufferSize);
 	}
+	
+	@Override
+	public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
+		return this.outputBufferPool.requestBufferBlocking(minBufferSize);
+	}
+	
+	@Override
+	public int getBufferSize() {
+		return this.outputBufferPool.getBufferSize();
+	}
+	
+	@Override
+	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+		return this.outputBufferPool.registerBufferAvailabilityListener(listener);
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	//                                       LocalBufferPoolOwner methods
+	// -----------------------------------------------------------------------------------------------------------------
 
+	@Override
+	public int getNumberOfChannels() {
+		return getNumberOfOutputChannels();
+	}
+
+	@Override
+	public void setDesignatedNumberOfBuffers(int numBuffers) {
+		this.outputBufferPool.setNumDesignatedBuffers(numBuffers);
+	}
+
+	@Override
+	public void clearLocalBufferPool() {
+		this.outputBufferPool.destroy();
+	}
+
+	@Override
+	public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
+		if (this.outputBufferPool == null) {
+			this.outputBufferPool = new LocalBufferPool(globalBufferPool, 1);
+		}
+	}
+
+	@Override
+	public void logBufferUtilization() {
+		LOG.info(String.format("\t%s: %d available, %d requested, %d designated",
+				getTaskNameWithIndex(),
+				this.outputBufferPool.numAvailableBuffers(),
+				this.outputBufferPool.numRequestedBuffers(),
+				this.outputBufferPool.numDesignatedBuffers()));
+	}
+
+	@Override
+	public void reportAsynchronousEvent() {
+		this.outputBufferPool.reportAsynchronousEvent();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/DistributionPatternProvider.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/DistributionPatternProvider.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/DistributionPatternProvider.java
index 0c004ad..35350cc 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/DistributionPatternProvider.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/DistributionPatternProvider.java
@@ -13,7 +13,7 @@
 
 package eu.stratosphere.nephele.executiongraph;
 
-import eu.stratosphere.nephele.io.DistributionPattern;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
 
 public final class DistributionPatternProvider {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java
index cbb957f..0106361 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java
@@ -13,11 +13,11 @@
 
 package eu.stratosphere.nephele.executiongraph;
 
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 
 /**
- * Objects of this class represent a pair of {@link AbstractInputChannel} and {@link AbstractOutputChannel} objects
+ * Objects of this class represent a pair of {@link eu.stratosphere.runtime.io.channels.InputChannel} and {@link AbstractOutputChannel} objects
  * within an {@link ExecutionGraph}, Nephele's internal scheduling representation for jobs.
  * 
  */
@@ -85,11 +85,6 @@ public final class ExecutionEdge {
 		return this.groupEdge.getChannelType();
 	}
 	
-	public boolean isBroadcast() {
-		
-		return this.groupEdge.isBroadcast();
-	}
-	
 	public int getConnectionID() {
 		
 		return this.groupEdge.getConnectionID();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGate.java
index f1bcb76..20d1a4e 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGate.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGate.java
@@ -16,8 +16,8 @@ package eu.stratosphere.nephele.executiongraph;
 import java.util.Collection;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 
 /**
  * Objects of this class represent either an {@link InputGate} or {@link OutputGate} within an {@link ExecutionGraph},

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
index aa08655..ca7eddb 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
@@ -40,17 +40,17 @@ import eu.stratosphere.nephele.instance.AllocatedResource;
 import eu.stratosphere.nephele.instance.DummyInstance;
 import eu.stratosphere.nephele.instance.InstanceManager;
 import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.AbstractJobInputVertex;
 import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
 import eu.stratosphere.nephele.jobgraph.JobEdge;
 import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.runtime.ExecutorThreadFactory;
+import eu.stratosphere.nephele.taskmanager.ExecutorThreadFactory;
 import eu.stratosphere.nephele.template.AbstractInputTask;
 import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.util.StringUtils;
@@ -412,9 +412,6 @@ public class ExecutionGraph implements ExecutionListener {
 
 			// First, build the group edges
 			for (int i = 0; i < sjv.getNumberOfForwardConnections(); ++i) {
-
-				final boolean isBroadcast = sgv.getEnvironment().getOutputGate(i).isBroadcast();
-
 				final JobEdge edge = sjv.getForwardConnection(i);
 				final AbstractJobVertex tjv = edge.getConnectedVertex();
 
@@ -427,13 +424,12 @@ public class ExecutionGraph implements ExecutionListener {
 					userDefinedChannelType = false;
 					channelType = ChannelType.NETWORK;
 				}
-				// Use NO_COMPRESSION as default compression level if nothing else is defined by the user
 
 				final DistributionPattern distributionPattern = edge.getDistributionPattern();
 
 				// Connect the corresponding group vertices and copy the user settings from the job edge
 				final ExecutionGroupEdge groupEdge = sgv.wireTo(tgv, edge.getIndexOfInputGate(), i, channelType,
-					userDefinedChannelType,distributionPattern, isBroadcast);
+					userDefinedChannelType,distributionPattern);
 
 				final ExecutionGate outputGate = new ExecutionGate(new GateID(), sev, groupEdge, false);
 				sev.insertOutputGate(i, outputGate);
@@ -970,7 +966,7 @@ public class ExecutionGraph implements ExecutionListener {
 
 				final ExecutionGate outputGate = sourceVertex.getOutputGate(i);
 				final ChannelType channelType = outputGate.getChannelType();
-				if (channelType == ChannelType.INMEMORY) {
+				if (channelType == ChannelType.IN_MEMORY) {
 					final int numberOfOutputChannels = outputGate.getNumberOfEdges();
 					for (int j = 0; j < numberOfOutputChannels; ++j) {
 						final ExecutionEdge outputChannel = outputGate.getEdge(j);
@@ -990,7 +986,7 @@ public class ExecutionGraph implements ExecutionListener {
 
 				final ExecutionGate inputGate = targetVertex.getInputGate(i);
 				final ChannelType channelType = inputGate.getChannelType();
-				if (channelType == ChannelType.INMEMORY) {
+				if (channelType == ChannelType.IN_MEMORY) {
 					final int numberOfInputChannels = inputGate.getNumberOfEdges();
 					for (int j = 0; j < numberOfInputChannels; ++j) {
 						final ExecutionEdge inputChannel = inputGate.getEdge(j);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupEdge.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupEdge.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupEdge.java
index 14970fd..373d120 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupEdge.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupEdge.java
@@ -13,8 +13,8 @@
 
 package eu.stratosphere.nephele.executiongraph;
 
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
 
 /**
  * An execution group edge represents an edge between two execution group vertices.
@@ -66,11 +66,6 @@ public class ExecutionGroupEdge {
 	private final DistributionPattern distributionPattern;
 
 	/**
-	 * Stores if the edge is part of a broadcast group.
-	 */
-	private final boolean isBroadcast;
-
-	/**
 	 * Constructs a new group edge.
 	 * 
 	 * @param sourceVertex
@@ -85,19 +80,12 @@ public class ExecutionGroupEdge {
 	 *        the channel type for the edge
 	 * @param userDefinedChannelType
 	 *        <code>true</code> if the channel type has been specified by the user, <code>false</code> otherwise
-	 * @param compressionLevel
-	 *        the compression level for the edge
-	 * @param userDefinedCompressionLevel
-	 *        <code>true</code> if the compression level has been specified by the user, <code>false</code> otherwise
 	 * @param distributionPattern
 	 *        the distribution pattern to create the wiring
-	 * @param isBroadcast
-	 *        indicates that the edge is part of a broadcast group
 	 */
 	public ExecutionGroupEdge(final ExecutionGroupVertex sourceVertex, final int indexOfOutputGate,
 			final ExecutionGroupVertex targetVertex, final int indexOfInputGate, final ChannelType channelType,
-			final boolean userDefinedChannelType, final DistributionPattern distributionPattern,
-			final boolean isBroadcast) {
+			final boolean userDefinedChannelType, final DistributionPattern distributionPattern) {
 		this.sourceVertex = sourceVertex;
 		this.indexOfOutputGate = indexOfOutputGate;
 		this.channelType = channelType;
@@ -105,7 +93,6 @@ public class ExecutionGroupEdge {
 		this.userDefinedChannelType = userDefinedChannelType;
 		this.targetVertex = targetVertex;
 		this.distributionPattern = distributionPattern;
-		this.isBroadcast = isBroadcast;
 	}
 
 	/**
@@ -206,13 +193,4 @@ public class ExecutionGroupEdge {
 	public DistributionPattern getDistributionPattern() {
 		return this.distributionPattern;
 	}
-
-	/**
-	 * Checks if the edge is part of a broadcast group.
-	 * 
-	 * @return <code>true</code> if the edge is part of a broadcast group, <code>false</code> otherwise
-	 */
-	public boolean isBroadcast() {
-		return this.isBroadcast;
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
index 908eb6e..89b4b6d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
@@ -13,6 +13,17 @@
 
 package eu.stratosphere.nephele.executiongraph;
 
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.io.InputSplit;
+import eu.stratosphere.nephele.execution.RuntimeEnvironment;
+import eu.stratosphere.nephele.instance.AllocatedResource;
+import eu.stratosphere.nephele.instance.DummyInstance;
+import eu.stratosphere.nephele.instance.InstanceType;
+import eu.stratosphere.nephele.jobgraph.JobVertexID;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -23,17 +34,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.io.InputSplit;
-import eu.stratosphere.nephele.execution.RuntimeEnvironment;
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.DummyInstance;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobVertexID;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-
 /**
  * An ExecutionGroupVertex is created for every JobVertex of the initial job graph. It represents a number of execution
  * vertices that originate from the same job vertex.
@@ -388,7 +388,7 @@ public final class ExecutionGroupVertex {
 	 */
 	ExecutionGroupEdge wireTo(final ExecutionGroupVertex groupVertex, final int indexOfInputGate,
 			final int indexOfOutputGate, final ChannelType channelType, final boolean userDefinedChannelType,
-			final DistributionPattern distributionPattern, final boolean isBroadcast) throws GraphConversionException {
+			final DistributionPattern distributionPattern) throws GraphConversionException {
 
 		try {
 			final ExecutionGroupEdge previousEdge = this.forwardLinks.get(indexOfOutputGate);
@@ -401,8 +401,7 @@ public final class ExecutionGroupVertex {
 		}
 
 		final ExecutionGroupEdge edge = new ExecutionGroupEdge(this, indexOfOutputGate, groupVertex, indexOfInputGate,
-			channelType, userDefinedChannelType, distributionPattern,
-			isBroadcast);
+			channelType, userDefinedChannelType, distributionPattern);
 
 		this.forwardLinks.add(edge);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java
index a251d3c..eab2375 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java
@@ -27,7 +27,7 @@ import eu.stratosphere.nephele.instance.AbstractInstance;
 import eu.stratosphere.nephele.instance.DummyInstance;
 import eu.stratosphere.nephele.instance.InstanceRequestMap;
 import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 
 /**
  * An execution stage contains all execution group vertices (and as a result all execution vertices) which
@@ -401,7 +401,7 @@ public final class ExecutionStage {
 						recurse = true;
 					}
 
-					if (channelType == ChannelType.INMEMORY
+					if (channelType == ChannelType.IN_MEMORY
 						&& !pipeline.equals(connectedVertex.getExecutionPipeline())) {
 
 						connectedVertex.setExecutionPipeline(pipeline);
@@ -432,7 +432,7 @@ public final class ExecutionStage {
 						recurse = true;
 					}
 
-					if (channelType == ChannelType.INMEMORY
+					if (channelType == ChannelType.IN_MEMORY
 						&& !pipeline.equals(connectedVertex.getExecutionPipeline())) {
 
 						connectedVertex.setExecutionPipeline(pipeline);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
index 7a9c48b..57ff073 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
@@ -35,11 +35,10 @@ import eu.stratosphere.nephele.execution.ExecutionState;
 import eu.stratosphere.nephele.execution.ExecutionStateTransition;
 import eu.stratosphere.nephele.instance.AllocatedResource;
 import eu.stratosphere.nephele.instance.AllocationID;
-import eu.stratosphere.nephele.io.GateID;
+import eu.stratosphere.runtime.io.gates.GateID;
 import eu.stratosphere.nephele.taskmanager.AbstractTaskResult;
 import eu.stratosphere.nephele.taskmanager.AbstractTaskResult.ReturnCode;
 import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
-import eu.stratosphere.nephele.taskmanager.TaskKillResult;
 import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
 import eu.stratosphere.nephele.util.AtomicEnum;
 import eu.stratosphere.nephele.util.SerializableArrayList;
@@ -692,41 +691,6 @@ public final class ExecutionVertex {
 	}
 
 	/**
-	 * Kills and removes the task represented by this vertex from the instance it is currently running on. If the
-	 * corresponding task is not in the state <code>RUNNING</code>, this call will be ignored. If the call has been
-	 * executed
-	 * successfully, the task will change the state <code>FAILED</code>.
-	 * 
-	 * @return the result of the task kill attempt
-	 */
-	public TaskKillResult killTask() {
-
-		final ExecutionState state = this.executionState.get();
-
-		if (state != ExecutionState.RUNNING) {
-			final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.ILLEGAL_STATE);
-			result.setDescription("Vertex " + this.toString() + " is in state " + state);
-			return result;
-		}
-
-		final AllocatedResource ar = this.allocatedResource.get();
-
-		if (ar == null) {
-			final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.NO_INSTANCE);
-			result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
-			return result;
-		}
-
-		try {
-			return ar.getInstance().killTask(this.vertexID);
-		} catch (IOException e) {
-			final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.IPC_ERROR);
-			result.setDescription(StringUtils.stringifyException(e));
-			return result;
-		}
-	}
-
-	/**
 	 * Cancels and removes the task represented by this vertex
 	 * from the instance it is currently running on. If the task
 	 * is not currently running, its execution state is simply

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertexID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertexID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertexID.java
index 266b6ef..6209fde 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertexID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertexID.java
@@ -13,7 +13,7 @@
 
 package eu.stratosphere.nephele.executiongraph;
 
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
 import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java
index cd768e2..72e3651 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java
@@ -18,7 +18,7 @@ import java.util.Iterator;
 import java.util.Map;
 
 import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.managementgraph.ManagementEdge;
 import eu.stratosphere.nephele.managementgraph.ManagementEdgeID;
 import eu.stratosphere.nephele.managementgraph.ManagementGate;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
index 898f8e9..50e0e7f 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
@@ -24,13 +24,12 @@ import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
 import eu.stratosphere.nephele.ipc.RPC;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.net.NetUtils;
 import eu.stratosphere.nephele.protocols.TaskOperationProtocol;
 import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
-import eu.stratosphere.nephele.taskmanager.TaskKillResult;
 import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
 import eu.stratosphere.nephele.topology.NetworkNode;
 import eu.stratosphere.nephele.topology.NetworkTopology;
@@ -96,8 +95,8 @@ public abstract class AbstractInstance extends NetworkNode {
 		if (this.taskManager == null) {
 
 			this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
-				new InetSocketAddress(getInstanceConnectionInfo().getAddress(),
-					getInstanceConnectionInfo().getIPCPort()), NetUtils.getSocketFactory());
+				new InetSocketAddress(getInstanceConnectionInfo().address(),
+					getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
 		}
 
 		return this.taskManager;
@@ -206,22 +205,6 @@ public abstract class AbstractInstance extends NetworkNode {
 		return getTaskManagerProxy().cancelTask(id);
 	}
 
-	/**
-	 * Kills the task identified by the given ID at the instance's
-	 * {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
-	 * 
-	 * @param id
-	 *        the ID identifying the task to be killed
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the request or receiving the response
-	 * @return the result of the kill attempt
-	 */
-	public synchronized TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
-
-		return getTaskManagerProxy().killTask(id);
-	}
-
-
 	@Override
 	public boolean equals(final Object obj) {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
index 7e8339f..3c83b80 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
@@ -13,7 +13,7 @@
 
 package eu.stratosphere.nephele.instance;
 
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
 
 /**
  * An allocation ID unambiguously identifies the allocated resources

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java
index a0fd608..257421b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java
@@ -154,7 +154,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 	 * 
 	 * @return the port instance's task manager runs its IPC service on
 	 */
-	public int getIPCPort() {
+	public int ipcPort() {
+
 		return this.ipcPort;
 	}
 
@@ -163,7 +164,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 	 * 
 	 * @return the port instance's task manager expects to receive transfer envelopes on
 	 */
-	public int getDataPort() {
+	public int dataPort() {
+
 		return this.dataPort;
 	}
 
@@ -172,7 +174,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 	 * 
 	 * @return the network address the instance's task manager binds its sockets to
 	 */
-	public InetAddress getAddress() {
+	public InetAddress address() {
+
 		return this.inetAddress;
 	}
 
@@ -182,7 +185,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 	 * 
 	 * @return the host name of the instance
 	 */
-	public String getHostName() {
+	public String hostname() {
+
 		return this.hostName;
 	}
 
@@ -191,7 +195,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 	 * 
 	 * @return the domain name of the instance or <code>null</code> if the domain name could not be determined
 	 */
-	public String getDomainName() {
+	public String domainName() {
+
 		return this.domainName;
 	}
 
@@ -251,15 +256,15 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 		if (obj instanceof InstanceConnectionInfo) {
 
 			InstanceConnectionInfo ici = (InstanceConnectionInfo) obj;
-			if (!this.inetAddress.equals(ici.getAddress())) {
+			if (!this.inetAddress.equals(ici.address())) {
 				return false;
 			}
 
-			if (this.ipcPort != ici.getIPCPort()) {
+			if (this.ipcPort != ici.ipcPort()) {
 				return false;
 			}
 
-			if (this.dataPort != ici.getDataPort()) {
+			if (this.dataPort != ici.dataPort()) {
 				return false;
 			}
 
@@ -278,7 +283,9 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 
 	@Override
 	public int compareTo(final InstanceConnectionInfo o) {
-		return this.getAddress().getHostName().compareTo(((InstanceConnectionInfo) o).getAddress().getHostName());
+
+		return this.address().getHostName()
+			.compareTo(((InstanceConnectionInfo) o).address().getHostName());
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceID.java
index c93b6bd..9b5b707 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceID.java
@@ -13,7 +13,7 @@
 
 package eu.stratosphere.nephele.instance;
 
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
 
 /**
  * Class for statistically unique instance IDs.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
index b69d23e..480e521 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
@@ -396,14 +396,14 @@ public class ClusterManager implements InstanceManager {
 			final HardwareDescription hardwareDescription) {
 
 		// Check if there is a user-defined instance type for this IP address
-		InstanceType instanceType = this.ipToInstanceTypeMapping.get(instanceConnectionInfo.getAddress());
+		InstanceType instanceType = this.ipToInstanceTypeMapping.get(instanceConnectionInfo.address());
 		if (instanceType != null) {
 			LOG.info("Found user-defined instance type for cluster instance with IP "
-				+ instanceConnectionInfo.getAddress() + ": " + instanceType);
+				+ instanceConnectionInfo.address() + ": " + instanceType);
 		} else {
 			instanceType = matchHardwareDescriptionWithInstanceType(hardwareDescription);
 			if (instanceType != null) {
-				LOG.info("Hardware profile of cluster instance with IP " + instanceConnectionInfo.getAddress()
+				LOG.info("Hardware profile of cluster instance with IP " + instanceConnectionInfo.address()
 					+ " matches with instance type " + instanceType);
 			} else {
 				LOG.error("No matching instance type, cannot create cluster instance");
@@ -412,7 +412,7 @@ public class ClusterManager implements InstanceManager {
 		}
 
 		// Try to match new host with a stub host from the existing topology
-		String instanceName = instanceConnectionInfo.getHostName();
+		String instanceName = instanceConnectionInfo.hostname();
 		NetworkNode parentNode = this.networkTopology.getRootNode();
 		NetworkNode currentStubNode = null;
 
@@ -439,7 +439,7 @@ public class ClusterManager implements InstanceManager {
 
 		// Try to match the new host using the IP address
 		if (currentStubNode == null) {
-			instanceName = instanceConnectionInfo.getAddress().toString();
+			instanceName = instanceConnectionInfo.address().toString();
 			instanceName = instanceName.replaceAll("/", ""); // Remove any / characters
 			currentStubNode = this.networkTopology.getNodeByName(instanceName);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractGate.java
deleted file mode 100644
index c3e3697..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractGate.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.event.task.EventListener;
-import eu.stratosphere.nephele.event.task.EventNotificationManager;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * In Nephele a gate represents the connection between a user program and the processing framework. A gate
- * must be connected to exactly one record reader/writer and to at least one channel. The <code>Gate</code> class itself
- * is abstract. A gate is automatically created for every record reader/writer in the user program. A gate can only be used
- * to transport one specific type of records.
- * <p>
- * This class in general is not thread-safe.
- * 
- * @param <T>
- *        the record type to be transported from this gate
- */
-public abstract class AbstractGate<T extends IOReadableWritable> implements Gate<T> {
-
-	/**
-	 * The ID of the job this gate belongs to.
-	 */
-	private final JobID jobID;
-
-	/**
-	 * The ID of this gate.
-	 */
-	private final GateID gateID;
-
-	/**
-	 * The index of the gate in the list of available input/output gates.
-	 */
-	private final int index;
-
-	/**
-	 * The event notification manager used to dispatch events.
-	 */
-	private final EventNotificationManager eventNotificationManager = new EventNotificationManager();
-
-	/**
-	 * The type of input/output channels connected to this gate.
-	 */
-	private ChannelType channelType = ChannelType.NETWORK;
-
-	/**
-	 * Constructs a new abstract gate
-	 * 
-	 * @param jobID
-	 *        the ID of the job this gate belongs to
-	 * @param gateID
-	 *        the ID of this gate
-	 * @param index
-	 *        the index of the gate in the list of available input/output gates.
-	 */
-	protected AbstractGate(final JobID jobID, final GateID gateID, final int index) {
-		this.jobID = jobID;
-		this.gateID = gateID;
-		this.index = index;
-	}
-
-
-	@Override
-	public final int getIndex() {
-		return this.index;
-	}
-
-	/**
-	 * Returns the event notification manager used to dispatch events.
-	 * 
-	 * @return the event notification manager used to dispatch events
-	 */
-	protected final EventNotificationManager getEventNotificationManager() {
-		return this.eventNotificationManager;
-	}
-
-
-	@Override
-	public String toString() {
-
-		return "Gate " + this.index;
-	}
-
-
-	@Override
-	public final void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-
-		this.eventNotificationManager.subscribeToEvent(eventListener, eventType);
-	}
-
-
-	@Override
-	public final void unsubscribeFromEvent(final EventListener eventListener,
-			final Class<? extends AbstractTaskEvent> eventType) {
-
-		this.eventNotificationManager.unsubscribeFromEvent(eventListener, eventType);
-	}
-
-
-	@Override
-	public final void deliverEvent(final AbstractTaskEvent event) {
-
-		this.eventNotificationManager.deliverEvent((AbstractTaskEvent) event);
-	}
-
-
-	@Override
-	public final void setChannelType(final ChannelType channelType) {
-
-		this.channelType = channelType;
-	}
-
-
-	@Override
-	public final ChannelType getChannelType() {
-
-		return this.channelType;
-	}
-
-
-	@Override
-	public JobID getJobID() {
-
-		return this.jobID;
-	}
-
-
-	@Override
-	public GateID getGateID() {
-
-		return this.gateID;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractID.java
deleted file mode 100644
index 37e6cbb..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractID.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * ID is an abstract base class for providing statistically unique identification numbers in Nephele.
- * Every component that requires these kinds of IDs provides its own concrete type.
- * <p>
- * This class is thread-safe.
- * 
- */
-public abstract class AbstractID implements IOReadableWritable {
-
-	/**
-	 * The size of a long in bytes.
-	 */
-	private static final int SIZE_OF_LONG = 8;
-
-	/**
-	 * The size of the ID in bytes.
-	 */
-	protected static final int SIZE = 2 * SIZE_OF_LONG;
-
-	/**
-	 * The upper part of the actual ID.
-	 */
-	private long upperPart;
-
-	/**
-	 * The lower part of the actual ID.
-	 */
-	private long lowerPart;
-
-	/**
-	 * Constructs a new ID with a specific bytes value.
-	 */
-	public AbstractID(final byte[] bytes) {
-
-		if (bytes.length != SIZE) {
-			throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes");
-		}
-
-		this.lowerPart = byteArrayToLong(bytes, 0);
-		this.upperPart = byteArrayToLong(bytes, SIZE_OF_LONG);
-	}
-
-	/**
-	 * Constructs a new abstract ID.
-	 * 
-	 * @param lowerPart
-	 *        the lower bytes of the ID
-	 * @param upperPart
-	 *        the higher bytes of the ID
-	 */
-	protected AbstractID(final long lowerPart, final long upperPart) {
-
-		this.lowerPart = lowerPart;
-		this.upperPart = upperPart;
-	}
-
-	/**
-	 * Creates a new abstract ID from the given one. The given and the newly created abstract ID will be identical, i.e.
-	 * a comparison by <code>equals</code> will return <code>true</code> and both objects will have the same hash code.
-	 * 
-	 * @param id
-	 *        the abstract ID to copy
-	 */
-	protected AbstractID(final AbstractID id) {
-
-		this.lowerPart = id.lowerPart;
-		this.upperPart = id.upperPart;
-	}
-
-	/**	
-	 * Constructs a new random ID from a uniform distribution.
-	 */
-	public AbstractID() {
-
-		this.lowerPart = generateRandomBytes();
-		this.upperPart = generateRandomBytes();
-	}
-
-	/**
-	 * Generates a uniformly distributed random positive long.
-	 * 
-	 * @return a uniformly distributed random positive long
-	 */
-	protected static long generateRandomBytes() {
-
-		return (long) (Math.random() * Long.MAX_VALUE);
-	}
-
-	/**
-	 * Converts the given byte array to a long.
-	 * 
-	 * @param ba
-	 *        the byte array to be converted
-	 * @param offset
-	 *        the offset indicating at which byte inside the array the conversion shall begin
-	 * @return the long variable
-	 */
-	private static long byteArrayToLong(final byte[] ba, final int offset) {
-
-		long l = 0;
-
-		for (int i = 0; i < SIZE_OF_LONG; ++i) {
-			l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
-		}
-
-		return l;
-	}
-
-	/**
-	 * Converts a long to a byte array.
-	 * 
-	 * @param l
-	 *        the long variable to be converted
-	 * @param ba
-	 *        the byte array to store the result of the conversion
-	 * @param offset
-	 *        the offset indicating at what position inside the byte array the result of the conversion shall be stored
-	 */
-	private static void longToByteArray(final long l, final byte[] ba, final int offset) {
-
-		for (int i = 0; i < SIZE_OF_LONG; ++i) {
-			final int shift = i << 3; // i * 8
-			ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
-		}
-	}
-	
-	/**
-	 * Sets an ID from another ID by copying its internal byte representation.
-	 * 
-	 * @param src
-	 *        the source ID
-	 */
-	public void setID(final AbstractID src) {
-		this.lowerPart = src.lowerPart;
-		this.upperPart = src.upperPart;
-	}
-
-
-	@Override
-	public boolean equals(final Object obj) {
-
-		if (!(obj instanceof AbstractID)) {
-			return false;
-		}
-
-		final AbstractID src = (AbstractID) obj;
-
-		if (src.lowerPart != this.lowerPart) {
-			return false;
-		}
-
-		if (src.upperPart != this.upperPart) {
-			return false;
-		}
-
-		return true;
-	}
-
-
-	@Override
-	public int hashCode() {
-
-		return (int) (this.lowerPart ^ (this.upperPart >>> 32));
-	}
-
-
-	@Override
-	public void read(final DataInput in) throws IOException {
-
-		this.lowerPart = in.readLong();
-		this.upperPart = in.readLong();
-	}
-
-
-	@Override
-	public void write(final DataOutput out) throws IOException {
-
-		out.writeLong(this.lowerPart);
-		out.writeLong(this.upperPart);
-	}
-
-
-	@Override
-	public String toString() {
-
-		final byte[] ba = new byte[SIZE];
-		longToByteArray(this.lowerPart, ba, 0);
-		longToByteArray(this.upperPart, ba, SIZE_OF_LONG);
-
-		return StringUtils.byteToHexString(ba);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordReader.java
deleted file mode 100644
index 3a8ed25..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordReader.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.event.task.EventListener;
-import eu.stratosphere.nephele.event.task.EventNotificationManager;
-
-/**
- * This is an abstract base class for a record reader, either dealing with mutable or immutable records,
- * and dealing with reads from single gates (single end points) or multiple gates (union).
- */
-public abstract class AbstractRecordReader implements ReaderBase {
-	
-	
-	private final EventNotificationManager eventHandler = new EventNotificationManager();
-	
-	private int numEventsUntilEndOfSuperstep = -1;
-	
-	private int endOfSuperstepEventsCount;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Subscribes the listener object to receive events of the given type.
-	 * 
-	 * @param eventListener
-	 *        the listener object to register
-	 * @param eventType
-	 *        the type of event to register the listener for
-	 */
-	@Override
-	public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-		this.eventHandler.subscribeToEvent(eventListener, eventType);
-	}
-
-	/**
-	 * Removes the subscription for events of the given type for the listener object.
-	 * 
-	 * @param eventListener The listener object to cancel the subscription for.
-	 * @param eventType The type of the event to cancel the subscription for.
-	 */
-	@Override
-	public void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-		this.eventHandler.unsubscribeFromEvent(eventListener, eventType);
-	}
-	
-	
-	protected void handleEvent(AbstractTaskEvent evt) {
-		this.eventHandler.deliverEvent(evt);
-	}
-	
-	@Override
-	public void setIterative(int numEventsUntilEndOfSuperstep) {
-		this.numEventsUntilEndOfSuperstep = numEventsUntilEndOfSuperstep;
-	}
-
-	@Override
-	public void startNextSuperstep() {
-		if (this.numEventsUntilEndOfSuperstep == -1) {
-			throw new IllegalStateException("Called 'startNextSuperstep()' in a non-iterative reader.");
-		}
-		else if (endOfSuperstepEventsCount < numEventsUntilEndOfSuperstep) {
-			throw new IllegalStateException("Premature 'startNextSuperstep()'. Not yet reached the end-of-superstep.");
-		}
-		this.endOfSuperstepEventsCount = 0;
-	}
-	
-	@Override
-	public boolean hasReachedEndOfSuperstep() {
-		return endOfSuperstepEventsCount== numEventsUntilEndOfSuperstep;
-	}
-	
-	protected boolean incrementEndOfSuperstepEventAndCheck() {
-		if (numEventsUntilEndOfSuperstep == -1) {
-			throw new IllegalStateException("Received EndOfSuperstep event in a non-iterative reader.");
-		}
-		
-		endOfSuperstepEventsCount++;
-		
-		if (endOfSuperstepEventsCount > numEventsUntilEndOfSuperstep) {
-			throw new IllegalStateException("Received EndOfSuperstep events beyond the number to indicate the end of the superstep");
-		}
-		
-		return endOfSuperstepEventsCount== numEventsUntilEndOfSuperstep;
-	}
-}