You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:20 UTC

[23/34] Offer buffer-oriented API for I/O (#25)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordWriter.java
deleted file mode 100644
index 9d50e9c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordWriter.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.event.task.EventListener;
-import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.nephele.io.channels.bytebuffered.EndOfSuperstepEvent;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-
-/**
- * Abstract base class for a regular record writer and broadcast record writer.
- * 
- * @param <T> The type of the record that can be emitted with this record writer.
- */
-public abstract class AbstractRecordWriter<T extends IOReadableWritable> implements Writer<T> {
-
-	/**
-	 * The output gate assigned to this record writer.
-	 */
-	private OutputGate<T> outputGate;
-
-	/**
-	 * The environment associated to this record writer.
-	 */
-	private Environment environment;
-
-	/**
-	 * Constructs a new record writer and registers a new output gate with the application's environment.
-	 * 
-	 * @param invokable
-	 *        the application that instantiated the record writer
-	 * @param outputClass
-	 *        the class of records that can be emitted with this record writer
-	 * @param selector
-	 *        the channel selector to be used to determine the output channel to be used for a record
-	 * @param isBroadcast
-	 *        <code>true</code> if this record writer shall broadcast the records to all connected channels,
-	 *        <code>false/<code> otherwise
-	 */
-	public AbstractRecordWriter(AbstractInvokable invokable, Class<T> outputClass, ChannelSelector<T> selector, boolean isBroadcast) {
-		this.environment = invokable.getEnvironment();
-		connectOutputGate(outputClass, selector, isBroadcast);
-	}
-
-	/**
-	 * Connects a record writer to an output gate.
-	 * 
-	 * @param outputClass
-	 *        the class of the record that can be emitted with this record writer
-	 * @param selector
-	 *        the channel selector to be used to determine the output channel to be used for a record
-	 * @param isBroadcast
-	 *        <code>true</code> if this record writer shall broadcast the records to all connected channels,
-	 *        <code>false/<code> otherwise
-	 */
-	private void connectOutputGate(Class<T> outputClass, ChannelSelector<T> selector, boolean isBroadcast)
-	{
-		GateID gateID = this.environment.getNextUnboundOutputGateID();
-		if (gateID == null) {
-			gateID = new GateID();
-		}
-
-		this.outputGate = this.environment.createOutputGate(gateID, outputClass, selector, isBroadcast);
-		this.environment.registerOutputGate(this.outputGate);
-	}
-
-	/**
-	 * This method emits a record to the corresponding output gate. The method may block
-	 * until the record was transfered via any of the connected channels.
-	 * 
-	 * @param record
-	 *        The record to be emitted.
-	 * @throws IOException
-	 *         Thrown on an error that may happen during the transfer of the given record or a previous record.
-	 */
-	public void emit(final T record) throws IOException, InterruptedException {
-		this.outputGate.writeRecord(record);
-	}
-
-	/**
-	 * Subscribes the listener object to receive events of the given type.
-	 * 
-	 * @param eventListener
-	 *        the listener object to register
-	 * @param eventType
-	 *        the type of event to register the listener for
-	 */
-	public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-		this.outputGate.subscribeToEvent(eventListener, eventType);
-	}
-
-	/**
-	 * Removes the subscription for events of the given type for the listener object.
-	 * 
-	 * @param eventListener
-	 *        the listener object to cancel the subscription for
-	 * @param eventType
-	 *        the type of the event to cancel the subscription for
-	 */
-	public void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-		this.outputGate.unsubscribeFromEvent(eventListener, eventType);
-	}
-
-	/**
-	 * Publishes an event.
-	 * 
-	 * @param event
-	 *        the event to be published
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the event
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the event to be published
-	 */
-	public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
-		this.outputGate.publishEvent(event);
-	}
-
-	public void flush() throws IOException, InterruptedException {
-		this.outputGate.flush();
-	}
-	
-	public void sendEndOfSuperstep() throws IOException, InterruptedException {
-		this.outputGate.publishEvent(EndOfSuperstepEvent.INSTANCE);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractSingleGateRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractSingleGateRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractSingleGateRecordReader.java
deleted file mode 100644
index 8def532..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractSingleGateRecordReader.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-
-/**
- * This is an abstract base class for a record reader, either dealing with mutable or immutable records.
- * 
- * @param <T> The type of the record that can be read from this record reader.
- */
-public abstract class AbstractSingleGateRecordReader<T extends IOReadableWritable> extends AbstractRecordReader {
-	
-	/**
-	 * The input gate associated with the record reader.
-	 */
-	protected final InputGate<T> inputGate;
-	
-	// --------------------------------------------------------------------------------------------
-
-	protected AbstractSingleGateRecordReader(AbstractInvokable invokable, RecordDeserializerFactory<T> deserializerFactory, int inputGateID) {
-		Environment environment = invokable.getEnvironment();
-		GateID gateID = environment.getNextUnboundInputGateID();
-		if (gateID == null) {
-			gateID = new GateID();
-		}
-
-		this.inputGate = environment.createInputGate(gateID, deserializerFactory);
-		environment.registerInputGate(this.inputGate);
-	}
-
-	/**
-	 * Returns the number of input channels wired to this reader's input gate.
-	 * 
-	 * @return the number of input channels wired to this reader's input gate
-	 */
-	public int getNumberOfInputChannels() {
-		return this.inputGate.getNumberOfInputChannels();
-	}
-
-	/**
-	 * Publishes an event.
-	 * 
-	 * @param event
-	 *        the event to be published
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the event
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the event to be published
-	 */
-	@Override
-	public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
-		// Delegate call to input gate to send events
-		this.inputGate.publishEvent(event);
-	}
-	
-	
-	InputGate<T> getInputGate() {
-		return this.inputGate;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractUnionRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractUnionRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractUnionRecordReader.java
deleted file mode 100644
index 5de3f67..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractUnionRecordReader.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.HashSet;
-import java.util.Set;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-
-public abstract class AbstractUnionRecordReader<T extends IOReadableWritable> extends AbstractRecordReader implements RecordAvailabilityListener<T> {
-
-	/**
-	 * The set of all input gates.
-	 */
-	private final InputGate<T>[] allInputGates;
-	
-	/**
-	 * The set of unclosed input gates.
-	 */
-	private final Set<InputGate<T>> remainingInputGates;
-
-	/**
-	 * Queue with indices of channels that store at least one available record.
-	 */
-	private final ArrayDeque<InputGate<T>> availableInputGates = new ArrayDeque<InputGate<T>>();
-	
-	/**
-	 * The next input gate to read a record from.
-	 */
-	private InputGate<T> nextInputGateToReadFrom;
-
-	
-	@Override
-	public boolean isInputClosed() {
-		return this.remainingInputGates.isEmpty();
-	}
-	
-	/**
-	 * Constructs a new mutable union record reader.
-	 * 
-	 * @param recordReaders
-	 *        the individual mutable record readers whose input is used to construct the union
-	 */
-	@SuppressWarnings("unchecked")
-	protected AbstractUnionRecordReader(MutableRecordReader<T>[] recordReaders) {
-
-		if (recordReaders == null) {
-			throw new IllegalArgumentException("Provided argument recordReaders is null");
-		}
-
-		if (recordReaders.length < 2) {
-			throw new IllegalArgumentException(
-				"The mutable union record reader must at least be initialized with two individual mutable record readers");
-		}
-		
-		this.allInputGates = new InputGate[recordReaders.length];
-		this.remainingInputGates = new HashSet<InputGate<T>>((int) (recordReaders.length * 1.6f));
-		
-		for (int i = 0; i < recordReaders.length; i++) {
-			InputGate<T> inputGate = recordReaders[i].getInputGate();
-			inputGate.registerRecordAvailabilityListener(this);
-			this.allInputGates[i] = inputGate;
-			this.remainingInputGates.add(inputGate);
-		}
-	}
-	
-	
-	@Override
-	public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
-		for (InputGate<T> gate : this.allInputGates) {
-			gate.publishEvent(event);
-		}
-	}
-	
-	@Override
-	public void reportRecordAvailability(InputGate<T> inputGate) {
-		synchronized (this.availableInputGates) {
-			this.availableInputGates.add(inputGate);
-			this.availableInputGates.notifyAll();
-		}
-	}
-	
-	protected boolean getNextRecord(T target) throws IOException, InterruptedException {
-
-		while (true) {
-			// has the current input gate more data?
-			if (this.nextInputGateToReadFrom == null) {
-				if (this.remainingInputGates.isEmpty()) {
-					return false;
-				}
-				
-				this.nextInputGateToReadFrom = getNextAvailableInputGate();
-			}
-
-			InputChannelResult result = this.nextInputGateToReadFrom.readRecord(target);
-			switch (result) {
-				case INTERMEDIATE_RECORD_FROM_BUFFER: // record is available and we can stay on the same channel
-					return true;
-					
-				case LAST_RECORD_FROM_BUFFER: // record is available, but we need to re-check the channels
-					this.nextInputGateToReadFrom = null;
-					return true;
-					
-				case END_OF_SUPERSTEP:
-					this.nextInputGateToReadFrom = null;
-					if (incrementEndOfSuperstepEventAndCheck()) {
-						return false; // end of the superstep
-					}
-					else {
-						break; // fall through and wait for next record/event
-					}
-					
-				case TASK_EVENT:	// event for the subscribers is available
-					handleEvent(this.nextInputGateToReadFrom.getCurrentEvent());
-					this.nextInputGateToReadFrom = null;
-					break;
-					
-				case END_OF_STREAM: // one gate is empty
-					this.remainingInputGates.remove(this.nextInputGateToReadFrom);
-					this.nextInputGateToReadFrom = null;
-					break;
-					
-				case NONE: // gate processed an internal event and could not return a record on this call
-					this.nextInputGateToReadFrom = null;
-					break;
-			}
-		}
-	}
-	
-	private InputGate<T> getNextAvailableInputGate() throws InterruptedException {
-		synchronized (this.availableInputGates) {
-			while (this.availableInputGates.isEmpty()) {
-				this.availableInputGates.wait();
-			}
-			return this.availableInputGates.pop();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/BroadcastRecordWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/BroadcastRecordWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/BroadcastRecordWriter.java
deleted file mode 100644
index f2b1141..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/BroadcastRecordWriter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-/**
- * A record writer connects the application to an output gate. It allows the application
- * of emit (send out) to the output gate. The broadcast record writer will make sure that each emitted record will be
- * transfered via all connected output channels.
- * 
- * @param <T>
- *        the type of the record that can be emitted with this record writer
- */
-public class BroadcastRecordWriter<T extends IOReadableWritable> extends AbstractRecordWriter<T> {
-
-	/**
-	 * Constructs a new broadcast record writer and registers a new output gate with the application's environment.
-	 * 
-	 * @param taskBase
-	 *        the application that instantiated the record writer
-	 * @param outputClass
-	 *        the class of records that can be emitted with this record writer
-	 */
-	public BroadcastRecordWriter(AbstractTask taskBase, Class<T> outputClass) {
-		super(taskBase, outputClass, null, true);
-	}
-
-	/**
-	 * Constructs a new broadcast record writer and registers a new output gate with the application's environment.
-	 * 
-	 * @param inputBase
-	 *        the application that instantiated the record writer
-	 * @param outputClass
-	 *        the class of records that can be emitted with this record writer
-	 */
-	public BroadcastRecordWriter(AbstractInputTask<?> inputBase, Class<T> outputClass) {
-		super(inputBase, outputClass, null, true);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ChannelSelector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ChannelSelector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ChannelSelector.java
deleted file mode 100644
index f7a7e9b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ChannelSelector.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * Objects implementing this interface are passed to an {@link OutputGate}. When a record is sent through the output
- * gate, the channel selector object is called to determine to which {@link AbstractOutputChannel} objects the record
- * shall be passed on.
- * 
- * @param <T>
- *        the type of record which is sent through the attached output gate
- */
-public interface ChannelSelector<T extends IOReadableWritable> {
-
-	/**
-	 * Called to determine to which attached {@link AbstractOutputChannel} objects the given record shall be forwarded.
-	 * 
-	 * @param record
-	 *        the record to the determine the output channels for
-	 * @param numberOfOutputChannels
-	 *        the total number of output channels which are attached to respective output gate
-	 * @return a (possibly empty) array of integer numbers which indicate the indices of the output channels through
-	 *         which the record shall be forwarded
-	 */
-	int[] selectChannels(T record, int numberOfOutputChannels);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DataOutputBuffer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DataOutputBuffer.java
deleted file mode 100644
index 171e985..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DataOutputBuffer.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership. 
- */
-
-package eu.stratosphere.nephele.io;
-
-import java.io.DataInput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-/**
- * A reusable {@link DataOutput} implementation that writes to an in-memory
- * buffer.
- * <p>
- * This saves memory over creating a new DataOutputStream and ByteArrayOutputStream each time data is written.
- * <p>
- * Typical usage is something like the following:
- * 
- * <pre>
- * 
- * DataOutputBuffer buffer = new DataOutputBuffer();
- * while (... loop condition ...) {
- *   buffer.reset();
- *   ... write buffer using DataOutput methods ...
- *   byte[] data = buffer.getData();
- *   int dataLength = buffer.getLength();
- *   ... write data to its ultimate destination ...
- * }
- * </pre>
- */
-public class DataOutputBuffer extends DataOutputStream {
-
-	private static class ByteBufferedOutputStream extends OutputStream {
-
-		private ByteBuffer buf;
-
-		public ByteBuffer getData() {
-			return this.buf;
-		}
-
-		public int getLength() {
-			return this.buf.limit();
-		}
-
-		public ByteBufferedOutputStream() {
-			this(1024);
-		}
-
-		public ByteBufferedOutputStream(int size) {
-			this.buf = ByteBuffer.allocate(size);
-			this.buf.position(0);
-			this.buf.limit(0);
-		}
-
-		public void reset() {
-			this.buf.position(0);
-			this.buf.limit(0);
-		}
-
-		public void write(DataInput in, int len) throws IOException {
-
-			final int newcount = this.buf.limit() + len;
-			if (newcount > this.buf.capacity()) {
-				final ByteBuffer newBuf = ByteBuffer.allocate(Math.max(this.buf.capacity() << 1, newcount));
-				newBuf.position(0);
-				System.arraycopy(this.buf.array(), 0, newBuf.array(), 0, this.buf.limit());
-				newBuf.limit(this.buf.limit());
-				this.buf = newBuf;
-			}
-
-			in.readFully(this.buf.array(), this.buf.limit(), len);
-			this.buf.limit(newcount);
-		}
-
-		@Override
-		public void write(byte[] b, int off, int len) throws IOException {
-
-			final int newcount = this.buf.limit() + len;
-			if (newcount > this.buf.capacity()) {
-				increaseInternalBuffer(newcount);
-			}
-
-			System.arraycopy(b, off, this.buf.array(), this.buf.limit(), len);
-			this.buf.limit(newcount);
-		}
-
-		@Override
-		public void write(byte[] b) throws IOException {
-			write(b, 0, b.length);
-		}
-
-		@Override
-		public void write(int arg0) throws IOException {
-
-			final int oldLimit = this.buf.limit();
-			final int newLimit = oldLimit + 1;
-
-			if (newLimit > this.buf.capacity()) {
-				increaseInternalBuffer(newLimit);
-			}
-
-			this.buf.limit(newLimit);
-			this.buf.put(oldLimit, (byte) arg0);
-		}
-
-		private void increaseInternalBuffer(int minimumRequiredSize) {
-			final ByteBuffer newBuf = ByteBuffer.allocate(Math.max(this.buf.capacity() << 1, minimumRequiredSize));
-			newBuf.position(0);
-			System.arraycopy(this.buf.array(), 0, newBuf.array(), 0, this.buf.limit());
-			newBuf.limit(this.buf.limit());
-			this.buf = newBuf;
-		}
-	}
-
-	private final ByteBufferedOutputStream byteBufferedOutputStream;
-
-	/** Constructs a new empty buffer. */
-	public DataOutputBuffer() {
-		this(new ByteBufferedOutputStream());
-	}
-
-	public DataOutputBuffer(int size) {
-		this(new ByteBufferedOutputStream(size));
-	}
-
-	private DataOutputBuffer(ByteBufferedOutputStream byteBufferedOutputStream) {
-		super(byteBufferedOutputStream);
-		this.byteBufferedOutputStream = byteBufferedOutputStream;
-	}
-
-	public ByteBuffer getData() {
-		return this.byteBufferedOutputStream.getData();
-	}
-
-	public int getLength() {
-		return this.byteBufferedOutputStream.getLength();
-	}
-
-	/** Resets the buffer to empty. */
-	public DataOutputBuffer reset() {
-		this.byteBufferedOutputStream.reset();
-		return this;
-	}
-
-	public void write(DataInput in, int length) throws IOException {
-		this.byteBufferedOutputStream.write(in, length);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DefaultChannelSelector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DefaultChannelSelector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DefaultChannelSelector.java
deleted file mode 100644
index cabc208..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DefaultChannelSelector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * This is the default implementation of the {@link ChannelSelector} interface. It represents a simple round-robin
- * strategy, i.e. regardless of the record every attached exactly one output channel is selected at a time.
-
- * @param <T>
- *        the type of record which is sent through the attached output gate
- */
-public class DefaultChannelSelector<T extends IOReadableWritable> implements ChannelSelector<T> {
-
-	/**
-	 * Stores the index of the channel to send the next record to.
-	 */
-	private final int[] nextChannelToSendTo = new int[1];
-
-	/**
-	 * Constructs a new default channel selector.
-	 */
-	public DefaultChannelSelector() {
-		this.nextChannelToSendTo[0] = 0;
-	}
-
-
-	@Override
-	public int[] selectChannels(final T record, final int numberOfOutpuChannels) {
-
-		this.nextChannelToSendTo[0] = (this.nextChannelToSendTo[0] + 1) % numberOfOutpuChannels;
-
-		return this.nextChannelToSendTo;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DistributionPattern.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DistributionPattern.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DistributionPattern.java
deleted file mode 100644
index 634fbcc..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DistributionPattern.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-/**
- * A distribution pattern determines which subtasks of a producing Nephele task a wired to which
- * subtasks of a consuming subtask.
- * 
- */
-
-public enum DistributionPattern {
-
-	/**
-	 * Each subtask of the producing Nephele task is wired to each subtask of the consuming Nephele task.
-	 */
-	BIPARTITE,
-
-	/**
-	 * The i-th subtask of the producing Nephele task is wired to the i-th subtask of the consuming Nephele task.
-	 */
-	POINTWISE
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Gate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Gate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Gate.java
deleted file mode 100644
index 26773ca..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Gate.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.event.task.EventListener;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-public interface Gate<T extends IOReadableWritable> {
-
-	/**
-	 * Returns the index that has been assigned to the gate upon initialization.
-	 * 
-	 * @return the index that has been assigned to the gate upon initialization.
-	 */
-	int getIndex();
-
-	/**
-	 * Subscribes the listener object to receive events of the given type.
-	 * 
-	 * @param eventListener
-	 *        the listener object to register
-	 * @param eventType
-	 *        the type of event to register the listener for
-	 */
-	void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType);
-
-	/**
-	 * Removes the subscription for events of the given type for the listener object.
-	 * 
-	 * @param eventListener
-	 *        the listener object to cancel the subscription for
-	 * @param eventType
-	 *        the type of the event to cancel the subscription for
-	 */
-	void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType);
-
-	/**
-	 * Publishes an event.
-	 * 
-	 * @param event
-	 *        the event to be published
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the event
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the event to be published
-	 */
-	void publishEvent(AbstractEvent event) throws IOException, InterruptedException;
-
-	/**
-	 * Passes a received event on to the event notification manager so it cam ne dispatched.
-	 * 
-	 * @param event
-	 *        the event to pass on to the notification manager
-	 */
-	void deliverEvent(AbstractTaskEvent event);
-
-	/**
-	 * Returns the ID of the job this gate belongs to.
-	 * 
-	 * @return the ID of the job this gate belongs to
-	 */
-	JobID getJobID();
-
-	/**
-	 * Returns the type of the input/output channels which are connected to this gate.
-	 * 
-	 * @return the type of input/output channels which are connected to this gate
-	 */
-	ChannelType getChannelType();
-
-	/**
-	 * Returns the ID of the gate.
-	 * 
-	 * @return the ID of the gate
-	 */
-	GateID getGateID();
-
-	/**
-	 * Releases the allocated resources (particularly buffer) of all channels attached to this gate. This method
-	 * should only be called after the respected task has stopped running.
-	 */
-	void releaseAllChannelResources();
-
-	/**
-	 * Checks if the gate is closed. The gate is closed if all this associated channels are closed.
-	 * 
-	 * @return <code>true</code> if the gate is closed, <code>false</code> otherwise
-	 * @throws IOException
-	 *         thrown if any error occurred while closing the gate
-	 * @throws InterruptedException
-	 *         thrown if the gate is interrupted while waiting for this operation to complete
-	 */
-	boolean isClosed() throws IOException, InterruptedException;
-
-	/**
-	 * Checks if the considered gate is an input gate.
-	 * 
-	 * @return <code>true</code> if the considered gate is an input gate, <code>false</code> if it is an output gate
-	 */
-	boolean isInputGate();
-
-	/**
-	 * Sets the type of the input/output channels which are connected to this gate.
-	 * 
-	 * @param channelType
-	 *        the type of input/output channels which are connected to this gate
-	 */
-	void setChannelType(ChannelType channelType);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/GateID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/GateID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/GateID.java
deleted file mode 100644
index 9998916..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/GateID.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-/**
- * A class for statistically unique gate IDs.
- * 
- */
-public final class GateID extends AbstractID {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ImmutableRecordDeserializerFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ImmutableRecordDeserializerFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ImmutableRecordDeserializerFactory.java
deleted file mode 100644
index be0a60f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ImmutableRecordDeserializerFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.channels.DefaultDeserializer;
-
-/**
- * As simple factory implementation that instantiates deserializers for immutable records. For
- * each deserialization, a new record is instantiated from the given class.
- */
-public class ImmutableRecordDeserializerFactory<T extends IOReadableWritable> implements RecordDeserializerFactory<T> {
-	
-	private final Class<? extends T> recordType;			// the type of the record to be deserialized
-	
-	
-	/**
-	 * Creates a new factory that instantiates deserializers for immutable records.
-	 * 
-	 * @param recordType The type of the record to be deserialized.
-	 */
-	public ImmutableRecordDeserializerFactory(final Class<? extends T> recordType) {
-		this.recordType = recordType;
-	}
-
-	@Override
-	public RecordDeserializer<T> createDeserializer() {
-		return new DefaultDeserializer<T>(this.recordType);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputChannelResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputChannelResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputChannelResult.java
deleted file mode 100644
index f154a3f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputChannelResult.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.io;
-
-public enum InputChannelResult {
-
-	NONE,
-	INTERMEDIATE_RECORD_FROM_BUFFER,
-	LAST_RECORD_FROM_BUFFER,
-	END_OF_SUPERSTEP,
-	TASK_EVENT,
-	END_OF_STREAM;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputGate.java
deleted file mode 100644
index 6a57756..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputGate.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.io.channels.AbstractInputChannel;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryInputChannel;
-import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkInputChannel;
-
-/**
- * @param <T> The type of record that can be transported through this gate.
- */
-public interface InputGate<T extends IOReadableWritable> extends Gate<T> {
-
-	/**
-	 * Reads a record from one of the associated input channels. Channels are read such that one buffer from a channel is 
-	 * consecutively consumed. The buffers in turn are consumed in the order in which they arrive.
-	 * Note that this method is not guaranteed to return a record, because the currently available channel data may not always
-	 * constitute an entire record, when events or partial records are part of the data.
-	 * 
-	 * When called even though no data is available, this call will block until data is available, so this method should be called
-	 * when waiting is desired (such as when synchronously consuming a single gate) or only when it is known that data is available
-	 * (such as when reading a union of multiple input gates).
-	 * 
-	 * @param target The record object into which to construct the complete record.
-	 * @return The result indicating whether a complete record is available, a event is available, only incomplete data
-	 *         is available (NONE), or the gate is exhausted.
-	 * @throws IOException Thrown when an error occurred in the network stack relating to this channel.
-	 * @throws InterruptedException Thrown, when the thread working on this channel is interrupted.
-	 */
-	InputChannelResult readRecord(T target) throws IOException, InterruptedException;
-
-	/**
-	 * Returns the number of input channels associated with this input gate.
-	 * 
-	 * @return the number of input channels associated with this input gate
-	 */
-	int getNumberOfInputChannels();
-
-	/**
-	 * Returns the input channel from position <code>pos</code> of the gate's internal channel list.
-	 * 
-	 * @param pos
-	 *        the position to retrieve the channel from
-	 * @return the channel from the given position or <code>null</code> if such position does not exist.
-	 */
-	AbstractInputChannel<T> getInputChannel(int pos);
-
-	/**
-	 * Notify the gate that the channel with the given index has
-	 * at least one record available.
-	 * 
-	 * @param channelIndex
-	 *        the index of the channel which has at least one record available
-	 */
-	void notifyRecordIsAvailable(int channelIndex);
-
-	/**
-	 * Notify the gate that is has consumed a data unit from the channel with the given index
-	 * 
-	 * @param channelIndex
-	 *        the index of the channel from which a data unit has been consumed
-	 */
-	void notifyDataUnitConsumed(int channelIndex);
-
-	/**
-	 * Immediately closes the input gate and all its input channels. The corresponding
-	 * output channels are notified. Any remaining records in any buffers or queue is considered
-	 * irrelevant and is discarded.
-	 * 
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while closing the gate
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the gate to be closed
-	 */
-	void close() throws IOException, InterruptedException;
-
-	/**
-	 * Creates a new network input channel and assigns it to the given input gate.
-	 * 
-	 * @param inputGate
-	 *        the input gate the channel shall be assigned to
-	 * @param channelID
-	 *        the ID of the channel
-	 * @param connectedChannelID
-	 *        the ID of the channel this channel is connected to
-	 * @param compressionLevel
-	 *        the level of compression to be used for this channel
-	 * @return the new network input channel
-	 */
-	NetworkInputChannel<T> createNetworkInputChannel(InputGate<T> inputGate, ChannelID channelID,
-			ChannelID connectedChannelID);
-
-
-	/**
-	 * Creates a new in-memory input channel and assigns it to the given input gate.
-	 * 
-	 * @param inputGate
-	 *        the input gate the channel shall be assigned to
-	 * @param channelID
-	 *        the ID of the channel
-	 * @param connectedChannelID
-	 *        the ID of the channel this channel is connected to
-	 * @param compressionLevel
-	 *        the level of compression to be used for this channel
-	 * @return the new in-memory input channel
-	 */
-	InMemoryInputChannel<T> createInMemoryInputChannel(InputGate<T> inputGate, ChannelID channelID,
-			ChannelID connectedChannelID);
-
-	/**
-	 * Registers a {@link RecordAvailabilityListener} with this input gate.
-	 * 
-	 * @param listener
-	 *        the listener object to be registered
-	 */
-	void registerRecordAvailabilityListener(RecordAvailabilityListener<T> listener);
-	
-	
-	AbstractTaskEvent getCurrentEvent();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableReader.java
deleted file mode 100644
index ef1080c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableReader.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * 
- */
-public interface MutableReader<T extends IOReadableWritable> extends ReaderBase {
-	
-	/**
-	 * @param target
-	 * @return
-	 * @throws IOException
-	 * @throws InterruptedException
-	 */
-	boolean next(T target) throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordDeserializerFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordDeserializerFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordDeserializerFactory.java
deleted file mode 100644
index 5890562..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordDeserializerFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.channels.DefaultDeserializer;
-
-/**
- * As simple factory implementation that instantiates deserializers for mutable records.
- */
-public class MutableRecordDeserializerFactory<T extends IOReadableWritable> implements RecordDeserializerFactory<T> {
-	
-	/**
-	 * Creates a new factory that instantiates deserializers for immutable records.
-	 * 
-	 * @param recordType The type of the record to be deserialized.
-	 */
-	public MutableRecordDeserializerFactory() {}
-
-	@Override
-	public RecordDeserializer<T> createDeserializer() {
-		return new DefaultDeserializer<T>(null);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private static final RecordDeserializerFactory<IOReadableWritable> INSTANCE = 
-									new MutableRecordDeserializerFactory<IOReadableWritable>();
-	
-	/**
-	 * Gets the singleton instance of the {@code MutableRecordDeserializerFactory}.
-	 * 
-	 * @param <E> The generic type of the record to be deserialized.
-	 * @return An instance of the factory.
-	 */
-	public static final <E extends IOReadableWritable> RecordDeserializerFactory<E> get() {
-		@SuppressWarnings("unchecked")
-		RecordDeserializerFactory<E> toReturn = (RecordDeserializerFactory<E>) INSTANCE;
-		return toReturn;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordReader.java
deleted file mode 100644
index 23c26c4..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordReader.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-public class MutableRecordReader<T extends IOReadableWritable> extends AbstractSingleGateRecordReader<T> implements MutableReader<T> {
-	
-	private boolean endOfStream;
-	
-	
-	/**
-	 * Constructs a new mutable record reader and registers a new input gate with the application's environment.
-	 * 
-	 * @param taskBase The application that instantiated the record reader.
-	 */
-	public MutableRecordReader(final AbstractTask taskBase) {
-		super(taskBase, MutableRecordDeserializerFactory.<T>get(), 0);
-	}
-
-	/**
-	 * Constructs a new record reader and registers a new input gate with the application's environment.
-	 * 
-	 * @param outputBase The application that instantiated the record reader.
-	 */
-	public MutableRecordReader(final AbstractOutputTask outputBase) {
-		super(outputBase, MutableRecordDeserializerFactory.<T>get(), 0);
-	}
-
-	/**
-	 * Constructs a new record reader and registers a new input gate with the application's environment.
-	 * 
-	 * @param taskBase
-	 *        the application that instantiated the record reader
-	 * @param inputGateID
-	 *        The ID of the input gate that the reader reads from.
-	 */
-	public MutableRecordReader(final AbstractTask taskBase, final int inputGateID) {
-		super(taskBase, MutableRecordDeserializerFactory.<T>get(), inputGateID);
-	}
-
-	/**
-	 * Constructs a new record reader and registers a new input gate with the application's environment.
-	 * 
-	 * @param outputBase
-	 *        the application that instantiated the record reader
-	 * @param inputGateID
-	 *        The ID of the input gate that the reader reads from.
-	 */
-	public MutableRecordReader(final AbstractOutputTask outputBase, final int inputGateID) {
-		super(outputBase, MutableRecordDeserializerFactory.<T>get(), inputGateID);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean next(final T target) throws IOException, InterruptedException {
-		if (this.endOfStream) {
-			return false;
-			
-		}
-		while (true) {
-			InputChannelResult result = this.inputGate.readRecord(target);
-			switch (result) {
-				case INTERMEDIATE_RECORD_FROM_BUFFER:
-				case LAST_RECORD_FROM_BUFFER:
-					return true;
-					
-				case END_OF_SUPERSTEP:
-					if (incrementEndOfSuperstepEventAndCheck()) {
-						return false; // end of the superstep
-					}
-					else {
-						break; // fall through and wait for next record/event
-					}
-					
-				case TASK_EVENT:
-					handleEvent(this.inputGate.getCurrentEvent());
-					break;	// fall through to get next record
-				
-				case END_OF_STREAM:
-					this.endOfStream = true;
-					return false;
-					
-				default:
-					; // fall through to get next record
-			}
-		}
-	}
-	
-	@Override
-	public boolean isInputClosed() {
-		return this.endOfStream;
-	}
-
-	@Override
-	public void setIterative(int numEventsUntilEndOfSuperstep) {
-		// sanity check for debug purposes
-		if (numEventsUntilEndOfSuperstep != getNumberOfInputChannels()) {
-			throw new IllegalArgumentException("Number of events till end of superstep is different from the number of input channels.");
-		}
-		super.setIterative(numEventsUntilEndOfSuperstep);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableUnionRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableUnionRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableUnionRecordReader.java
deleted file mode 100644
index 1fc8dd2..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableUnionRecordReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-public class MutableUnionRecordReader<T extends IOReadableWritable> extends AbstractUnionRecordReader<T> implements MutableReader<T> {
-
-	
-	/**
-	 * Constructs a new mutable union record reader.
-	 * 
-	 * @param recordReaders
-	 *        the individual mutable record readers whose input is used to construct the union
-	 */
-	public MutableUnionRecordReader(MutableRecordReader<T>[] recordReaders) {
-		super(recordReaders);
-	}
-
-	@Override
-	public boolean next(T target) throws IOException, InterruptedException {
-		return getNextRecord(target);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/OutputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/OutputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/OutputGate.java
deleted file mode 100644
index 0bbe5e0..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/OutputGate.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-import java.util.List;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.channels.AbstractOutputChannel;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryOutputChannel;
-import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkOutputChannel;
-
-/**
- * In Nephele output gates are a specialization of general gates and connect
- * record writers and output channels. As channels, output gates are always
- * parameterized to a specific type of record which they can transport.
- * 
- * @param <T>
- *        the type of record that can be transported through this gate
- */
-public interface OutputGate<T extends IOReadableWritable> extends Gate<T> {
-
-	/**
-	 * Returns the type of record that can be transported through this gate.
-	 * 
-	 * @return the type of record that can be transported through this gate
-	 */
-	Class<T> getType();
-
-	/**
-	 * Writes a record to one of the associated output channels. Currently, the
-	 * channels are chosen in a simple round-robin fashion. This operation may
-	 * block until the respective channel has received the data.
-	 * 
-	 * @param record
-	 *        the record to be written
-	 * @throws IOException
-	 *         thrown if any error occurs during channel I/O
-	 */
-	void writeRecord(T record) throws IOException, InterruptedException;
-
-	/**
-	 * Returns all the OutputChannels connected to this gate
-	 * 
-	 * @return the list of OutputChannels connected to this RecordWriter
-	 */
-	List<AbstractOutputChannel<T>> getOutputChannels();
-
-	/**
-	 * Flushes all connected output channels.
-	 * 
-	 * @throws IOException
-	 *         thrown if an error occurs while flushing an output channel
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the data to be flushed
-	 */
-	void flush() throws IOException, InterruptedException;
-
-	/**
-	 * Checks if this output gate operates in broadcast mode, i.e. all records passed to it are transferred through all
-	 * connected output channels.
-	 * 
-	 * @return <code>true</code> if this output gate operates in broadcast mode, <code>false</code> otherwise
-	 */
-	boolean isBroadcast();
-
-	/**
-	 * Returns the number of output channels associated with this output gate.
-	 * 
-	 * @return the number of output channels associated with this output gate
-	 */
-	int getNumberOfOutputChannels();
-
-	/**
-	 * Returns the output channel from position <code>pos</code> of the gate's
-	 * internal channel list.
-	 * 
-	 * @param pos
-	 *        the position to retrieve the channel from
-	 * @return the channel from the given position or <code>null</code> if such
-	 *         position does not exist.
-	 */
-	AbstractOutputChannel<T> getOutputChannel(int pos);
-
-	/**
-	 * Returns the output gate's channel selector.
-	 * 
-	 * @return the output gate's channel selector or <code>null</code> if the gate operates in broadcast mode
-	 */
-	ChannelSelector<T> getChannelSelector();
-
-	/**
-	 * Requests the output gate to closed. This means the application will send
-	 * no records through this gate anymore.
-	 * 
-	 * @throws IOException
-	 * @throws InterruptedException
-	 */
-	void requestClose() throws IOException, InterruptedException;
-
-	/**
-	 * Removes all output channels from the output gate.
-	 */
-	void removeAllOutputChannels();
-
-	/**
-	 * Creates a new network output channel and assigns it to the given output gate.
-	 * 
-	 * @param outputGate
-	 *        the output gate the channel shall be assigned to
-	 * @param channelID
-	 *        the ID of the channel
-	 * @param connectedChannelID
-	 *        the ID of the channel this channel is connected to
-	 * @param compressionLevel
-	 *        the level of compression to be used for this channel
-	 * @return the new network output channel
-	 */
-	NetworkOutputChannel<T> createNetworkOutputChannel(OutputGate<T> outputGate, ChannelID channelID,
-			ChannelID connectedChannelID);
-
-	/**
-	 * Creates a new in-memory output channel and assigns it to the given output gate.
-	 * 
-	 * @param outputGate
-	 *        the output gate the channel shall be assigned to
-	 * @param channelID
-	 *        the ID of the channel
-	 * @param connectedChannelID
-	 *        the ID of the channel this channel is connected to
-	 * @param compressionLevel
-	 *        the level of compression to be used for this channel
-	 * @return the new in-memory output channel
-	 */
-	InMemoryOutputChannel<T> createInMemoryOutputChannel(OutputGate<T> outputGate, ChannelID channelID,
-			ChannelID connectedChannelID);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Reader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Reader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Reader.java
deleted file mode 100644
index 80d2010..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Reader.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * A reader interface to read records from an input.
- * 
- * @param <T> The type of the record that can be emitted with this record writer
- */
-public interface Reader<T extends IOReadableWritable> extends ReaderBase {
-
-	boolean hasNext() throws IOException, InterruptedException;
-
-	T next() throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ReaderBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ReaderBase.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ReaderBase.java
deleted file mode 100644
index ae69ad0..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ReaderBase.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.event.task.EventListener;
-
-
-/**
- *
- */
-public interface ReaderBase {
-
-	boolean isInputClosed();
-	
-	/**
-	 * Subscribes the listener object to receive events of the given type.
-	 * 
-	 * @param eventListener
-	 *        the listener object to register
-	 * @param eventType
-	 *        the type of event to register the listener for
-	 */
-	void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType);
-	
-	/**
-	 * Removes the subscription for events of the given type for the listener object.
-	 * 
-	 * @param eventListener
-	 *        the listener object to cancel the subscription for
-	 * @param eventType
-	 *        the type of the event to cancel the subscription for
-	 */
-	void unsubscribeFromEvent(final EventListener eventListener, final Class<? extends AbstractTaskEvent> eventType);
-
-	/**
-	 * Publishes an event.
-	 * 
-	 * @param event
-	 *        the event to be published
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the event
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the event to be published
-	 */
-	void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException;
-	
-	
-	void setIterative(int numEventsUntilEndOfSuperstep);
-
-	
-	void startNextSuperstep();
-	
-	boolean hasReachedEndOfSuperstep();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordAvailabilityListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordAvailabilityListener.java
deleted file mode 100644
index d22f048..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordAvailabilityListener.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * This interface can be implemented by a class which shall be notified by an input gate when one of the its connected
- * input channels has at least one record available for reading.
- * 
- * @param <T>
- *        the type of record transported through the corresponding input gate
- */
-public interface RecordAvailabilityListener<T extends IOReadableWritable> {
-
-	/**
-	 * This method is called by an input gate when one of its connected input channels has at least one record available
-	 * for reading.
-	 * 
-	 * @param inputGate
-	 *        the input gate which has at least one record available
-	 */
-	void reportRecordAvailability(InputGate<T> inputGate);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializer.java
deleted file mode 100644
index e8ccb77..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-
-/**
- * This interface must be implemented by classes which transfer bytes streams back into {@link Record} objects.
- * 
- * @param <T> The type of record this record deserializer works with.
- */
-public interface RecordDeserializer<T>
-{
-	/**
-	 * Transforms a record back from a readable byte channel. The deserialization might not complete, because the channel
-	 * has not all required data available. In that case, this method must return {@code null}. Furthermore, it may
-	 * not retain a reference to the given target object in that case, but must manage to put the data aside.
-	 * 
-	 * @param target The record object into which to deserialize the data. May be null for deserializers
-	 *               that operate on immutable objects, in which case the deserializer has to instantiate an
-	 *               object. In the case where this object is non-null, but the deserialization does not complete,
-	 *               the object must not be used to cache the partial state, as it is not guaranteed that the object
-	 *               will remain unchanged until the next attempt to continue the deserialization.
-	 * @param in The byte stream which contains the record's data.
-	 * @return The record deserialized from <code>in</code>, or null, if the record .
-	 * @throws IOException Thrown if an I/O error occurs while deserializing the record from the stream
-	 */
-	T readData(final T target, final ReadableByteChannel readableByteChannel) throws IOException;
-
-	/**
-	 * Clears the internal buffers of the deserializer and resets its state.
-	 */
-	void clear();
-	
-	/**
-	 * Checks whether the deserializer has data from a previous deserialization attempt stored in its internal buffers which
-	 * is not yet finished.
-	 * 
-	 * @return <code>true</code>, if the deserializer's internal buffers contain data from a previous deserialization
-	 *         attempt, <code>false</code> otherwise.
-	 */
-	boolean hasUnfinishedData();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializerFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializerFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializerFactory.java
deleted file mode 100644
index eadd326..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializerFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-/**
- * A simple factory to instantiate record deserializer objects. Since a deserializer might be stateful, the system
- * must be able to instantiate an arbitrary number of them, equal to the number of data channels.
- * 
- * If the created deserializers are in fact not stateful, the factory should return a shared object.
- */
-public interface RecordDeserializerFactory<T>
-{
-	/**
-	 * Creates a new instance of the deserializer. The returned instance may not share any state with
-	 * any previously returned instance.
-	 * 
-	 * @return An instance of the deserializer.
-	 */
-	RecordDeserializer<T> createDeserializer();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordReader.java
deleted file mode 100644
index 5010013..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordReader.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-/**
- * A record writer connects an input gate to an application. It allows the application
- * query for incoming records and read them from input gate.
- * 
- * @param <T> The type of the record that can be read from this record reader.
- */
-public class RecordReader<T extends IOReadableWritable> extends AbstractSingleGateRecordReader<T> implements Reader<T> {
-	
-	private final Class<T> recordType;
-	
-	/**
-	 * Stores the last read record.
-	 */
-	private T lookahead;
-
-	/**
-	 * Stores if more no more records will be received from the assigned input gate.
-	 */
-	private boolean noMoreRecordsWillFollow;
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Constructs a new record reader and registers a new input gate with the application's environment.
-	 * 
-	 * @param taskBase
-	 *        The application that instantiated the record reader.
-	 * @param recordType
-	 *        The class of records that can be read from the record reader.
-	 */
-	public RecordReader(AbstractTask taskBase, Class<T> recordType) {
-		super(taskBase, MutableRecordDeserializerFactory.<T>get(), 0);
-		this.recordType = recordType;
-	}
-
-	/**
-	 * Constructs a new record reader and registers a new input gate with the application's environment.
-	 * 
-	 * @param outputBase
-	 *        The application that instantiated the record reader.
-	 * @param recordType
-	 *        The class of records that can be read from the record reader.
-	 */
-	public RecordReader(AbstractOutputTask outputBase, Class<T> recordType) {
-		super(outputBase, MutableRecordDeserializerFactory.<T>get(), 0);
-		this.recordType = recordType;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Checks if at least one more record can be read from the associated input gate. This method may block
-	 * until the associated input gate is able to read the record from one of its input channels.
-	 * 
-	 * @return <code>true</code>it at least one more record can be read from the associated input gate, otherwise
-	 *         <code>false</code>
-	 */
-	@Override
-	public boolean hasNext() throws IOException, InterruptedException{
-		if (this.lookahead != null) {
-			return true;
-		} else {
-			if (this.noMoreRecordsWillFollow) {
-				return false;
-			}
-			
-			T record = instantiateRecordType();
-			
-			while (true) {
-				InputChannelResult result = this.inputGate.readRecord(record);
-				switch (result) {
-					case INTERMEDIATE_RECORD_FROM_BUFFER:
-					case LAST_RECORD_FROM_BUFFER:
-						this.lookahead = record;
-						return true;
-						
-					case END_OF_SUPERSTEP:
-						if (incrementEndOfSuperstepEventAndCheck()) {
-							return false;
-						}
-						else {
-							break; // fall through and wait for next record/event
-						}
-						
-					case TASK_EVENT:
-						handleEvent(this.inputGate.getCurrentEvent());
-						break;
-						
-					case END_OF_STREAM:
-						this.noMoreRecordsWillFollow = true;
-						return false;
-				
-					default:
-						; // fall through the loop
-				}
-			}
-		}
-	}
-
-	/**
-	 * Reads the current record from the associated input gate.
-	 * 
-	 * @return the current record from the associated input gate.
-	 * @throws IOException
-	 *         thrown if any error occurs while reading the record from the input gate
-	 */
-	@Override
-	public T next() throws IOException, InterruptedException {
-		if (hasNext()) {
-			T tmp = this.lookahead;
-			this.lookahead = null;
-			return tmp;
-		} else {
-			return null;
-		}
-	}
-	
-	@Override
-	public boolean isInputClosed() {
-		return this.noMoreRecordsWillFollow;
-	}
-	
-	private T instantiateRecordType() {
-		try {
-			return this.recordType.newInstance();
-		} catch (InstantiationException e) {
-			throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
-		} catch (IllegalAccessException e) {
-			throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordWriter.java
deleted file mode 100644
index 2f4c21b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordWriter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-/**
- * A record writer connects the application to an output gate. It allows the application
- * of emit (send out) to the output gate. The output gate will then take care of distributing
- * the emitted records among the output channels.
- * 
- * @param <T>
- *        the type of the record that can be emitted with this record writer
- */
-public class RecordWriter<T extends IOReadableWritable> extends AbstractRecordWriter<T> {
-
-	/**
-	 * Constructs a new record writer and registers a new output gate with the application's environment.
-	 * 
-	 * @param taskBase
-	 *        the application that instantiated the record writer
-	 * @param outputClass
-	 *        the class of records that can be emitted with this record writer
-	 * @param selector
-	 *        the channel selector to be used to determine the output channel to be used for a record
-	 */
-	public RecordWriter(AbstractTask taskBase, Class<T> outputClass, ChannelSelector<T> selector) {
-		super(taskBase, outputClass, selector, false);
-	}
-
-	/**
-	 * Constructs a new record writer and registers a new output gate with the application's environment.
-	 * 
-	 * @param taskBase
-	 *        the application that instantiated the record writer
-	 * @param outputClass
-	 *        the class of records that can be emitted with this record writer
-	 */
-	public RecordWriter(AbstractTask taskBase, Class<T> outputClass) {
-		super(taskBase, outputClass, null, false);
-	}
-
-	/**
-	 * This method emits a record to the corresponding output gate. The method may block
-	 * until the record was transfered via any of the connected channels.
-	 * 
-	 * @param inputBase
-	 *        the application that instantiated the record writer
-	 * @param outputClass
-	 *        the class of records that can be emitted with this record writer
-	 */
-	public RecordWriter(AbstractInputTask<?> inputBase, Class<T> outputClass) {
-		super(inputBase, outputClass, null, false);
-	}
-
-	/**
-	 * Constructs a new record writer and registers a new output gate with the application's environment.
-	 * 
-	 * @param inputBase
-	 *        the application that instantiated the record writer
-	 * @param outputClass
-	 *        the class of records that can be emitted with this record writer
-	 * @param selector
-	 *        the channel selector to be used to determine the output channel to be used for a record
-	 */
-	public RecordWriter(AbstractInputTask<?> inputBase, Class<T> outputClass, ChannelSelector<T> selector) {
-		super(inputBase, outputClass, selector, false);
-	}
-}