You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:19 UTC

[22/34] Offer buffer-oriented API for I/O (#25)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeInputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeInputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeInputGate.java
deleted file mode 100644
index 71cad15..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeInputGate.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.io.channels.AbstractInputChannel;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryInputChannel;
-import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkInputChannel;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * In Nephele input gates are a specialization of general gates and connect input channels and record readers. As
- * channels, input gates are always parameterized to a specific type of record which they can transport. In contrast to
- * output gates input gates can be associated with a {@link DistributionPattern} object which dictates the concrete
- * wiring between two groups of vertices.
- * 
- * @param <T> The type of record that can be transported through this gate.
- */
-public class RuntimeInputGate<T extends IOReadableWritable> extends AbstractGate<T> implements InputGate<T> {
-	
-	/**
-	 * The log object used for debugging.
-	 */
-	private static final Log LOG = LogFactory.getLog(InputGate.class);
-
-	/**
-	 * The deserializer factory used to instantiate the deserializers that construct records from byte streams.
-	 */
-	private final RecordDeserializerFactory<T> deserializerFactory;
-
-	/**
-	 * The list of input channels attached to this input gate.
-	 */
-	private final ArrayList<AbstractInputChannel<T>> inputChannels = new ArrayList<AbstractInputChannel<T>>();
-
-	/**
-	 * Queue with indices of channels that store at least one available record.
-	 */
-	private final BlockingQueue<Integer> availableChannels = new LinkedBlockingQueue<Integer>();
-
-	/**
-	 * The listener object to be notified when a channel has at least one record available.
-	 */
-	private final AtomicReference<RecordAvailabilityListener<T>> recordAvailabilityListener = new AtomicReference<RecordAvailabilityListener<T>>(null);
-	
-	
-	private AbstractTaskEvent currentEvent;
-
-	/**
-	 * If the value of this variable is set to <code>true</code>, the input gate is closed.
-	 */
-	private boolean isClosed = false;
-
-	/**
-	 * The channel to read from next.
-	 */
-	private int channelToReadFrom = -1;
-
-	/**
-	 * Constructs a new runtime input gate.
-	 * 
-	 * @param jobID
-	 *        the ID of the job this input gate belongs to
-	 * @param gateID
-	 *        the ID of the gate
-	 * @param deserializerFactory
-	 *        The factory used to instantiate the deserializers that construct records from byte streams.
-	 * @param index
-	 *        the index assigned to this input gate at the {@link Environment} object
-	 */
-	public RuntimeInputGate(final JobID jobID, final GateID gateID,
-						final RecordDeserializerFactory<T> deserializerFactory, final int index) {
-		super(jobID, gateID, index);
-		this.deserializerFactory = deserializerFactory;
-	}
-
-	/**
-	 * Adds a new input channel to the input gate.
-	 * 
-	 * @param inputChannel
-	 *        the input channel to be added.
-	 */
-	private void addInputChannel(AbstractInputChannel<T> inputChannel) {
-		// in high DOPs, this can be a serious performance issue, as adding all channels and checking linearly has a
-		// quadratic complexity!
-		if (!this.inputChannels.contains(inputChannel)) {
-			this.inputChannels.add(inputChannel);
-		}
-	}
-
-	/**
-	 * Removes the input channel with the given ID from the input gate if it exists.
-	 * 
-	 * @param inputChannelID
-	 *        the ID of the channel to be removed
-	 */
-	public void removeInputChannel(ChannelID inputChannelID) {
-
-		for (int i = 0; i < this.inputChannels.size(); i++) {
-
-			final AbstractInputChannel<T> inputChannel = this.inputChannels.get(i);
-			if (inputChannel.getID().equals(inputChannelID)) {
-				this.inputChannels.remove(i);
-				return;
-			}
-		}
-		
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Cannot find output channel with ID " + inputChannelID + " to remove");
-		}
-	}
-
-	@Override
-	public boolean isInputGate() {
-		return true;
-	}
-
-	@Override
-	public int getNumberOfInputChannels() {
-		return this.inputChannels.size();
-	}
-
-	@Override
-	public AbstractInputChannel<T> getInputChannel(int pos) {
-		return this.inputChannels.get(pos);
-	}
-
-
-	@Override
-	public NetworkInputChannel<T> createNetworkInputChannel(final InputGate<T> inputGate, final ChannelID channelID,
-			final ChannelID connectedChannelID) {
-
-		final NetworkInputChannel<T> enic = new NetworkInputChannel<T>(inputGate, this.inputChannels.size(),
-			this.deserializerFactory.createDeserializer(), channelID, connectedChannelID);
-		addInputChannel(enic);
-
-		return enic;
-	}
-
-
-	@Override
-	public InMemoryInputChannel<T> createInMemoryInputChannel(final InputGate<T> inputGate, final ChannelID channelID,
-			final ChannelID connectedChannelID) {
-
-		final InMemoryInputChannel<T> eimic = new InMemoryInputChannel<T>(inputGate, this.inputChannels.size(),
-			this.deserializerFactory.createDeserializer(), channelID, connectedChannelID);
-		addInputChannel(eimic);
-
-		return eimic;
-	}
-
-
-	@Override
-	public InputChannelResult readRecord(T target) throws IOException, InterruptedException {
-
-		if (this.channelToReadFrom == -1) {
-			if (this.isClosed()) {
-				return InputChannelResult.END_OF_STREAM;
-			}
-				
-			if (Thread.interrupted()) {
-				throw new InterruptedException();
-			}
-				
-			this.channelToReadFrom = waitForAnyChannelToBecomeAvailable();
-		}
-			
-		InputChannelResult result = this.getInputChannel(this.channelToReadFrom).readRecord(target);
-		switch (result) {
-			case INTERMEDIATE_RECORD_FROM_BUFFER: // full record and we can stay on the same channel
-				return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
-				
-			case LAST_RECORD_FROM_BUFFER: // full record, but we must switch the channel afterwards
-				this.channelToReadFrom = -1;
-				return InputChannelResult.LAST_RECORD_FROM_BUFFER;
-				
-			case END_OF_SUPERSTEP:
-				this.channelToReadFrom = -1;
-				return InputChannelResult.END_OF_SUPERSTEP;
-				
-			case TASK_EVENT: // task event
-				this.currentEvent = this.getInputChannel(this.channelToReadFrom).getCurrentEvent();
-				this.channelToReadFrom = -1;	// event always marks a unit as consumed
-				return InputChannelResult.TASK_EVENT;
-					
-			case NONE: // internal event or an incomplete record that needs further chunks
-				// the current unit is exhausted
-				this.channelToReadFrom = -1;
-				return InputChannelResult.NONE;
-				
-			case END_OF_STREAM: // channel is done
-				this.channelToReadFrom = -1;
-				return isClosed() ? InputChannelResult.END_OF_STREAM : InputChannelResult.NONE;
-				
-			default:   // silence the compiler
-				throw new RuntimeException();
-		}
-	}
-	
-	@Override
-	public AbstractTaskEvent getCurrentEvent() {
-		AbstractTaskEvent e = this.currentEvent;
-		this.currentEvent = null;
-		return e;
-	}
-
-	@Override
-	public void notifyRecordIsAvailable(int channelIndex) {
-		this.availableChannels.add(Integer.valueOf(channelIndex));
-
-		RecordAvailabilityListener<T> listener = this.recordAvailabilityListener.get();
-		if (listener != null) {
-			listener.reportRecordAvailability(this);
-		}
-	}
-
-	/**
-	 * This method returns the index of a channel which has at least
-	 * one record available. The method may block until at least one
-	 * channel has become ready.
-	 * 
-	 * @return the index of the channel which has at least one record available
-	 */
-	public int waitForAnyChannelToBecomeAvailable() throws InterruptedException {
-		return this.availableChannels.take().intValue();
-	}
-
-
-	@Override
-	public boolean isClosed() throws IOException, InterruptedException {
-
-		if (this.isClosed) {
-			return true;
-		}
-
-		for (int i = 0; i < this.getNumberOfInputChannels(); i++) {
-			final AbstractInputChannel<T> inputChannel = this.inputChannels.get(i);
-			if (!inputChannel.isClosed()) {
-				return false;
-			}
-		}
-
-		this.isClosed = true;
-		
-		return true;
-	}
-
-
-	@Override
-	public void close() throws IOException, InterruptedException {
-
-		for (int i = 0; i < this.getNumberOfInputChannels(); i++) {
-			final AbstractInputChannel<T> inputChannel = this.inputChannels.get(i);
-			inputChannel.close();
-		}
-
-	}
-
-
-	@Override
-	public String toString() {
-		return "Input " + super.toString();
-	}
-
-
-	@Override
-	public void publishEvent(AbstractEvent event) throws IOException, InterruptedException {
-
-		// Copy event to all connected channels
-		final Iterator<AbstractInputChannel<T>> it = this.inputChannels.iterator();
-		while (it.hasNext()) {
-			it.next().transferEvent(event);
-		}
-	}
-
-	/**
-	 * Returns the {@link RecordDeserializerFactory} used by this input gate.
-	 * 
-	 * @return The {@link RecordDeserializerFactory} used by this input gate.
-	 */
-	public RecordDeserializerFactory<T> getRecordDeserializerFactory() {
-		return this.deserializerFactory;
-	}
-
-
-	@Override
-	public void releaseAllChannelResources() {
-
-		final Iterator<AbstractInputChannel<T>> it = this.inputChannels.iterator();
-		while (it.hasNext()) {
-			it.next().releaseAllResources();
-		}
-	}
-
-	@Override
-	public void registerRecordAvailabilityListener(final RecordAvailabilityListener<T> listener) {
-		if (!this.recordAvailabilityListener.compareAndSet(null, listener)) {
-			throw new IllegalStateException(this.recordAvailabilityListener
-				+ " is already registered as a record availability listener");
-		}
-	}
-
-	public void notifyDataUnitConsumed(int channelIndex) {
-		this.channelToReadFrom = -1;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeOutputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeOutputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeOutputGate.java
deleted file mode 100644
index 20efd94..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeOutputGate.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.io.channels.AbstractOutputChannel;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryOutputChannel;
-import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkOutputChannel;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * In Nephele output gates are a specialization of general gates and connect
- * record writers and output channels. As channels, output gates are always
- * parameterized to a specific type of record which they can transport.
- * <p>
- * This class is in general not thread-safe.
- * 
- * @param <T>
- *        the type of record that can be transported through this gate
- */
-public class RuntimeOutputGate<T extends IOReadableWritable> extends AbstractGate<T> implements OutputGate<T> {
-
-	/**
-	 * The log object used for debugging.
-	 */
-	private static final Log LOG = LogFactory.getLog(OutputGate.class);
-
-	/**
-	 * The list of output channels attached to this gate.
-	 */
-	private final ArrayList<AbstractOutputChannel<T>> outputChannels = new ArrayList<AbstractOutputChannel<T>>();
-
-	/**
-	 * Channel selector to determine which channel is supposed receive the next record.
-	 */
-	private final ChannelSelector<T> channelSelector;
-
-	/**
-	 * The class of the record transported through this output gate.
-	 */
-	private final Class<T> type;
-
-	/**
-	 * Stores whether all records passed to this output gate shall be transmitted through all connected output channels.
-	 */
-	private final boolean isBroadcast;
-
-	/**
-	 * Constructs a new runtime output gate.
-	 * 
-	 * @param jobID
-	 *        the ID of the job this input gate belongs to
-	 * @param gateID
-	 *        the ID of the gate
-	 * @param inputClass
-	 *        the class of the record that can be transported through this
-	 *        gate
-	 * @param index
-	 *        the index assigned to this output gate at the {@link Environment} object
-	 * @param channelSelector
-	 *        the channel selector to be used for this output gate
-	 * @param isBroadcast
-	 *        <code>true</code> if every records passed to this output gate shall be transmitted through all connected
-	 *        output channels, <code>false</code> otherwise
-	 */
-	public RuntimeOutputGate(final JobID jobID, final GateID gateID, final Class<T> inputClass, final int index,
-			final ChannelSelector<T> channelSelector, final boolean isBroadcast) {
-
-		super(jobID, gateID, index);
-
-		this.isBroadcast = isBroadcast;
-		this.type = inputClass;
-
-		if (this.isBroadcast) {
-			this.channelSelector = null;
-		} else {
-			if (channelSelector == null) {
-				this.channelSelector = new DefaultChannelSelector<T>();
-			} else {
-				this.channelSelector = channelSelector;
-			}
-		}
-	}
-
-
-	@Override
-	public final Class<T> getType() {
-		return this.type;
-	}
-
-	/**
-	 * Adds a new output channel to the output gate.
-	 * 
-	 * @param outputChannel
-	 *        the output channel to be added.
-	 */
-	private void addOutputChannel(AbstractOutputChannel<T> outputChannel) {
-		if (!this.outputChannels.contains(outputChannel)) {
-			this.outputChannels.add(outputChannel);
-		}
-	}
-
-	/**
-	 * Removes the output channel with the given ID from the output gate if it
-	 * exists.
-	 * 
-	 * @param outputChannelID
-	 *        the ID of the channel to be removed
-	 */
-	public void removeOutputChannel(ChannelID outputChannelID) {
-
-		for (int i = 0; i < this.outputChannels.size(); i++) {
-
-			final AbstractOutputChannel<T> outputChannel = this.outputChannels.get(i);
-			if (outputChannel.getID().equals(outputChannelID)) {
-				this.outputChannels.remove(i);
-				return;
-			}
-		}
-
-		LOG.debug("Cannot find output channel with ID " + outputChannelID + " to remove");
-	}
-
-	/**
-	 * Removes all output channels from the output gate.
-	 */
-	public void removeAllOutputChannels() {
-
-		this.outputChannels.clear();
-	}
-
-
-	@Override
-	public boolean isInputGate() {
-
-		return false;
-	}
-
-
-	@Override
-	public int getNumberOfOutputChannels() {
-
-		return this.outputChannels.size();
-	}
-
-	/**
-	 * Returns the output channel from position <code>pos</code> of the gate's
-	 * internal channel list.
-	 * 
-	 * @param pos
-	 *        the position to retrieve the channel from
-	 * @return the channel from the given position or <code>null</code> if such
-	 *         position does not exist.
-	 */
-	public AbstractOutputChannel<T> getOutputChannel(int pos) {
-
-		if (pos < this.outputChannels.size()) {
-			return this.outputChannels.get(pos);
-		} else {
-			return null;
-		}
-	}
-
-
-	@Override
-	public NetworkOutputChannel<T> createNetworkOutputChannel(final OutputGate<T> outputGate,
-			final ChannelID channelID, final ChannelID connectedChannelID) {
-
-		final NetworkOutputChannel<T> enoc = new NetworkOutputChannel<T>(outputGate, this.outputChannels.size(),
-			channelID, connectedChannelID);
-		addOutputChannel(enoc);
-
-		return enoc;
-	}
-
-
-	@Override
-	public InMemoryOutputChannel<T> createInMemoryOutputChannel(final OutputGate<T> outputGate,
-			final ChannelID channelID, final ChannelID connectedChannelID) {
-
-		final InMemoryOutputChannel<T> einoc = new InMemoryOutputChannel<T>(outputGate, this.outputChannels.size(),
-			channelID, connectedChannelID);
-		addOutputChannel(einoc);
-
-		return einoc;
-	}
-
-
-	@Override
-	public void requestClose() throws IOException, InterruptedException {
-		// Close all output channels
-		for (int i = 0; i < this.getNumberOfOutputChannels(); i++) {
-			final AbstractOutputChannel<T> outputChannel = this.getOutputChannel(i);
-			outputChannel.requestClose();
-		}
-	}
-
-
-	@Override
-	public boolean isClosed() throws IOException, InterruptedException {
-
-		boolean allClosed = true;
-
-		for (int i = 0; i < this.getNumberOfOutputChannels(); i++) {
-			final AbstractOutputChannel<T> outputChannel = this.getOutputChannel(i);
-			if (!outputChannel.isClosed()) {
-				allClosed = false;
-			}
-		}
-
-		return allClosed;
-	}
-
-
-	@Override
-	public void writeRecord(final T record) throws IOException, InterruptedException {
-
-		if (this.isBroadcast) {
-
-			if (getChannelType() == ChannelType.INMEMORY) {
-
-				final int numberOfOutputChannels = this.outputChannels.size();
-				for (int i = 0; i < numberOfOutputChannels; ++i) {
-					this.outputChannels.get(i).writeRecord(record);
-				}
-
-			} else {
-
-				// Use optimization for byte buffered channels
-				this.outputChannels.get(0).writeRecord(record);
-			}
-
-		} else {
-
-			// Non-broadcast gate, use channel selector to select output channels
-			final int numberOfOutputChannels = this.outputChannels.size();
-			final int[] selectedOutputChannels = this.channelSelector.selectChannels(record, numberOfOutputChannels);
-			
-			if (selectedOutputChannels == null) {
-				return;
-			}
-
-			
-			for (int i = 0; i < selectedOutputChannels.length; ++i) {
-				if (selectedOutputChannels[i] < numberOfOutputChannels) {
-					final AbstractOutputChannel<T> outputChannel = this.outputChannels.get(selectedOutputChannels[i]);
-					outputChannel.writeRecord(record);
-				}
-			}
-		}
-	}
-
-
-	@Override
-	public List<AbstractOutputChannel<T>> getOutputChannels() {
-		return this.outputChannels;
-	}
-
-
-	@Override
-	public String toString() {
-		return "Output " + super.toString();
-	}
-
-
-	@Override
-	public void publishEvent(AbstractEvent event) throws IOException, InterruptedException {
-
-		// Copy event to all connected channels
-		final Iterator<AbstractOutputChannel<T>> it = this.outputChannels.iterator();
-		while (it.hasNext()) {
-			it.next().transferEvent(event);
-		}
-	}
-
-
-	@Override
-	public void flush() throws IOException, InterruptedException {
-		// Flush all connected channels
-		final Iterator<AbstractOutputChannel<T>> it = this.outputChannels.iterator();
-		while (it.hasNext()) {
-			it.next().flush();
-		}
-	}
-
-
-	@Override
-	public boolean isBroadcast() {
-
-		return this.isBroadcast;
-	}
-
-
-	@Override
-	public ChannelSelector<T> getChannelSelector() {
-
-		return this.channelSelector;
-	}
-
-
-	@Override
-	public void releaseAllChannelResources() {
-
-		final Iterator<AbstractOutputChannel<T>> it = this.outputChannels.iterator();
-
-		while (it.hasNext()) {
-			it.next().releaseAllResources();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/UnionRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/UnionRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/UnionRecordReader.java
deleted file mode 100644
index ec8ef0c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/UnionRecordReader.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-public final class UnionRecordReader<T extends IOReadableWritable> extends AbstractUnionRecordReader<T> implements Reader<T> {
-	
-	private final Class<T> recordType;
-	
-	private T lookahead;
-	
-
-	public UnionRecordReader(MutableRecordReader<T>[] recordReaders, Class<T> recordType) {
-		super(recordReaders);
-		this.recordType = recordType;
-	}
-
-	@Override
-	public boolean hasNext() throws IOException, InterruptedException {
-		if (this.lookahead != null) {
-			return true;
-		} else {
-			T record = instantiateRecordType();
-			if (getNextRecord(record)) {
-				this.lookahead = record;
-				return true;
-			} else {
-				return false;
-			}
-		}
-	}
-
-	@Override
-	public T next() throws IOException, InterruptedException {
-		if (hasNext()) {
-			T tmp = this.lookahead;
-			this.lookahead = null;
-			return tmp;
-		} else {
-			return null;
-		}
-	}
-	
-	private T instantiateRecordType() {
-		try {
-			return this.recordType.newInstance();
-		} catch (InstantiationException e) {
-			throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
-		} catch (IllegalAccessException e) {
-			throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Writer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Writer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Writer.java
deleted file mode 100644
index 91fa3b7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Writer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * A writer that sends records.
- * 
- * @param <T> The type of the record that can be emitted with this record writer.
- */
-public interface Writer<T extends IOReadableWritable> {
-	
-	void emit(T record) throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractChannel.java
deleted file mode 100644
index b48e5f0..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractChannel.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * An abstract base class for channel objects.
- */
-public abstract class AbstractChannel {
-
-	/**
-	 * The ID of the channel.
-	 */
-	private final ChannelID channelID;
-
-	/**
-	 * The ID of the connected channel.
-	 */
-	private final ChannelID connectedChannelID;
-
-	private final int channelIndex;
-
-	/**
-	 * Auxiliary constructor for channels
-	 * 
-	 * @param channelIndex
-	 *        the index of the channel in either the output or input gate
-	 * @param channelID
-	 *        the ID of the channel
-	 * @param connectedChannelID
-	 *        the ID of the channel this channel is connected to
-	 * @param compressionLevel
-	 *        the level of compression to be used for this channel
-	 */
-	protected AbstractChannel(final int channelIndex, final ChannelID channelID, final ChannelID connectedChannelID) {
-		this.channelIndex = channelIndex;
-		this.channelID = channelID;
-		this.connectedChannelID = connectedChannelID;
-	}
-
-	/**
-	 * Returns the ID of the channel.
-	 * 
-	 * @return the ID of the channel.
-	 */
-	public ChannelID getID() {
-		return this.channelID;
-	}
-
-	/**
-	 * Returns the channel's input at the associated gate.
-	 * 
-	 * @return the channel's input at the associated gate
-	 */
-	public int getChannelIndex() {
-		return this.channelIndex;
-	}
-
-	/**
-	 * Returns the type of the channel.
-	 * 
-	 * @return the type of the channel.
-	 */
-	public abstract ChannelType getType();
-
-	/**
-	 * Checks if the channel is closed, i.e. no more records can be transported through the channel.
-	 * 
-	 * @return <code>true</code> if the channel is closed, <code>false</code> otherwise
-	 * @throws IOException
-	 *         thrown if an error occurred while closing the channel
-	 * @throws InterruptedException
-	 *         thrown if the channel is interrupted while waiting for this operation to complete
-	 */
-	public abstract boolean isClosed() throws IOException, InterruptedException;
-
-	
-	public ChannelID getConnectedChannelID() {
-		return this.connectedChannelID;
-	}
-
-
-	/**
-	 * Returns the ID of the job this channel belongs to.
-	 * 
-	 * @return the ID of the job this channel belongs to
-	 */
-	public abstract JobID getJobID();
-
-	/**
-	 * Returns <code>true</code> if this channel is an input channel, <code>false</code> otherwise.
-	 * 
-	 * @return <code>true</code> if this channel is an input channel, <code>false</code> otherwise
-	 */
-	public abstract boolean isInputChannel();
-
-	
-	public abstract void transferEvent(AbstractEvent event) throws IOException, InterruptedException;
-
-	/**
-	 * Releases all resources (especially buffers) which are currently allocated by this channel. This method should be
-	 * called in case of a task error or as a result of a cancel operation.
-	 */
-	public abstract void releaseAllResources();
-
-	/**
-	 * Returns the number of bytes which have been transmitted through this channel since its instantiation.
-	 * 
-	 * @return the number of bytes which have been transmitted through this channel since its instantiation
-	 */
-	public abstract long getAmountOfDataTransmitted();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractInputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractInputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractInputChannel.java
deleted file mode 100644
index 4b88aff..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractInputChannel.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.io.InputChannelResult;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * InputChannel is an abstract base class to all different kinds of concrete
- * input channels that can be used. Input channels are always parameterized to
- * a specific type that can be transported through the channel.
-
- * @param <T> The Type of the record that can be transported through the channel.
- */
-public abstract class AbstractInputChannel<T extends IOReadableWritable> extends AbstractChannel {
-
-	private final InputGate<T> inputGate;
-
-	/**
-	 * Constructs an input channel with a given input gate associated.
-	 * 
-	 * @param inputGate
-	 *        the input gate this channel is connected to
-	 * @param channelIndex
-	 *        the index of the channel in the input gate
-	 * @param channelID
-	 *        the ID of the channel
-	 * @param connectedChannelID
-	 *        the ID of the channel this channel is connected to
-	 * @param compressionLevel
-	 *        the level of compression to be used for this channel
-	 */
-	protected AbstractInputChannel(final InputGate<T> inputGate, final int channelIndex, final ChannelID channelID,
-			final ChannelID connectedChannelID) {
-		super(channelIndex, channelID, connectedChannelID);
-		this.inputGate = inputGate;
-	}
-
-	/**
-	 * Returns the input gate associated with the input channel.
-	 * 
-	 * @return the input gate associated with the input channel.
-	 */
-	public InputGate<T> getInputGate() {
-		return this.inputGate;
-	}
-
-	/**
-	 * Reads a record from the input channel. If currently no record is available the method
-	 * returns <code>null</code>. If the channel is closed (i.e. no more records will be received), the method
-	 * throws an {@link EOFException}.
-	 * 
-	 * @return a record that has been transported through the channel or <code>null</code> if currently no record is
-	 *         available
-	 * @throws IOException
-	 *         thrown if the input channel is already closed {@link EOFException} or a transmission error has occurred
-	 */
-	public abstract InputChannelResult readRecord(T target) throws IOException;
-
-	/**
-	 * Immediately closes the input channel. The corresponding output channels are
-	 * notified if necessary. Any remaining records in any buffers or queue is considered
-	 * irrelevant and is discarded.
-	 * 
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the channel to close
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while closing the channel
-	 */
-	public abstract void close() throws IOException, InterruptedException;
-
-
-
-	@Override
-	public boolean isInputChannel() {
-		return true;
-	}
-
-
-	@Override
-	public JobID getJobID() {
-		return this.inputGate.getJobID();
-	}
-	
-	public abstract AbstractTaskEvent getCurrentEvent();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractOutputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractOutputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractOutputChannel.java
deleted file mode 100644
index 7974c24..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractOutputChannel.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * OutputChannel is an abstract base class to all different kinds of concrete
- * output channels that can be used. Input channels are always parameterized to
- * a specific type that can be transported through the channel.
- * 
- * @param <T>
- *        The Type of the record that can be transported through the channel.
- */
-public abstract class AbstractOutputChannel<T extends IOReadableWritable> extends AbstractChannel {
-
-	private final OutputGate<T> outputGate;
-
-	/**
-	 * Creates a new output channel object.
-	 * 
-	 * @param outputGate
-	 *        the output gate this channel is connected to
-	 * @param channelIndex
-	 *        the index of the channel in the output gate
-	 * @param channelID
-	 *        the ID of the channel
-	 * @param connectedChannelID
-	 *        the ID of the channel this channel is connected to
-	 * @param compressionLevel
-	 *        the level of compression to be used for this channel
-	 */
-	public AbstractOutputChannel(final OutputGate<T> outputGate, final int channelIndex, final ChannelID channelID,
-			final ChannelID connectedChannelID) {
-		super(channelIndex, channelID, connectedChannelID);
-		this.outputGate = outputGate;
-	}
-
-	/**
-	 * Returns the output gate this channel is connected to.
-	 * 
-	 * @return the output gate this channel is connected to
-	 */
-	public OutputGate<T> getOutputGate() {
-		return this.outputGate;
-	}
-
-	/**
-	 * Writes a record to the channel. The operation may block until the record
-	 * is completely written to the channel.
-	 * 
-	 * @param record
-	 *        the record to be written to the channel
-	 * @throws IOException
-	 *         thrown if an error occurred while transmitting the record
-	 */
-	public abstract void writeRecord(T record) throws IOException, InterruptedException;
-
-	/**
-	 * Requests the output channel to close. After calling this method no more records can be written
-	 * to the channel. The channel is finally closed when all remaining data that may exist in internal buffers
-	 * are written to the channel.
-	 * 
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while requesting the close operation
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while requesting the close operation
-	 */
-	public abstract void requestClose() throws IOException, InterruptedException;
-
-
-
-	@Override
-	public boolean isInputChannel() {
-		return false;
-	}
-
-	public abstract void flush() throws IOException, InterruptedException;
-
-
-	@Override
-	public JobID getJobID() {
-		return this.outputGate.getJobID();
-	}
-
-	/**
-	 * Returns <code>true</code> if this channel is connected to an output gate which operates in broadcast mode,
-	 * <code>false</code> otherwise.
-	 * 
-	 * @return <code>true</code> if the connected output gate operates in broadcase mode, <code>false</code> otherwise
-	 */
-	public boolean isBroadcastChannel() {
-
-		return this.outputGate.isBroadcast();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/Buffer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/Buffer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/Buffer.java
deleted file mode 100644
index 80dfa8f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/Buffer.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-
-/**
- * This class represents the general buffer abstraction that is used by Nephele
- * to transport data through the network or the file system.
- * <p>
- * Buffers may be backed by actual main memory or files.
- * <p>
- * Each buffer is expected to be written and read exactly once. Initially, the every buffer is in write mode. Before
- * reading from the buffer, it must be explicitly switched to read mode.
- * <p>
- * This class is in general not thread-safe.
- * 
- */
-public abstract class Buffer implements ReadableByteChannel, WritableByteChannel
-{
-	/**
-	 * Stores whether this buffer has already been recycled.
-	 */
-	private final AtomicBoolean isRecycled = new AtomicBoolean(false);
-
-	/**
-	 * Constructs a new buffer object.
-	 * 
-	 * @param internalBuffer
-	 *        the concrete implementation which backs the buffer
-	 */
-	protected Buffer()
-	{}
-
-	/**
-	 * Reads data from the buffer and writes it to the
-	 * given {@link WritableByteChannel} object.
-	 * 
-	 * @param destination The {@link WritableByteChannel} object to write the data to
-	 * @return The number of bytes read from the buffer, potentially <code>0</code> or <code>-1</code to indicate the
-	 *         end of the stream.
-	 * @throws IOException Thrown if an error occurs while writing to the {@link WritableByteChannel} object.
-	 */
-
-	public abstract boolean isOpen();
-	
-	
-
-	@Override
-	public abstract void close() throws IOException;
-	
-	/**
-	 * Reads data from the given {@link ReadableByteChannel} object and
-	 * writes it to the buffer.
-	 * 
-	 * @param source The {@link ReadableByteChannel} object to read data from.
-	 * @return The number of bytes written to the buffer, possibly <code>0</code>.
-	 * @throws IOException Thrown if an error occurs while writing data to the buffer.
-	 */
-	public abstract int write(ReadableByteChannel source) throws IOException;
-	
-
-	/**
-	 * Returns the number of bytes which can be either still written to or read from
-	 * the buffer, depending whether the buffer is still in write mode or not.
-	 * <p>
-	 * If in write mode, the method returns the number of bytes which can be written to be buffer, before its capacity
-	 * limit is reached. In read mode, the method returns the number of bytes which can be read from the number until
-	 * all data previously written to the buffer is consumed.
-	 * 
-	 * @return the number of bytes which can be either written to or read from the buffer
-	 */
-	public abstract int remaining();
-
-	/**
-	 * Checks whether data can still be written to or read from the buffer.
-	 * 
-	 * @return <code>true</code> if data can be still written to or read from
-	 *         the buffer, <code>false</code> otherwise
-	 */
-	public boolean hasRemaining() {
-		return remaining() > 0;
-	}
-
-	/**
-	 * Returns the size of the buffer. In write mode, the size of the buffer is the initial capacity
-	 * of the buffer. In read mode, the size of the buffer is number of bytes which have been
-	 * previously written to the buffer.
-	 * 
-	 * @return the size of the buffer in bytes
-	 */
-	public abstract int size();
-
-	/**
-	 * Recycles the buffer. In case of a memory backed buffer, the internal memory buffer
-	 * is returned to a global buffer queue. In case of a file backed buffer, the temporary
-	 * file created for this buffer is deleted. A buffer can only be recycled once. Calling this method more than once
-	 * will therefore have no effect.
-	 */
-	public final void recycleBuffer()
-	{
-		if (this.isRecycled.compareAndSet(false, true)) {
-			recycle();
-		}
-	}
-	
-	protected abstract void recycle();
-
-
-	/**
-	 * Returns whether the buffer is backed by main memory or a file.
-	 * 
-	 * @return <code>true</code> if the buffer is backed by main memory
-	 *         or <code>false</code> if it is backed by a file
-	 */
-	public abstract boolean isBackedByMemory();
-
-	/**
-	 * Copies the content of the buffer to the given destination buffer. The state of the source buffer is not modified
-	 * by this operation.
-	 * 
-	 * @param destinationBuffer
-	 *        the destination buffer to copy this buffer's content to
-	 * @throws IOException
-	 *         thrown if an error occurs while copying the data
-	 */
-	public abstract void copyToBuffer(Buffer destinationBuffer) throws IOException;
-
-	/**
-	 * Duplicates the buffer. This operation does not duplicate the actual
-	 * content of the buffer, only the reading/writing state. As a result,
-	 * modifications to the original buffer will affect the duplicate and vice-versa.
-	 * 
-	 * @return the duplicated buffer
-	 */
-	public abstract Buffer duplicate() throws IOException, InterruptedException;
-
-	/**
-	 * Reads data from the buffer and writes it to the
-	 * given {@link WritableByteChannel} object.
-	 * 
-	 * @param destination The {@link WritableByteChannel} object to write the data to
-	 * @return The number of bytes read from the buffer, potentially <code>0</code> or <code>-1</code to indicate the
-	 *         end of the stream.
-	 * @throws IOException Thrown if an error occurs while writing to the {@link WritableByteChannel} object.
-	 */
-	public abstract int writeTo(WritableByteChannel writableByteChannel) throws IOException;
-
-	/**
-	 * Flip buffer (exchange limit and position).
-	 */
-	public abstract void flip();
-	
-	/**
-	 * Returns the current read/write position for relative operations.
-	 * @return
-	 */
-	public abstract int position();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/BufferFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/BufferFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/BufferFactory.java
deleted file mode 100644
index 001e5a7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/BufferFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import eu.stratosphere.core.memory.MemorySegment;
-
-
-public final class BufferFactory {
-
-	public static MemoryBuffer createFromMemory(final int bufferSize, final MemorySegment byteBuffer,
-			final MemoryBufferPoolConnector bufferPoolConnector) {
-
-		return new MemoryBuffer(bufferSize, byteBuffer, bufferPoolConnector);
-	}
-
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private BufferFactory() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelID.java
deleted file mode 100644
index a4e3ba2..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelID.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import eu.stratosphere.nephele.io.AbstractID;
-
-/**
- * A class for statistically unique channel IDs.
- * 
- */
-public class ChannelID extends AbstractID {
-
-	/**
-	 * Constructs a new, random channel ID.
-	 */
-	public ChannelID() {
-		super();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelType.java
deleted file mode 100644
index 17fc980..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelType.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-/**
- * An enumeration for declaring the type of channel.
- * 
- */
-public enum ChannelType {
-	
-	/**
-	 * Enumeration type for network channels.
-	 */
-	NETWORK,
-
-	/**
-	 * Enumeration type for in-memory channels.
-	 */
-	INMEMORY
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithAccessInfo.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithAccessInfo.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithAccessInfo.java
deleted file mode 100644
index a44b1bd..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithAccessInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.nio.channels.FileChannel;
-
-interface ChannelWithAccessInfo {
-
-	FileChannel getChannel();
-
-	FileChannel getAndIncrementReferences();
-
-	/**
-	 * Increments the references to this channel. Returns <code>true</code>, if successful, and <code>false</code>,
-	 * if the channel has been disposed in the meantime.
-	 * 
-	 * @return True, if successful, false, if the channel has been disposed.
-	 */
-	boolean incrementReferences();
-
-	ChannelWithPosition reserveWriteSpaceAndIncrementReferences(int spaceToReserve);
-
-	/**
-	 * Decrements the number of references to this channel. If the number of references is zero after the
-	 * decrement, the channel is deleted.
-	 * 
-	 * @return The number of references remaining after the decrement.
-	 * @throws IllegalStateException
-	 *         Thrown, if the number of references is already zero or below.
-	 */
-	int decrementReferences();
-
-	/**
-	 * Disposes the channel without further notice. Tries to close it (swallowing all exceptions) and tries
-	 * to delete the file.
-	 */
-	void disposeSilently();
-
-	/**
-	 * Updates the flag which indicates whether the underlying physical file shall be deleted when it is closed. Once
-	 * the flag was updated to <code>false</code> it cannot be set to <code>true</code> again.
-	 * 
-	 * @param deleteOnClose
-	 *        <code>true</code> to indicate the file shall be deleted when closed, <code>false</code> otherwise
-	 */
-	void updateDeleteOnCloseFlag(final boolean deleteOnClose);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithPosition.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithPosition.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithPosition.java
deleted file mode 100644
index 5678439..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithPosition.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.nio.channels.FileChannel;
-
-/**
- * A simple encapsulation of a file channel with an offset. This object is used for purposes, where
- * the channel is accessed by multiple threads and its internal position may be changed.
- */
-public class ChannelWithPosition {
-
-	private final FileChannel channel;
-
-	private final long offset;
-
-	ChannelWithPosition(final FileChannel channel, final long offset) {
-		this.channel = channel;
-		this.offset = offset;
-	}
-
-	public FileChannel getChannel() {
-
-		return this.channel;
-	}
-
-	public long getOffset() {
-
-		return this.offset;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DefaultDeserializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DefaultDeserializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DefaultDeserializer.java
deleted file mode 100644
index 3f47fe1..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DefaultDeserializer.java
+++ /dev/null
@@ -1,781 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.ReadableByteChannel;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.core.memory.DataInputView;
-import eu.stratosphere.nephele.io.RecordDeserializer;
-
-/**
- * A class for deserializing a portion of binary data into records of type <code>T</code>. The internal
- * buffer grows dynamically to the size that is required for deserialization.
- * 
- * @param <T>
- *        The type of the record this deserialization buffer can be used for.
- */
-public class DefaultDeserializer<T extends IOReadableWritable> implements RecordDeserializer<T> {
-	/**
-	 * The size of an integer in byte.
-	 */
-	private static final int SIZEOFINT = 4;
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * The data input buffer used for deserialization.
-	 */
-	private final DataInputWrapper deserializationWrapper;
-
-	/**
-	 * Buffer to reconstruct the length field.
-	 */
-	private final ByteBuffer lengthBuf;
-
-	/**
-	 * Temporary buffer.
-	 */
-	private ByteBuffer tempBuffer;
-
-	/**
-	 * The type of the record to be deserialized.
-	 */
-	private final Class<? extends T> recordType;
-
-	/**
-	 * Size of the record to be deserialized in bytes.
-	 */
-	private int recordLength = -1;
-
-	/**
-	 * Flag indicating whether to throw an exception if nothing can be read any more.
-	 */
-	private final boolean propagateEndOfStream;
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Constructs a new deserialization buffer with the specified type.
-	 * 
-	 * @param recordType
-	 *        The type of the record to be deserialized.
-	 */
-	public DefaultDeserializer(final Class<? extends T> recordType) {
-		this(recordType, false);
-	}
-
-	/**
-	 * Constructs a new deserialization buffer with the specified type.
-	 * 
-	 * @param recordType
-	 *        The type of the record to be deserialized.
-	 * @param propagateEndOfStream
-	 *        <code>True</code>, if end of stream notifications during the
-	 *        deserialization process shall be propagated to the caller, <code>false</code> otherwise.
-	 */
-	public DefaultDeserializer(final Class<? extends T> recordType, final boolean propagateEndOfStream) {
-		this.recordType = recordType;
-		this.propagateEndOfStream = propagateEndOfStream;
-
-		this.lengthBuf = ByteBuffer.allocate(SIZEOFINT);
-		this.lengthBuf.order(ByteOrder.BIG_ENDIAN);
-
-		this.tempBuffer = ByteBuffer.allocate(128);
-		this.tempBuffer.order(ByteOrder.BIG_ENDIAN);
-
-		this.deserializationWrapper = new DataInputWrapper();
-		this.deserializationWrapper.setArray(this.tempBuffer.array());
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/*
-	 * (non-Javadoc)
-	 * @see eu.stratosphere.nephele.io.RecordDeserializer#readData(java.lang.Object,
-	 * java.nio.channels.ReadableByteChannel)
-	 */
-	@Override
-	public T readData(T target, final ReadableByteChannel readableByteChannel) throws IOException {
-		// check whether the length has already been de-serialized
-		final int len;
-		if (this.recordLength < 0) {
-			if (readableByteChannel.read(this.lengthBuf) == -1 && this.propagateEndOfStream) {
-				if (this.lengthBuf.position() == 0) {
-					throw new EOFException();
-				} else {
-					throw new IOException("Deserialization error: Expected to read " + this.lengthBuf.remaining()
-						+ " more bytes of length information from the stream!");
-				}
-			}
-
-			if (this.lengthBuf.hasRemaining()) {
-				return null;
-			}
-
-			len = this.lengthBuf.getInt(0);
-			this.lengthBuf.clear();
-
-			if (this.tempBuffer.capacity() < len) {
-				this.tempBuffer = ByteBuffer.allocate(len);
-				this.tempBuffer.order(ByteOrder.BIG_ENDIAN);
-				this.deserializationWrapper.setArray(this.tempBuffer.array());
-			}
-
-			// Important: limit the number of bytes that can be read into the buffer
-			this.tempBuffer.position(0);
-			this.tempBuffer.limit(len);
-		} else {
-			len = this.recordLength;
-		}
-
-		if (readableByteChannel.read(this.tempBuffer) == -1 && this.propagateEndOfStream) {
-			throw new IOException("Deserilization error: Expected to read " + this.tempBuffer.remaining()
-				+ " more bytes from stream!");
-		}
-
-		if (this.tempBuffer.hasRemaining()) {
-			this.recordLength = len;
-			return null;
-		} else {
-			this.recordLength = -1;
-		}
-
-		this.deserializationWrapper.reset(len);
-
-		if (target == null) {
-			target = instantiateTarget();
-		}
-
-		// now de-serialize the target
-		try {
-			target.read(this.deserializationWrapper);
-			return target;
-		} catch (BufferUnderflowException buex) {
-			throw new EOFException();
-		}
-	}
-
-	private final T instantiateTarget() throws IOException {
-		try {
-			return this.recordType.newInstance();
-		} catch (Exception e) {
-			throw new IOException("Could not instantiate the given record type: " + e.getMessage(), e);
-		}
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * @see eu.stratosphere.nephele.io.RecordDeserializer#clear()
-	 */
-	@Override
-	public void clear() {
-
-		this.recordLength = -1;
-		if (this.tempBuffer != null) {
-			this.tempBuffer.clear();
-		}
-		if (this.lengthBuf != null) {
-			this.lengthBuf.clear();
-		}
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * @see eu.stratosphere.nephele.io.RecordDeserializer#hasUnfinishedData()
-	 */
-	@Override
-	public boolean hasUnfinishedData() {
-		if (this.recordLength != -1) {
-			return true;
-		}
-
-		if (this.lengthBuf.position() > 0) {
-			return true;
-		}
-
-		return false;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	// private static final class DataInputWrapper implements DataInputView
-	// {
-	// private ByteBuffer source;
-	//
-	// private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
-	// private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
-	//
-	//
-	// void set(ByteBuffer source) {
-	// this.source = source;
-	// }
-	//
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readFully(byte[])
-	// */
-	// @Override
-	// public void readFully(byte[] b) {
-	// this.source.get(b);
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readFully(byte[], int, int)
-	// */
-	// @Override
-	// public void readFully(byte[] b, int off, int len) {
-	// this.source.get(b, off, len);
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#skipBytes(int)
-	// */
-	// @Override
-	// public int skipBytes(int n) {
-	// int newPos = this.source.position() + n;
-	// if (newPos > this.source.limit()) {
-	// newPos = this.source.limit();
-	// n = newPos - this.source.position();
-	// }
-	// this.source.position(newPos);
-	// return n;
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readBoolean()
-	// */
-	// @Override
-	// public boolean readBoolean() {
-	// return this.source.get() != 0;
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readByte()
-	// */
-	// @Override
-	// public byte readByte() {
-	// return this.source.get();
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readUnsignedByte()
-	// */
-	// @Override
-	// public int readUnsignedByte() {
-	// return this.source.get() & 0xff;
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readShort()
-	// */
-	// @Override
-	// public short readShort() {
-	// return this.source.getShort();
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readUnsignedShort()
-	// */
-	// @Override
-	// public int readUnsignedShort() {
-	// return this.source.getShort() & 0xffff;
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readChar()
-	// */
-	// @Override
-	// public char readChar() {
-	// return this.source.getChar();
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readInt()
-	// */
-	// @Override
-	// public int readInt() {
-	// return this.source.getInt();
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readLong()
-	// */
-	// @Override
-	// public long readLong() {
-	// return this.source.getLong();
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readFloat()
-	// */
-	// @Override
-	// public float readFloat() {
-	// return Float.intBitsToFloat(this.source.getInt());
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readDouble()
-	// */
-	// @Override
-	// public double readDouble() {
-	// return Double.longBitsToDouble(this.source.getLong());
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readLine()
-	// */
-	// @Override
-	// public String readLine()
-	// {
-	// if (this.source.hasRemaining()) {
-	// // read until a newline is found
-	// StringBuilder bld = new StringBuilder();
-	// char curr;
-	// while (this.source.hasRemaining() && (curr = (char) readUnsignedByte()) != '\n') {
-	// bld.append(curr);
-	// }
-	// // trim a trailing carriage return
-	// int len = bld.length();
-	// if (len > 0 && bld.charAt(len - 1) == '\r') {
-	// bld.setLength(len - 1);
-	// }
-	// String s = bld.toString();
-	// bld.setLength(0);
-	// return s;
-	// } else {
-	// return null;
-	// }
-	// }
-	//
-	// /* (non-Javadoc)
-	// * @see java.io.DataInput#readUTF()
-	// */
-	// @Override
-	// public String readUTF() throws IOException
-	// {
-	// final int utflen = readUnsignedShort();
-	//
-	// final byte[] bytearr;
-	// final char[] chararr;
-	//
-	// if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
-	// bytearr = new byte[utflen];
-	// this.utfByteBuffer = bytearr;
-	// } else {
-	// bytearr = this.utfByteBuffer;
-	// }
-	// if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) {
-	// chararr = new char[utflen];
-	// this.utfCharBuffer = chararr;
-	// } else {
-	// chararr = this.utfCharBuffer;
-	// }
-	//
-	// int c, char2, char3;
-	// int count = 0;
-	// int chararr_count = 0;
-	//
-	// readFully(bytearr, 0, utflen);
-	//
-	// while (count < utflen) {
-	// c = (int) bytearr[count] & 0xff;
-	// if (c > 127)
-	// break;
-	// count++;
-	// chararr[chararr_count++] = (char) c;
-	// }
-	//
-	// while (count < utflen) {
-	// c = (int) bytearr[count] & 0xff;
-	// switch (c >> 4) {
-	// case 0:
-	// case 1:
-	// case 2:
-	// case 3:
-	// case 4:
-	// case 5:
-	// case 6:
-	// case 7:
-	// /* 0xxxxxxx */
-	// count++;
-	// chararr[chararr_count++] = (char) c;
-	// break;
-	// case 12:
-	// case 13:
-	// /* 110x xxxx 10xx xxxx */
-	// count += 2;
-	// if (count > utflen)
-	// throw new UTFDataFormatException("malformed input: partial character at end");
-	// char2 = (int) bytearr[count - 1];
-	// if ((char2 & 0xC0) != 0x80)
-	// throw new UTFDataFormatException("malformed input around byte " + count);
-	// chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
-	// break;
-	// case 14:
-	// /* 1110 xxxx 10xx xxxx 10xx xxxx */
-	// count += 3;
-	// if (count > utflen)
-	// throw new UTFDataFormatException("malformed input: partial character at end");
-	// char2 = (int) bytearr[count - 2];
-	// char3 = (int) bytearr[count - 1];
-	// if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
-	// throw new UTFDataFormatException("malformed input around byte " + (count - 1));
-	// chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
-	// break;
-	// default:
-	// /* 10xx xxxx, 1111 xxxx */
-	// throw new UTFDataFormatException("malformed input around byte " + count);
-	// }
-	// }
-	// // The number of chars produced may be less than utflen
-	// return new String(chararr, 0, chararr_count);
-	// }
-	//
-	//
-	// /* (non-Javadoc)
-	// * @see eu.stratosphere.nephele.services.memorymanager.DataInputView#skipBytesToRead(int)
-	// */
-	// @Override
-	// public void skipBytesToRead(int numBytes) throws IOException {
-	// if (this.source.remaining() < numBytes) {
-	// throw new EOFException();
-	// } else {
-	// this.source.position(this.source.position() + numBytes);
-	// }
-	// }
-	// }
-
-	private static final class DataInputWrapper implements DataInputView {
-		private byte[] source;
-
-		private int position;
-
-		private int limit;
-
-		private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
-
-		void setArray(byte[] source) {
-			this.source = source;
-		}
-
-		void reset(int limit) {
-			this.position = 0;
-			this.limit = limit;
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readFully(byte[])
-		 */
-		@Override
-		public void readFully(byte[] b) throws EOFException {
-			readFully(b, 0, b.length);
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readFully(byte[], int, int)
-		 */
-		@Override
-		public void readFully(byte[] b, int off, int len) throws EOFException {
-			if (this.position <= this.limit - len) {
-				System.arraycopy(this.source, this.position, b, off, len);
-				this.position += len;
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#skipBytes(int)
-		 */
-		@Override
-		public int skipBytes(int n) {
-			if (n < 0) {
-				throw new IllegalArgumentException("Number of bytes to skip must not be negative.");
-			}
-
-			int toSkip = Math.min(this.limit - this.position, n);
-			this.position += toSkip;
-			return toSkip;
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readBoolean()
-		 */
-		@Override
-		public boolean readBoolean() throws EOFException {
-			return readByte() != 0;
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readByte()
-		 */
-		@Override
-		public byte readByte() throws EOFException {
-			if (this.position < this.limit) {
-				return this.source[this.position++];
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readUnsignedByte()
-		 */
-		@Override
-		public int readUnsignedByte() throws EOFException {
-			return readByte() & 0xff;
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readShort()
-		 */
-		@Override
-		public short readShort() throws EOFException {
-			if (this.position < this.limit - 1) {
-				short num = (short) (
-						((this.source[this.position + 0] & 0xff) << 8) |
-						((this.source[this.position + 1] & 0xff)));
-				this.position += 2;
-				return num;
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readUnsignedShort()
-		 */
-		@Override
-		public int readUnsignedShort() throws EOFException {
-			return readShort() & 0xffff;
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readChar()
-		 */
-		@Override
-		public char readChar() throws EOFException {
-			if (this.position < this.limit - 1) {
-				char c = (char) (
-						((this.source[this.position + 0] & 0xff) << 8) |
-						((this.source[this.position + 1] & 0xff)));
-				this.position += 2;
-				return c;
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readInt()
-		 */
-		@Override
-		public int readInt() throws EOFException {
-			if (this.position < this.limit - 3) {
-				final int num = ((this.source[this.position + 0] & 0xff) << 24) |
-								((this.source[this.position + 1] & 0xff) << 16) |
-								((this.source[this.position + 2] & 0xff) << 8) |
-								((this.source[this.position + 3] & 0xff));
-				this.position += 4;
-				return num;
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readLong()
-		 */
-		@Override
-		public long readLong() throws EOFException {
-			if (this.position < this.limit - 7) {
-				final long num = (((long) this.source[this.position + 0] & 0xff) << 56) |
-									(((long) this.source[this.position + 1] & 0xff) << 48) |
-									(((long) this.source[this.position + 2] & 0xff) << 40) |
-									(((long) this.source[this.position + 3] & 0xff) << 32) |
-									(((long) this.source[this.position + 4] & 0xff) << 24) |
-									(((long) this.source[this.position + 5] & 0xff) << 16) |
-									(((long) this.source[this.position + 6] & 0xff) << 8) |
-									(((long) this.source[this.position + 7] & 0xff) << 0);
-				this.position += 8;
-				return num;
-			} else {
-				throw new EOFException();
-			}
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readFloat()
-		 */
-		@Override
-		public float readFloat() throws EOFException {
-			return Float.intBitsToFloat(readInt());
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readDouble()
-		 */
-		@Override
-		public double readDouble() throws EOFException {
-			return Double.longBitsToDouble(readLong());
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readLine()
-		 */
-		@Override
-		public String readLine() {
-			if (this.position < this.limit) {
-				// read until a newline is found
-				StringBuilder bld = new StringBuilder();
-				char curr;
-				while (this.position < this.limit && (curr = (char) (this.source[this.position++] & 0xff)) != '\n') {
-					bld.append(curr);
-				}
-				// trim a trailing carriage return
-				int len = bld.length();
-				if (len > 0 && bld.charAt(len - 1) == '\r') {
-					bld.setLength(len - 1);
-				}
-				String s = bld.toString();
-				bld.setLength(0);
-				return s;
-			} else {
-				return null;
-			}
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see java.io.DataInput#readUTF()
-		 */
-		@Override
-		public String readUTF() throws IOException {
-			final int utflen = readUnsignedShort();
-			final int utfLimit = this.position + utflen;
-
-			if (utfLimit > this.limit) {
-				throw new EOFException();
-			}
-
-			final byte[] bytearr = this.source;
-			final char[] chararr;
-			if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) {
-				chararr = new char[utflen];
-				this.utfCharBuffer = chararr;
-			} else {
-				chararr = this.utfCharBuffer;
-			}
-
-			int c, char2, char3;
-			int count = this.position;
-			int chararr_count = 0;
-
-			while (count < utfLimit) {
-				c = (int) bytearr[count] & 0xff;
-				if (c > 127) {
-					break;
-				}
-				count++;
-				chararr[chararr_count++] = (char) c;
-			}
-
-			while (count < utfLimit) {
-				c = (int) bytearr[count] & 0xff;
-				switch (c >> 4) {
-				case 0:
-				case 1:
-				case 2:
-				case 3:
-				case 4:
-				case 5:
-				case 6:
-				case 7:
-					/* 0xxxxxxx */
-					count++;
-					chararr[chararr_count++] = (char) c;
-					break;
-				case 12:
-				case 13:
-					/* 110x xxxx 10xx xxxx */
-					count += 2;
-					if (count > utfLimit) {
-						throw new UTFDataFormatException("Malformed input: partial character at end");
-					}
-					char2 = (int) bytearr[count - 1];
-					if ((char2 & 0xC0) != 0x80) {
-						throw new UTFDataFormatException("Malformed input around byte " + count);
-					}
-					chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
-					break;
-				case 14:
-					/* 1110 xxxx 10xx xxxx 10xx xxxx */
-					count += 3;
-					if (count > utfLimit) {
-						throw new UTFDataFormatException("Malformed input: partial character at end");
-					}
-					char2 = (int) bytearr[count - 2];
-					char3 = (int) bytearr[count - 1];
-					if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
-						throw new UTFDataFormatException("Malformed input around byte " + (count - 1));
-					}
-					chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
-					break;
-				default:
-					/* 10xx xxxx, 1111 xxxx */
-					throw new UTFDataFormatException("Malformed input around byte " + count);
-				}
-			}
-			// The number of chars produced may be less than utflen
-			this.position += utflen;
-			return new String(chararr, 0, chararr_count);
-		}
-
-		/*
-		 * (non-Javadoc)
-		 * @see eu.stratosphere.nephele.services.memorymanager.DataInputView#skipBytesToRead(int)
-		 */
-		@Override
-		public void skipBytesToRead(int numBytes) throws EOFException {
-			if (numBytes < 0) {
-				throw new IllegalArgumentException("Number of bytes to skip must not be negative.");
-			} else if (this.limit - this.position < numBytes) {
-				throw new EOFException();
-			} else {
-				this.position += numBytes;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DistributedChannelWithAccessInfo.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DistributedChannelWithAccessInfo.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DistributedChannelWithAccessInfo.java
deleted file mode 100644
index 6873c44..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DistributedChannelWithAccessInfo.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.fs.FileChannelWrapper;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-
-final class DistributedChannelWithAccessInfo implements ChannelWithAccessInfo {
-
-	/**
-	 * The logging object.
-	 */
-	private static final Log LOG = LogFactory.getLog(DistributedChannelWithAccessInfo.class);
-
-	private final FileSystem fs;
-
-	private final Path checkpointFile;
-
-	private final FileChannelWrapper channel;
-
-	private final AtomicLong reservedWritePosition;
-
-	private final AtomicInteger referenceCounter;
-
-	private final AtomicBoolean deleteOnClose;
-
-	DistributedChannelWithAccessInfo(final FileSystem fs, final Path checkpointFile, final int bufferSize,
-			final boolean deleteOnClose) throws IOException {
-
-		this.fs = fs;
-		this.checkpointFile = checkpointFile;
-		this.channel = new FileChannelWrapper(fs, checkpointFile, bufferSize, (short) 2);
-		this.reservedWritePosition = new AtomicLong(0L);
-		this.referenceCounter = new AtomicInteger(0);
-		this.deleteOnClose = new AtomicBoolean(deleteOnClose);
-	}
-
-
-	@Override
-	public FileChannel getChannel() {
-
-		return this.channel;
-	}
-
-
-	@Override
-	public FileChannel getAndIncrementReferences() {
-
-		if (incrementReferences()) {
-			return this.channel;
-		} else {
-			return null;
-		}
-	}
-
-	@Override
-	public ChannelWithPosition reserveWriteSpaceAndIncrementReferences(final int spaceToReserve) {
-
-		if (incrementReferences()) {
-			return new ChannelWithPosition(this.channel, this.reservedWritePosition.getAndAdd(spaceToReserve));
-		} else {
-			return null;
-		}
-	}
-
-
-	@Override
-	public int decrementReferences() {
-
-		int current = this.referenceCounter.get();
-		while (true) {
-			if (current <= 0) {
-				// this is actually an error case, because the channel was deleted before
-				throw new IllegalStateException("The references to the file were already at zero.");
-			}
-
-			if (current == 1) {
-				// this call decrements to zero, so mark it as deleted
-				if (this.referenceCounter.compareAndSet(current, Integer.MIN_VALUE)) {
-					current = 0;
-					break;
-				}
-			} else if (this.referenceCounter.compareAndSet(current, current - 1)) {
-				current = current - 1;
-				break;
-			}
-			current = this.referenceCounter.get();
-		}
-
-		if (current > 0) {
-			return current;
-		} else if (current == 0) {
-			// delete the channel
-			this.referenceCounter.set(Integer.MIN_VALUE);
-			this.reservedWritePosition.set(Long.MIN_VALUE);
-			try {
-				this.channel.close();
-				if (this.deleteOnClose.get()) {
-					this.fs.delete(this.checkpointFile, false);
-				}
-
-			} catch (IOException ioex) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Error while closing spill file for file buffers: " + ioex.getMessage(), ioex);
-				}
-			}
-			return current;
-		} else {
-			throw new IllegalStateException("The references to the file were already at zero.");
-		}
-	}
-
-
-	@Override
-	public boolean incrementReferences() {
-
-		int current = this.referenceCounter.get();
-		while (true) {
-			// check whether it was disposed in the meantime
-			if (current < 0) {
-				return false;
-			}
-			// atomically check and increment
-			if (this.referenceCounter.compareAndSet(current, current + 1)) {
-				return true;
-			}
-			current = this.referenceCounter.get();
-		}
-	}
-
-
-	@Override
-	public void disposeSilently() {
-
-		this.referenceCounter.set(Integer.MIN_VALUE);
-		this.reservedWritePosition.set(Long.MIN_VALUE);
-
-		if (this.channel.isOpen()) {
-			try {
-				this.channel.close();
-				if (this.deleteOnClose.get()) {
-					this.fs.delete(this.checkpointFile, false);
-				}
-			} catch (Throwable t) {
-			}
-		}
-	}
-
-
-	@Override
-	public void updateDeleteOnCloseFlag(final boolean deleteOnClose) {
-
-		this.deleteOnClose.compareAndSet(true, deleteOnClose);
-	}
-}