You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:18 UTC

[21/34] Offer buffer-oriented API for I/O (#25)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/LocalChannelWithAccessInfo.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/LocalChannelWithAccessInfo.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/LocalChannelWithAccessInfo.java
deleted file mode 100644
index 173021f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/LocalChannelWithAccessInfo.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-final class LocalChannelWithAccessInfo implements ChannelWithAccessInfo {
-
-	/**
-	 * The logging object.
-	 */
-	private static final Log LOG = LogFactory.getLog(LocalChannelWithAccessInfo.class);
-
-	private final File file;
-
-	private final FileChannel channel;
-
-	private final AtomicLong reservedWritePosition;
-
-	private final AtomicInteger referenceCounter;
-
-	private final AtomicBoolean deleteOnClose;
-
-	LocalChannelWithAccessInfo(final File file, final boolean deleteOnClose) throws IOException {
-
-		this.file = file;
-		this.channel = new RandomAccessFile(file, "rw").getChannel();
-		this.reservedWritePosition = new AtomicLong(0L);
-		this.referenceCounter = new AtomicInteger(0);
-		this.deleteOnClose = new AtomicBoolean(deleteOnClose);
-	}
-
-
-	@Override
-	public FileChannel getChannel() {
-
-		return this.channel;
-	}
-
-
-	@Override
-	public FileChannel getAndIncrementReferences() {
-
-		if (incrementReferences()) {
-			return this.channel;
-		} else {
-			return null;
-		}
-	}
-
-	@Override
-	public ChannelWithPosition reserveWriteSpaceAndIncrementReferences(final int spaceToReserve) {
-
-		if (incrementReferences()) {
-			return new ChannelWithPosition(this.channel, this.reservedWritePosition.getAndAdd(spaceToReserve));
-		} else {
-			return null;
-		}
-	}
-
-
-	@Override
-	public int decrementReferences() {
-
-		int current = this.referenceCounter.get();
-		while (true) {
-			if (current <= 0) {
-				// this is actually an error case, because the channel was deleted before
-				throw new IllegalStateException("The references to the file were already at zero.");
-			}
-
-			if (current == 1) {
-				// this call decrements to zero, so mark it as deleted
-				if (this.referenceCounter.compareAndSet(current, Integer.MIN_VALUE)) {
-					current = 0;
-					break;
-				}
-			} else if (this.referenceCounter.compareAndSet(current, current - 1)) {
-				current = current - 1;
-				break;
-			}
-			current = this.referenceCounter.get();
-		}
-
-		if (current > 0) {
-			return current;
-		} else if (current == 0) {
-			// delete the channel
-			this.referenceCounter.set(Integer.MIN_VALUE);
-			this.reservedWritePosition.set(Long.MIN_VALUE);
-			try {
-				this.channel.close();
-			} catch (IOException ioex) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Error while closing spill file for file buffers: " + ioex.getMessage(), ioex);
-				}
-			}
-			if (this.deleteOnClose.get()) {
-				this.file.delete();
-			}
-			return current;
-		} else {
-			throw new IllegalStateException("The references to the file were already at zero.");
-		}
-	}
-
-
-	@Override
-	public boolean incrementReferences() {
-
-		int current = this.referenceCounter.get();
-		while (true) {
-			// check whether it was disposed in the meantime
-			if (current < 0) {
-				return false;
-			}
-			// atomically check and increment
-			if (this.referenceCounter.compareAndSet(current, current + 1)) {
-				return true;
-			}
-			current = this.referenceCounter.get();
-		}
-	}
-
-
-	@Override
-	public void disposeSilently() {
-
-		this.referenceCounter.set(Integer.MIN_VALUE);
-		this.reservedWritePosition.set(Long.MIN_VALUE);
-
-		if (this.channel.isOpen()) {
-			try {
-				this.channel.close();
-			} catch (Throwable t) {
-			}
-		}
-
-		if (this.deleteOnClose.get()) {
-			this.file.delete();
-		}
-	}
-
-
-	@Override
-	public void updateDeleteOnCloseFlag(final boolean deleteOnClose) {
-
-		this.deleteOnClose.compareAndSet(true, deleteOnClose);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBuffer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBuffer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBuffer.java
deleted file mode 100644
index 8b2c0fb..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBuffer.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-
-import eu.stratosphere.core.memory.MemorySegment;
-
-public final class MemoryBuffer extends Buffer {
-
-	private final MemoryBufferRecycler bufferRecycler;
-
-	private final MemorySegment internalMemorySegment;
-	
-	/**
-	 * Internal index that points to the next byte to write
-	 */
-	private int index = 0;
-	
-	/**
-	 * Internal limit to simulate ByteBuffer behavior of MemorySegment. index > limit is not allowed.
-	 */
-	private int limit = 0;
-
-	MemoryBuffer(final int bufferSize, final MemorySegment memory, final MemoryBufferPoolConnector bufferPoolConnector) {
-		if (bufferSize > memory.size()) {
-			throw new IllegalArgumentException("Requested segment size is " + bufferSize
-				+ ", but provided MemorySegment only has a capacity of " + memory.size());
-		}
-
-		this.bufferRecycler = new MemoryBufferRecycler(memory, bufferPoolConnector);
-		this.internalMemorySegment = memory;
-		this.position(0);
-		this.limit(bufferSize);
-	}
-
-	private MemoryBuffer(final int bufferSize, final int pos, final MemorySegment memory, final MemoryBufferRecycler bufferRecycler) {
-		this.bufferRecycler = bufferRecycler;
-		this.internalMemorySegment = memory;
-		this.position(pos);
-		this.limit(bufferSize);
-	}
-
-	@Override
-	public int read(ByteBuffer dst) throws IOException {
-
-		if (!this.hasRemaining()) {
-			return -1;
-		}
-		int numBytes = dst.remaining();
-		final int remaining = this.remaining();
-		if (numBytes == 0) {
-			return 0;
-		}
-		if(numBytes > remaining) {
-			numBytes = remaining;
-		}
-		internalMemorySegment.get(position(), dst, numBytes);
-		index += numBytes;
-		return numBytes;
-	}
-	
-
-	@Override
-	public int writeTo(WritableByteChannel writableByteChannel) throws IOException {
-		if (!this.hasRemaining()) {
-			return -1;
-		}
-		
-		final ByteBuffer wrapped = this.internalMemorySegment.wrap(index, limit-index);
-		final int written = writableByteChannel.write(wrapped);
-		position(wrapped.position());
-		return written;
-	}
-
-
-	@Override
-	public void close() throws IOException {
-
-		this.position(this.limit());
-	}
-
-
-	@Override
-	public boolean isOpen() {
-
-		return this.hasRemaining();
-	}
-
-	/**
-	 * Resets the memory buffer.
-	 * 
-	 * @param bufferSize
-	 *        the size of buffer in bytes after the reset
-	 */
-	public final void reset(final int bufferSize) {
-		if(bufferSize > this.internalMemorySegment.size()) {
-			throw new RuntimeException("Given buffer size exceeds underlying buffer size");
-		}
-		this.position(0);
-		this.limit(bufferSize);
-	}
-
-	public final void position(final int i) {
-		if(i > limit) {
-			throw new IndexOutOfBoundsException("new position is larger than the limit");
-		}
-		index = i;
-	}
-	
-	@Override
-	public final int position() {
-		return index;
-	}
-	
-	public final void limit(final int l) {
-		if(limit > internalMemorySegment.size()) {
-			throw new RuntimeException("Limit is larger than MemoryBuffer size");
-		}
-		if (index > limit) {
-			index = limit;
-		}
-		limit = l;
-	}
-	
-	public final int limit() {
-		return limit;
-	}
-	
-	public final boolean hasRemaining() {
-		return index < limit;
-	}
-	
-	public final int remaining() {
-		return limit - index;
-	}
-
-	/**
-	 * Put MemoryBuffer into read mode
-	 */
-	public final void flip() {
-		limit = position();
-		position(0);
-	}
-
-	public void clear() {
-		this.limit = getTotalSize();
-		this.position(0);
-	}
-
-	/**
-	 * 
-	 * @return Returns the size of the underlying MemorySegment
-	 */
-	public int getTotalSize() {
-		return this.internalMemorySegment.size();
-	}
-	
-	@Override
-	public final int size() {
-		return this.limit();
-	}
-
-	public MemorySegment getMemorySegment() {
-		return this.internalMemorySegment;
-	}
-
-
-	@Override
-	protected void recycle() {
-		this.bufferRecycler.decreaseReferenceCounter();
-		if(bufferRecycler.referenceCounter.get() == 0) {
-			clear();
-		}
-	}
-
-
-
-	@Override
-	public boolean isBackedByMemory() {
-		return true;
-	}
-
-
-	@Override
-	public MemoryBuffer duplicate() {
-		final MemoryBuffer duplicatedMemoryBuffer = new MemoryBuffer(this.limit(), this.position(), this.internalMemorySegment, this.bufferRecycler);
-		this.bufferRecycler.increaseReferenceCounter();
-		return duplicatedMemoryBuffer;
-	}
-
-
-	@Override
-	public void copyToBuffer(final Buffer destinationBuffer) throws IOException {
-		if (size() > destinationBuffer.size()) {
-			throw new IllegalArgumentException("Destination buffer is too small to store content of source buffer: "
-				+ size() + " vs. " + destinationBuffer.size());
-		}
-		final MemoryBuffer target = (MemoryBuffer) destinationBuffer;
-		this.internalMemorySegment.copyTo(this.position(), target.getMemorySegment(), destinationBuffer.position(), limit()-position());
-		target.position(limit()-position()); // even if we do not change the source (this), we change the destination!!
-		destinationBuffer.flip();
-	}
-
-	
-
-	@Override
-	public int write(final ByteBuffer src) throws IOException {
-		int numBytes = src.remaining();
-		final int thisRemaining = this.remaining();
-		if(thisRemaining == 0) {
-			return 0;
-		}
-		if(numBytes > thisRemaining) {
-			numBytes = thisRemaining;
-		}
-		this.internalMemorySegment.put(position(), src, numBytes);
-		this.index += numBytes;
-		return numBytes;
-	}
-
-
-	@Override
-	public int write(final ReadableByteChannel readableByteChannel) throws IOException {
-
-		if (!this.hasRemaining()) {
-			return 0;
-		}
-		ByteBuffer wrapper = this.internalMemorySegment.wrap(index, limit-index);
-		final int written = readableByteChannel.read(wrapper);
-		this.position(wrapper.position());
-		this.limit(wrapper.limit());
-		return written;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferPoolConnector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferPoolConnector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferPoolConnector.java
deleted file mode 100644
index bd8519d..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferPoolConnector.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import eu.stratosphere.core.memory.MemorySegment;
-
-/**
- * The memory buffer pool connector provides a connection between {@link MemoryBuffer} and the {@link LocalBufferPool}
- * the memory buffer's encapsulated byte buffer has originally been taken from.
- * 
- */
-public interface MemoryBufferPoolConnector {
-
-	/**
-	 * Called by the {@link MemoryBufferRecycler} to return a buffer to its original buffer pool.
-	 * 
-	 * @param byteBuffer
-	 *        the buffer to be recycled
-	 */
-	void recycle(MemorySegment memSeg);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferRecycler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferRecycler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferRecycler.java
deleted file mode 100644
index 141e3a1..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferRecycler.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.memory.MemorySegment;
-
-/**
- * A memory buffer recycler takes care of the correct recycling of the internal byte buffer which backs a memory buffer.
- * Since buffer objects can be duplicated, i.e. multiple buffer objects point to the same physical buffer, it is
- * necessary to coordinate the recycling of the physical buffer.
- * <p>
- * This class is thread-safe.
- * 
- */
-public final class MemoryBufferRecycler {
-
-	/**
-	 * The log object used to report debug information and possible errors.
-	 */
-	private static final Log LOG = LogFactory.getLog(MemoryBufferRecycler.class);
-
-	/**
-	 * The original memory segment which has been taken from byte buffered channel manager's buffer pool.
-	 */
-	private final MemorySegment originalSegment;
-
-	/**
-	 * The connection to the pool from which the byte buffer has originally been taken.
-	 */
-	private final MemoryBufferPoolConnector bufferPoolConnector;
-
-	/**
-	 * The number of memory buffer objects which may still access the physical buffer.
-	 */
-	public final AtomicInteger referenceCounter = new AtomicInteger(1);
-
-	/**
-	 * Constructs a new memory buffer recycler.
-	 * 
-	 * @param originalBuffer
-	 *        the original byte buffer
-	 * @param bufferPoolConnector
-	 *        the connection to the pool from which the byte buffer has originally been taken
-	 */
-	MemoryBufferRecycler(final MemorySegment originalSegment, final MemoryBufferPoolConnector bufferPoolConnector) {
-
-		this.originalSegment = originalSegment;
-		this.bufferPoolConnector = bufferPoolConnector;
-	}
-
-	/**
-	 * Increases the number of references to the physical buffer by one.
-	 */
-	void increaseReferenceCounter() {
-
-		if (this.referenceCounter.getAndIncrement() == 0) {
-			LOG.error("Increasing reference counter from 0 to 1");
-		}
-	}
-
-	/**
-	 * Decreases the number of references to the physical buffer by one. If the number of references becomes zero the
-	 * physical buffer is recycled.
-	 */
-	void decreaseReferenceCounter() {
-
-		final int val = this.referenceCounter.decrementAndGet();
-		if (val == 0) {
-			this.bufferPoolConnector.recycle(this.originalSegment);
-
-		} else if (val < 0) {
-			LOG.error("reference counter is negative");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/SerializationBuffer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/SerializationBuffer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/SerializationBuffer.java
deleted file mode 100644
index 55bcb56..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/SerializationBuffer.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.DataOutputBuffer;
-
-/**
- * A class for serializing a record to its binary representation.
- * 
- * @param <T>
- *        the type of the record this serialization buffer can be used for
- */
-public class SerializationBuffer<T extends IOReadableWritable> {
-
-	private static final int SIZEOFINT = 4;
-
-	private DataOutputBuffer serializationBuffer = new DataOutputBuffer();
-
-	private ByteBuffer lengthBuf = ByteBuffer.allocate(SIZEOFINT);
-
-	private int bytesReadFromBuffer = 0;
-
-	/**
-	 * Translates an integer into an array of bytes.
-	 * 
-	 * @param val
-	 *        The integer to be translated
-	 * @param arr
-	 *        The byte buffer to store the data of the integer
-	 */
-	private void integerToByteBuffer(final int val, final ByteBuffer byteBuffer) {
-
-		for (int i = 0; i < SIZEOFINT; ++i) {
-			final int shift = i << (SIZEOFINT - 1); // i * 8
-			byteBuffer.put(SIZEOFINT - 1 - i, (byte) ((val & (0xff << shift)) >>> shift));
-		}
-
-		byteBuffer.position(0);
-		byteBuffer.limit(SIZEOFINT);
-	}
-
-	/**
-	 * Return <code>true</code> if the internal serialization buffer still contains data.
-	 * In this case the method serialize must not be called. If the internal buffer is empty
-	 * the method return <code>false</code>
-	 * 
-	 * @return <code>true</code> if the internal serialization buffer still contains data, <code>false</code> it it is
-	 *         empty
-	 */
-	public boolean dataLeftFromPreviousSerialization() {
-		return leftInSerializationBuffer() > 0;
-	}
-
-	/**
-	 * Reads the internal serialization buffer and writes the data to the given {@link WritableByteChannel} byte
-	 * channel.
-	 * 
-	 * @param writableByteChannel
-	 *        the byte channel to write the serialized data to
-	 * @return the number of bytes written the to given byte channel
-	 * @throws IOException
-	 *         thrown if an error occurs while writing to serialized data to the channel
-	 */
-	public int read(final WritableByteChannel writableByteChannel) throws IOException {
-
-		int bytesReadFromLengthBuf = 0;
-
-		// Deal with length buffer first
-		if (this.lengthBuf.hasRemaining()) { // There is data from the length buffer to be written
-			bytesReadFromLengthBuf = writableByteChannel.write(this.lengthBuf);
-		}
-
-		final int bytesReadFromSerializationBuf = writableByteChannel.write(this.serializationBuffer.getData());
-		// byteBuffer.put(this.serializationBuffer.getData(), this.bytesReadFromBuffer, length);
-		this.bytesReadFromBuffer += bytesReadFromSerializationBuf;
-
-		if (leftInSerializationBuffer() == 0) { // Record is entirely written to byteBuffer
-			this.serializationBuffer.reset();
-			this.bytesReadFromBuffer = 0;
-		}
-
-		return (bytesReadFromSerializationBuf + bytesReadFromLengthBuf);
-	}
-
-	/**
-	 * Return the number of bytes that have not been read from the internal serialization
-	 * buffer so far.
-	 * 
-	 * @return the number of bytes that have not been read from the internal serialization buffer so far
-	 */
-	private int leftInSerializationBuffer() {
-
-		return (this.serializationBuffer.getLength() - this.bytesReadFromBuffer);
-	}
-
-	/**
-	 * Serializes the record and writes it to an internal buffer. The buffer grows dynamically
-	 * in case more memory is required to serialization.
-	 * 
-	 * @param record
-	 *        The record to the serialized
-	 * @throws IOException
-	 *         Thrown if data from a previous serialization process is still in the internal buffer and has not yet been
-	 *         transfered to a byte buffer
-	 */
-	public void serialize(final T record) throws IOException {
-
-		// Check if there is data left in the buffer
-		if (dataLeftFromPreviousSerialization()) {
-			throw new IOException("Cannot write new data, " + leftInSerializationBuffer()
-				+ " bytes still left from previous call");
-		}
-
-		record.write(this.serializationBuffer); // serializationBuffer grows dynamically
-
-		// Now record is completely in serializationBuffer;
-		integerToByteBuffer(this.serializationBuffer.getLength(), this.lengthBuf);
-	}
-
-	public void clear() {
-		this.bytesReadFromBuffer = 0;
-		this.lengthBuf.clear();
-		this.serializationBuffer.reset();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedInputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedInputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedInputChannel.java
deleted file mode 100644
index 0009d5a..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedInputChannel.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.io.InputChannelResult;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.RecordDeserializer;
-import eu.stratosphere.nephele.io.channels.AbstractInputChannel;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-
-/**
- * @param <T> The type of record that can be transported through this channel.
- */
-public abstract class AbstractByteBufferedInputChannel<T extends IOReadableWritable> extends AbstractInputChannel<T> {
-
-	/**
-	 * The log object used to report warnings and errors.
-	 */
-	private static final Log LOG = LogFactory.getLog(AbstractByteBufferedInputChannel.class);
-
-	/**
-	 * The deserializer used to deserialize records.
-	 */
-	private final RecordDeserializer<T> deserializer;
-
-	/**
-	 * Buffer for the uncompressed (raw) data.
-	 */
-	private Buffer dataBuffer;
-
-	private ByteBufferedInputChannelBroker inputChannelBroker;
-	
-	private AbstractTaskEvent currentEvent;
-
-	/**
-	 * The exception observed in this channel while processing the buffers. Checked and thrown
-	 * per-buffer.
-	 */
-	private volatile IOException ioException;
-
-	/**
-	 * Stores the number of bytes read through this input channel since its instantiation.
-	 */
-	private long amountOfDataTransmitted;
-	
-
-	private volatile boolean brokerAggreedToCloseChannel;
-
-	/**
-	 * Creates a new input channel.
-	 * 
-	 * @param inputGate
-	 *        the input gate this channel is wired to
-	 * @param channelIndex
-	 *        the channel's index at the associated input gate
-	 * @param type
-	 *        the type of record transported through this channel
-	 * @param channelID
-	 *        the ID of the channel
-	 * @param connectedChannelID
-	 *        the ID of the channel this channel is connected to
-	 */
-	public AbstractByteBufferedInputChannel(final InputGate<T> inputGate, final int channelIndex,
-			final RecordDeserializer<T> deserializer, final ChannelID channelID, final ChannelID connectedChannelID) {
-		super(inputGate, channelIndex, channelID, connectedChannelID);
-		this.deserializer = deserializer;
-	}
-
-	@Override
-	public InputChannelResult readRecord(T target) throws IOException {
-		if (this.dataBuffer == null) {
-			if (isClosed()) {
-				return InputChannelResult.END_OF_STREAM;
-			}
-
-			// get the next element we need to handle (buffer or event)
-			BufferOrEvent boe = this.inputChannelBroker.getNextBufferOrEvent();
-			
-			if (boe == null) {
-				throw new IllegalStateException("Input channel was queries for data even though none was announced available.");
-			}
-			
-			// handle events
-			if (boe.isEvent())
-			{
-				// sanity check: an event may only come after a complete record.
-				if (this.deserializer.hasUnfinishedData()) {
-					throw new IOException("Channel received an event before completing the current partial record.");
-				}
-				
-				AbstractEvent evt = boe.getEvent();
-				if (evt.getClass() == ByteBufferedChannelCloseEvent.class) {
-					this.brokerAggreedToCloseChannel = true;
-					return InputChannelResult.END_OF_STREAM;
-				}
-				else if (evt.getClass() == EndOfSuperstepEvent.class) {
-					return InputChannelResult.END_OF_SUPERSTEP;
-				}
-				else if (evt instanceof AbstractTaskEvent) {
-					this.currentEvent = (AbstractTaskEvent) evt;
-					return InputChannelResult.TASK_EVENT;
-				}
-				else {
-					LOG.error("Received unknown event: " + evt);
-					return InputChannelResult.NONE;
-				}
-			} else {
-				// buffer case
-				this.dataBuffer = boe.getBuffer();
-			}
-		}
-
-		// get the next record form the buffer
-		T nextRecord = this.deserializer.readData(target, this.dataBuffer);
-
-		// release the buffer if it is empty
-		if (this.dataBuffer.remaining() == 0) {
-			releasedConsumedReadBuffer(this.dataBuffer);
-			this.dataBuffer = null;
-			return nextRecord == null ? InputChannelResult.NONE : InputChannelResult.LAST_RECORD_FROM_BUFFER;
-		} else {
-			return nextRecord == null ? InputChannelResult.NONE : InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
-		}
-	}
-
-	@Override
-	public boolean isClosed() throws IOException{
-		if (this.ioException != null) {
-			throw new IOException("An error occurred in the channel: " + this.ioException.getMessage(), this.ioException);
-		} else {
-			return this.brokerAggreedToCloseChannel;
-		}
-	}
-
-
-	@Override
-	public void close() throws IOException, InterruptedException {
-
-		this.deserializer.clear();
-		if (this.dataBuffer != null) {
-			releasedConsumedReadBuffer(this.dataBuffer);
-			this.dataBuffer = null;
-		}
-
-		// This code fragment makes sure the isClosed method works in case the channel input has not been fully consumed
-		while (!this.brokerAggreedToCloseChannel)
-		{
-			BufferOrEvent next = this.inputChannelBroker.getNextBufferOrEvent();
-			if (next != null) {
-				if (next.isEvent()) {
-					if (next.getEvent() instanceof ByteBufferedChannelCloseEvent) {
-						this.brokerAggreedToCloseChannel = true;
-					}
-				} else {
-					releasedConsumedReadBuffer(next.getBuffer());
-				}
-			} else {
-				Thread.sleep(200);
-			}
-		}
-
-		// Send close event to indicate the input channel has successfully
-		// processed all data it is interested in.
-		transferEvent(new ByteBufferedChannelCloseEvent());
-	}
-
-	
-	private void releasedConsumedReadBuffer(Buffer buffer) {
-		this.amountOfDataTransmitted += buffer.size();
-		buffer.recycleBuffer();
-	}
-	
-
-	public void setInputChannelBroker(ByteBufferedInputChannelBroker inputChannelBroker) {
-		this.inputChannelBroker = inputChannelBroker;
-	}
-
-
-	public void notifyGateThatInputIsAvailable() {
-		this.getInputGate().notifyRecordIsAvailable(getChannelIndex());
-	}
-
-	
-	@Override
-	public void transferEvent(AbstractEvent event) throws IOException, InterruptedException {
-		this.inputChannelBroker.transferEventToOutputChannel(event);
-	}
-
-	
-	public void reportIOException(IOException ioe) {
-		this.ioException = ioe;
-	}
-
-	
-	@Override
-	public void releaseAllResources() {
-		this.brokerAggreedToCloseChannel = true;
-		this.deserializer.clear();
-
-		// The buffers are recycled by the input channel wrapper
-	}
-
-	
-	@Override
-	public long getAmountOfDataTransmitted() {
-		return this.amountOfDataTransmitted;
-	}
-
-	
-	/**
-	 * Notify the channel that a data unit has been consumed.
-	 */
-	public void notifyDataUnitConsumed() {
-		this.getInputGate().notifyDataUnitConsumed(getChannelIndex());
-	}
-	
-	@Override
-	public AbstractTaskEvent getCurrentEvent() {
-		AbstractTaskEvent e = this.currentEvent;
-		this.currentEvent = null;
-		return e;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedOutputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedOutputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedOutputChannel.java
deleted file mode 100644
index 50d4a50..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedOutputChannel.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.channels.AbstractOutputChannel;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.io.channels.SerializationBuffer;
-
-public abstract class AbstractByteBufferedOutputChannel<T extends IOReadableWritable> extends AbstractOutputChannel<T> {
-
-	/**
-	 * The serialization buffer used to serialize records.
-	 */
-	private final SerializationBuffer<T> serializationBuffer = new SerializationBuffer<T>();
-
-	/**
-	 * Buffer for the serialized output data.
-	 */
-	private Buffer dataBuffer = null;
-
-	/**
-	 * Stores whether the channel is requested to be closed.
-	 */
-	private boolean closeRequested = false;
-
-	/**
-	 * The output channel broker the channel should contact to request and release write buffers.
-	 */
-	private ByteBufferedOutputChannelBroker outputChannelBroker = null;
-
-
-	/**
-	 * Stores the number of bytes transmitted through this output channel since its instantiation.
-	 */
-	private long amountOfDataTransmitted = 0L;
-
-	private static final Log LOG = LogFactory.getLog(AbstractByteBufferedOutputChannel.class);
-
-	/**
-	 * Creates a new byte buffered output channel.
-	 * 
-	 * @param outputGate
-	 *        the output gate this channel is wired to
-	 * @param channelIndex
-	 *        the channel's index at the associated output gate
-	 * @param channelID
-	 *        the ID of the channel
-	 * @param connectedChannelID
-	 *        the ID of the channel this channel is connected to
-	 */
-	protected AbstractByteBufferedOutputChannel(final OutputGate<T> outputGate, final int channelIndex,
-			final ChannelID channelID, final ChannelID connectedChannelID) {
-		super(outputGate, channelIndex, channelID, connectedChannelID);
-	}
-
-
-	@Override
-	public boolean isClosed() throws IOException, InterruptedException {
-
-		if (this.closeRequested && this.dataBuffer == null
-			&& !this.serializationBuffer.dataLeftFromPreviousSerialization()) {
-
-			if (!this.outputChannelBroker.hasDataLeftToTransmit()) {
-				return true;
-			}
-		}
-
-		return false;
-	}
-
-
-	@Override
-	public void requestClose() throws IOException, InterruptedException {
-
-		if (!this.closeRequested) {
-			this.closeRequested = true;
-			if (this.serializationBuffer.dataLeftFromPreviousSerialization()) {
-				// make sure we serialized all data before we send the close event
-				flush();
-			}
-
-			if (getType() == ChannelType.INMEMORY || !isBroadcastChannel() || getChannelIndex() == 0) {
-				transferEvent(new ByteBufferedChannelCloseEvent());
-				flush();
-			}
-		}
-	}
-
-	/**
-	 * Requests a new write buffer from the framework. This method blocks until the requested buffer is available.
-	 * 
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the buffer
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while waiting for the buffer
-	 */
-	private void requestWriteBufferFromBroker() throws InterruptedException, IOException {
-		if (Thread.interrupted()) {
-			throw new InterruptedException();
-		}
-		this.dataBuffer = this.outputChannelBroker.requestEmptyWriteBuffer();
-	}
-
-	/**
-	 * Returns the filled buffer to the framework and triggers further processing.
-	 * 
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while releasing the buffers
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while releasing the buffers
-	 */
-	private void releaseWriteBuffer() throws IOException, InterruptedException {
-		// Keep track of number of bytes transmitted through this channel
-		this.amountOfDataTransmitted += this.dataBuffer.size();
-
-		this.outputChannelBroker.releaseWriteBuffer(this.dataBuffer);
-		this.dataBuffer = null;
-	}
-
-
-	@Override
-	public void writeRecord(T record) throws IOException, InterruptedException {
-
-		// Get a write buffer from the broker
-		if (this.dataBuffer == null) {
-			requestWriteBufferFromBroker();
-		}
-
-		if (this.closeRequested) {
-			throw new IOException("Channel is aready requested to be closed");
-		}
-
-		// Check if we can accept new records or if there are still old
-		// records to be transmitted
-		while (this.serializationBuffer.dataLeftFromPreviousSerialization()) {
-
-			this.serializationBuffer.read(this.dataBuffer);
-			if (this.dataBuffer.remaining() == 0) {
-				releaseWriteBuffer();
-				requestWriteBufferFromBroker();
-			}
-		}
-
-		if (this.serializationBuffer.dataLeftFromPreviousSerialization()) {
-			throw new IOException("Serialization buffer is expected to be empty!");
-		}
-
-		this.serializationBuffer.serialize(record);
-
-		this.serializationBuffer.read(this.dataBuffer);
-		if (this.dataBuffer.remaining() == 0) {
-			releaseWriteBuffer();
-		}
-	}
-
-	/**
-	 * Sets the output channel broker this channel should contact to request and release write buffers.
-	 * 
-	 * @param byteBufferedOutputChannelBroker
-	 *        the output channel broker the channel should contact to request and release write buffers
-	 */
-	public void setByteBufferedOutputChannelBroker(ByteBufferedOutputChannelBroker byteBufferedOutputChannelBroker) {
-
-		this.outputChannelBroker = byteBufferedOutputChannelBroker;
-	}
-
-
-	public void processEvent(AbstractEvent event) {
-
-		if (event instanceof AbstractTaskEvent) {
-			getOutputGate().deliverEvent((AbstractTaskEvent) event);
-		} else {
-			LOG.error("Channel " + getID() + " received unknown event " + event);
-		}
-	}
-
-
-	@Override
-	public void transferEvent(AbstractEvent event) throws IOException, InterruptedException {
-
-		flush();
-		this.outputChannelBroker.transferEventToInputChannel(event);
-	}
-
-
-	@Override
-	public void flush() throws IOException, InterruptedException {
-
-		// Get rid of remaining data in the serialization buffer
-		while (this.serializationBuffer.dataLeftFromPreviousSerialization()) {
-
-			if (this.dataBuffer == null) {
-
-				try {
-					requestWriteBufferFromBroker();
-				} catch (InterruptedException e) {
-					LOG.error(e);
-				}
-			}
-			this.serializationBuffer.read(this.dataBuffer);
-			if (this.dataBuffer.remaining() == 0) {
-				releaseWriteBuffer();
-			}
-		}
-
-		// Get rid of the leased write buffer
-		if (this.dataBuffer != null) {
-			releaseWriteBuffer();
-		}
-	}
-
-
-	@Override
-	public void releaseAllResources() {
-
-		// TODO: Reconsider release of broker's resources here
-		this.closeRequested = true;
-
-		this.serializationBuffer.clear();
-
-		if (this.dataBuffer != null) {
-			this.dataBuffer.recycleBuffer();
-			this.dataBuffer = null;
-		}
-	}
-
-
-	@Override
-	public long getAmountOfDataTransmitted() {
-
-		return this.amountOfDataTransmitted;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/BufferOrEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/BufferOrEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/BufferOrEvent.java
deleted file mode 100644
index f89c7fb..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/BufferOrEvent.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.io.channels.Buffer;
-
-/**
- * Either type for {@link Buffer} and {@link AbstractEvent}.
- */
-public class BufferOrEvent {
-	
-	private final Buffer buffer;
-	
-	private final AbstractEvent event;
-	
-	public BufferOrEvent(Buffer buffer) {
-		this.buffer = buffer;
-		this.event = null;
-	}
-	
-	public BufferOrEvent(AbstractEvent event) {
-		this.buffer = null;
-		this.event = event;
-	}
-	
-	public boolean isBuffer() {
-		return this.buffer != null;
-	}
-	
-	public boolean isEvent() {
-		return this.event != null;
-	}
-	
-	public Buffer getBuffer() {
-		return this.buffer;
-	}
-	
-	public AbstractEvent getEvent() {
-		return this.event;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedChannelCloseEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedChannelCloseEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedChannelCloseEvent.java
deleted file mode 100644
index f427b70..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedChannelCloseEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-
-public class ByteBufferedChannelCloseEvent extends AbstractEvent {
-
-	@Override
-	public void read(DataInput in) throws IOException {
-
-		// Nothing to do here
-	}
-
-	@Override
-	public void write(DataOutput out) throws IOException {
-		// Nothing to do here
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedInputChannelBroker.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedInputChannelBroker.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedInputChannelBroker.java
deleted file mode 100644
index 4d259ec..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedInputChannelBroker.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-
-
-public interface ByteBufferedInputChannelBroker {
-
-	public BufferOrEvent getNextBufferOrEvent() throws IOException;
-
-	/**
-	 * Forwards the given event to the connected network output channel on a best effort basis.
-	 * 
-	 * @param event
-	 *        the event to be transferred
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the event to be transfered
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while transferring the event
-	 */
-	void transferEventToOutputChannel(AbstractEvent event) throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedOutputChannelBroker.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedOutputChannelBroker.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedOutputChannelBroker.java
deleted file mode 100644
index af45e4c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedOutputChannelBroker.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.io.channels.Buffer;
-
-public interface ByteBufferedOutputChannelBroker {
-
-	/**
-	 * Requests an empty write buffer from the broker. This method will block
-	 * until the requested write buffer is available.
-	 * 
-	 * @return the byte buffers to write in
-	 * @throws InterruptedException
-	 *         thrown if the connected task is interrupted while waiting for the buffer
-	 * @throws IOException
-	 *         thrown if an error occurs while requesting the empty write buffer.
-	 */
-	Buffer requestEmptyWriteBuffer() throws InterruptedException, IOException;
-
-	/**
-	 * Returns a filled write buffer to the broker. The broker will take care
-	 * of the buffers and transfer the user data to the connected input channel on a best effort basis.
-	 * 
-	 * @param buffer
-	 *        the buffer to be returned to the broker
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the buffers to be released
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while releasing the buffers
-	 */
-	void releaseWriteBuffer(Buffer buffer) throws IOException, InterruptedException;
-
-	/**
-	 * Checks if there is still data created by this output channel that must be transfered to the corresponding input
-	 * channel.
-	 * 
-	 * @return <code>true</code> if the channel has data left to transmit, <code>false</code> otherwise
-	 * @throws InterruptedException
-	 *         thrown if the connected task is interrupted while waiting for the remaining data to be transmitted
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the remaining data
-	 */
-	boolean hasDataLeftToTransmit() throws IOException, InterruptedException;
-
-	/**
-	 * Forwards the given event to the connected network input channel on a best effort basis.
-	 * 
-	 * @param event
-	 *        the event to be transferred
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the event to be transfered
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while transfering the event
-	 */
-	void transferEventToInputChannel(AbstractEvent event) throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/EndOfSuperstepEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/EndOfSuperstepEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/EndOfSuperstepEvent.java
deleted file mode 100644
index b0b20d4..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/EndOfSuperstepEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-
-/**
- * Marks the end of a superstep of one particular iteration head
- */
-public class EndOfSuperstepEvent extends AbstractEvent {
-	
-	public static final EndOfSuperstepEvent INSTANCE = new EndOfSuperstepEvent();
-
-	@Override
-	public void write(DataOutput out) throws IOException {}
-
-	@Override
-	public void read(DataInput in) throws IOException {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryInputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryInputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryInputChannel.java
deleted file mode 100644
index 0609fcb..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryInputChannel.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.RecordDeserializer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-
-public final class InMemoryInputChannel<T extends IOReadableWritable> extends AbstractByteBufferedInputChannel<T> {
-
-	public InMemoryInputChannel(InputGate<T> inputGate, int channelIndex, RecordDeserializer<T> deserializer,
-			ChannelID channelID, ChannelID connectedChannelID) {
-		super(inputGate, channelIndex, deserializer, channelID, connectedChannelID);
-	}
-
-	@Override
-	public ChannelType getType() {
-
-		return ChannelType.INMEMORY;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryOutputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryOutputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryOutputChannel.java
deleted file mode 100644
index 0113d6f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryOutputChannel.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-
-public final class InMemoryOutputChannel<T extends IOReadableWritable> extends AbstractByteBufferedOutputChannel<T> {
-
-	public InMemoryOutputChannel(OutputGate<T> outputGate, int channelIndex, ChannelID channelID,
-			ChannelID connectedChannelID) {
-		super(outputGate, channelIndex, channelID, connectedChannelID);
-	}
-
-	@Override
-	public ChannelType getType() {
-
-		return ChannelType.INMEMORY;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkInputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkInputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkInputChannel.java
deleted file mode 100644
index ee11c0e..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkInputChannel.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.RecordDeserializer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-
-public final class NetworkInputChannel<T extends IOReadableWritable> extends AbstractByteBufferedInputChannel<T> {
-
-	public NetworkInputChannel(InputGate<T> inputGate, int channelIndex, RecordDeserializer<T> deserializer,
-			ChannelID channelID, ChannelID connectedChannelID) {
-		super(inputGate, channelIndex, deserializer, channelID, connectedChannelID);
-	}
-
-	@Override
-	public ChannelType getType() {
-
-		return ChannelType.NETWORK;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkOutputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkOutputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkOutputChannel.java
deleted file mode 100644
index c98f41a..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkOutputChannel.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-
-public final class NetworkOutputChannel<T extends IOReadableWritable> extends AbstractByteBufferedOutputChannel<T> {
-
-	public NetworkOutputChannel(OutputGate<T> outputGate, int channelIndex, ChannelID channelID,
-			ChannelID connectedChannelID) {
-		super(outputGate, channelIndex, channelID, connectedChannelID);
-	}
-
-	@Override
-	public ChannelType getType() {
-
-		return ChannelType.NETWORK;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryReader.java
deleted file mode 100644
index acfddde..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryReader.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.library;
-
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.fs.FSDataInputStream;
-import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-import eu.stratosphere.nephele.types.FileRecord;
-
-public class DirectoryReader extends AbstractFileInputTask {
-
-	/**
-	 * The record writer to write the output strings to.
-	 */
-	private RecordWriter<FileRecord> output = null;
-
-	// buffer
-	private byte[] buffer;
-
-	private static final Log LOG = LogFactory.getLog(DirectoryReader.class);
-
-
-	@Override
-	public void invoke() throws Exception {
-
-		final Iterator<FileInputSplit> splitIterator = getFileInputSplits();
-		FileRecord fr = null;
-
-		while (splitIterator.hasNext()) {
-
-			final FileInputSplit split = splitIterator.next();
-
-			final long start = split.getStart();
-			final long end = start + split.getLength();
-
-			if (buffer == null || buffer.length < end - start) {
-				buffer = new byte[(int) (end - start)];
-			}
-
-			if (fr == null || fr.getFileName().compareTo(split.getPath().getName()) != 0) {
-				if (fr != null) {
-					try {
-						output.emit(fr);
-					} catch (InterruptedException e) {
-						// TODO: Respond to interruption properly
-						LOG.error(e);
-					}
-				}
-				fr = new FileRecord(split.getPath().getName());
-			}
-
-			final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
-			final FSDataInputStream fdis = fs.open(split.getPath());
-			fdis.seek(split.getStart());
-
-			int read = fdis.read(buffer, 0, buffer.length);
-			if (read == -1) {
-				continue;
-			}
-
-			fr.append(buffer, 0, read);
-
-			if (read != end - start) {
-				System.err.println("Unexpected number of bytes read! Expected: " + (end - start) + " Read: " + read);
-			}
-		}
-
-		if (fr != null) {
-			try {
-				output.emit(fr);
-			} catch (InterruptedException e) {
-				// TODO: Respond to interruption properly
-				LOG.error(e);
-			}
-		}
-	}
-
-
-	@Override
-	public void registerInputOutput() {
-		output = new RecordWriter<FileRecord>(this, FileRecord.class);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryWriter.java
deleted file mode 100644
index 915aaa7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryWriter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.library;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.fs.FSDataOutputStream;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.template.AbstractFileOutputTask;
-import eu.stratosphere.nephele.types.FileRecord;
-
-public class DirectoryWriter extends AbstractFileOutputTask {
-
-	/**
-	 * The record reader to read the incoming strings from.
-	 */
-	private RecordReader<FileRecord> input = null;
-
-	private static final Log LOG = LogFactory.getLog(DirectoryWriter.class);
-
-	@Override
-	public void invoke() throws Exception {
-
-		final Path path = getFileOutputPath();
-		final FileSystem fs = path.getFileSystem();
-
-		try {
-			while (input.hasNext()) {
-
-				final FileRecord record = input.next();
-				Path newPath = new Path(path + Path.SEPARATOR + record.getFileName());
-				FSDataOutputStream outputStream = fs.create(newPath, true);
-
-				outputStream.write(record.getDataBuffer(), 0, record.getDataBuffer().length);
-				outputStream.close();
-				// TODO: Implement me
-				// System.out.println(input.next());
-				// ODO Auto-generated catch block
-			}
-		} catch (InterruptedException e) {
-			// TODO Auto-generated catch block
-			LOG.error(e);
-		}
-
-	}
-
-	@Override
-	public void registerInputOutput() {
-		input = new RecordReader<FileRecord>(this, FileRecord.class);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineReader.java
deleted file mode 100644
index a26c56c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineReader.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.library;
-
-import java.util.Iterator;
-
-import eu.stratosphere.core.fs.FSDataInputStream;
-import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-import eu.stratosphere.runtime.fs.LineReader;
-
-/**
- * A file line reader reads the associated file input splits line by line and outputs the lines as string records.
- * 
- */
-public class FileLineReader extends AbstractFileInputTask {
-
-	private RecordWriter<StringRecord> output = null;
-
-	@Override
-	public void invoke() throws Exception {
-
-		final Iterator<FileInputSplit> splitIterator = getFileInputSplits();
-
-		while (splitIterator.hasNext()) {
-
-			final FileInputSplit split = splitIterator.next();
-
-			long start = split.getStart();
-			long length = split.getLength();
-
-			final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
-			final FSDataInputStream fdis = fs.open(split.getPath());
-
-			final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
-
-			byte[] line = lineReader.readLine();
-
-			while (line != null) {
-
-				// Create a string object from the data read
-				StringRecord str = new StringRecord();
-				str.set(line);
-
-				// Send out string
-				output.emit(str);
-
-				line = lineReader.readLine();
-			}
-
-			// Close the stream;
-			lineReader.close();
-		}
-	}
-
-	@Override
-	public void registerInputOutput() {
-		output = new RecordWriter<StringRecord>(this, StringRecord.class);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineWriter.java
deleted file mode 100644
index 5be3e1b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineWriter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.library;
-
-import eu.stratosphere.core.fs.FSDataOutputStream;
-import eu.stratosphere.core.fs.FileStatus;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.template.AbstractFileOutputTask;
-
-/**
- * A file line writer reads string records its input gate and writes them to the associated output file.
- * 
- */
-public class FileLineWriter extends AbstractFileOutputTask {
-
-	/**
-	 * The record reader through which incoming string records are received.
-	 */
-	private RecordReader<StringRecord> input = null;
-
-
-	@Override
-	public void invoke() throws Exception {
-
-		Path outputPath = getFileOutputPath();
-
-		FileSystem fs = FileSystem.get(outputPath.toUri());
-		if (fs.exists(outputPath)) {
-			FileStatus status = fs.getFileStatus(outputPath);
-
-			if (status.isDir()) {
-				outputPath = new Path(outputPath.toUri().toString() + "/file_" + getIndexInSubtaskGroup() + ".txt");
-			}
-		}
-
-		final FSDataOutputStream outputStream = fs.create(outputPath, true);
-
-		while (this.input.hasNext()) {
-
-			StringRecord record = this.input.next();
-			byte[] recordByte = (record.toString() + "\r\n").getBytes();
-			outputStream.write(recordByte, 0, recordByte.length);
-		}
-
-		outputStream.close();
-
-	}
-
-
-	@Override
-	public void registerInputOutput() {
-		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-	}
-
-
-	@Override
-	public int getMaximumNumberOfSubtasks() {
-		// The default implementation always returns -1
-		return -1;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java
index 0204ec6..ea7bd4a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java
@@ -19,6 +19,16 @@
 
 package eu.stratosphere.nephele.ipc;
 
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.net.NetUtils;
+import eu.stratosphere.nephele.util.IOUtils;
+import eu.stratosphere.runtime.io.serialization.DataOutputSerializer;
+import eu.stratosphere.util.ClassUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.net.SocketFactory;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -31,24 +41,13 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.net.SocketFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.DataOutputBuffer;
-import eu.stratosphere.nephele.net.NetUtils;
-import eu.stratosphere.nephele.util.IOUtils;
-import eu.stratosphere.util.ClassUtils;
-
 /**
  * A client for an IPC service. IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value. A service runs on
@@ -379,13 +378,14 @@ public class Client {
 			out.write(Server.HEADER.array());
 
 			// Write out the ConnectionHeader
-			DataOutputBuffer buf = new DataOutputBuffer();
+			DataOutputSerializer buf = new DataOutputSerializer(4 + header.getProtocol().getBytes().length + 1);
 			header.write(buf);
 
 			// Write out the payload length
-			int bufLen = buf.getLength();
+			ByteBuffer wrapper = buf.wrapAsByteBuffer();
+			int bufLen = wrapper.limit();
 			out.writeInt(bufLen);
-			out.write(buf.getData().array(), 0, bufLen);
+			out.write(wrapper.array(), 0, bufLen);
 		}
 
 		/*
@@ -456,30 +456,27 @@ public class Client {
 				return;
 			}
 
-			DataOutputBuffer d = null;
+			DataOutputSerializer d = null;
 			try {
 				synchronized (this.out) {
 
 					// for serializing the
 					// data to be written
-					d = new DataOutputBuffer();
+					d = new DataOutputSerializer(64);
 					// First, write call id to buffer d
 					d.writeInt(call.id);
 					// Then write RPC data (the actual call) to buffer d
 					call.param.write(d);
 
-					byte[] data = d.getData().array();
-					int dataLength = d.getLength();
+					ByteBuffer wrapper = d.wrapAsByteBuffer();
+					byte[] data = wrapper.array();
+					int dataLength = wrapper.limit();
 					out.writeInt(dataLength); // first put the data length
 					out.write(data, 0, dataLength);// write the data
 					out.flush();
 				}
 			} catch (IOException e) {
 				markClosed(e);
-			} finally {
-				// the buffer is just an in-memory buffer, but it is still polite to
-				// close early
-				IOUtils.closeStream(d);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
index 6af9d74..b4c51f2 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
@@ -23,8 +23,7 @@ import eu.stratosphere.configuration.IllegalConfigurationException;
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.StringRecord;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.nephele.util.EnumUtils;
 import eu.stratosphere.util.StringUtils;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/DistributionPattern.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/DistributionPattern.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/DistributionPattern.java
new file mode 100644
index 0000000..3b8c9a0
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/DistributionPattern.java
@@ -0,0 +1,33 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.jobgraph;
+
+/**
+ * A distribution pattern determines which subtasks of a producing Nephele task are wired to which
+ * subtasks of a consuming Nephele task.
+ * 
+ */
+
+public enum DistributionPattern {
+
+	/**
+	 * Each subtask of the producing Nephele task is wired to each subtask of the consuming Nephele task.
+	 */
+	BIPARTITE,
+
+	/**
+	 * The i-th subtask of the producing Nephele task is wired to the i-th subtask of the consuming Nephele task.
+	 */
+	POINTWISE
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobEdge.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobEdge.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobEdge.java
index 51396ff..45788dd 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobEdge.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobEdge.java
@@ -13,8 +13,7 @@
 
 package eu.stratosphere.nephele.jobgraph;
 
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 
 /**
  * Objects of this class represent edges in the user's job graph.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
index e4467a0..2ec2ed6 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
@@ -1,5 +1,5 @@
 /***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
@@ -13,81 +13,44 @@
 
 package eu.stratosphere.nephele.jobgraph;
 
+import java.nio.ByteBuffer;
+
 import javax.xml.bind.DatatypeConverter;
 
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
 
-/**
- * A class for statistically unique job IDs.
- * <p>
- * This class is thread-safe.
- * 
- */
 public final class JobID extends AbstractID {
 
-	/**
-	 * Constructs a new random ID from a uniform distribution.
-	 */
 	public JobID() {
 		super();
 	}
 
-	/**
-	 * Constructs a new job ID.
-	 * 
-	 * @param lowerPart
-	 *        the lower bytes of the ID
-	 * @param upperPart
-	 *        the higher bytes of the ID
-	 */
-	private JobID(final long lowerPart, final long upperPart) {
+	public JobID(long lowerPart, long upperPart) {
 		super(lowerPart, upperPart);
 	}
 
-	/**
-	 * Constructs a new job ID from the given bytes.
-	 * 
-	 * @param bytes
-	 *        the bytes to initialize the job ID with
-	 */
-	public JobID(final byte[] bytes) {
+	public JobID(byte[] bytes) {
 		super(bytes);
 	}
 
-	/**
-	 * Generates a new statistically unique job ID.
-	 * 
-	 * @return a new statistically unique job ID
-	 */
 	public static JobID generate() {
-
-		final long lowerPart = AbstractID.generateRandomBytes();
-		final long upperPart = AbstractID.generateRandomBytes();
+		long lowerPart = AbstractID.generateRandomLong();
+		long upperPart = AbstractID.generateRandomLong();
 
 		return new JobID(lowerPart, upperPart);
 	}
 
-	/**
-	 * Constructs a new job ID and initializes it with the given bytes.
-	 * 
-	 * @param bytes
-	 *        the bytes to initialize the new job ID with
-	 * @return the new job ID
-	 */
-	public static JobID fromByteArray(final byte[] bytes) {
-
+	public static JobID fromByteArray(byte[] bytes) {
 		return new JobID(bytes);
 	}
-	
-	/**
-	 * Constructs a new job ID and initializes it with the given bytes.
-	 * 
-	 * @param bytes
-	 *        the bytes to initialize the new job ID with
-	 * @return the new job ID
-	 */
-	public static JobID fromHexString(final String hexString) {
 
+	public static JobID fromByteBuffer(ByteBuffer buf, int offset) {
+		long lower = buf.getLong(offset);
+		long upper = buf.getLong(offset + 8);
+		return new JobID(lower, upper);
+	}
+
+	public static JobID fromHexString(String hexString) {
 		return new JobID(DatatypeConverter.parseHexBinary(hexString));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobVertexID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobVertexID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobVertexID.java
index c6f63ca..689cc02 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobVertexID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobVertexID.java
@@ -13,7 +13,7 @@
 
 package eu.stratosphere.nephele.jobgraph;
 
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
 
 /**
  * A class for statistically unique job vertex IDs.