You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:45 UTC

[10/30] Offer buffer-oriented API for I/O (#25)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
index 965a5aa..06c857e 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
@@ -13,44 +13,46 @@
 
 package eu.stratosphere.nephele.jobmanager;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
 import eu.stratosphere.configuration.ConfigConstants;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.configuration.GlobalConfiguration;
 import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.nephele.client.JobClient;
 import eu.stratosphere.nephele.client.JobExecutionException;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.io.library.FileLineReader;
-import eu.stratosphere.nephele.io.library.FileLineWriter;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
 import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
 import eu.stratosphere.nephele.jobmanager.JobManager.ExecutionMode;
+import eu.stratosphere.nephele.taskmanager.Task;
 import eu.stratosphere.nephele.taskmanager.TaskManager;
-import eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask;
+import eu.stratosphere.nephele.util.FileLineReader;
+import eu.stratosphere.nephele.util.FileLineWriter;
 import eu.stratosphere.nephele.util.JarFileCreator;
 import eu.stratosphere.nephele.util.ServerTestUtils;
 import eu.stratosphere.util.LogUtils;
+import eu.stratosphere.util.StringUtils;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * This test is intended to cover the basic functionality of the {@link JobManager}.
@@ -185,8 +187,8 @@ public class JobManagerITCase {
 			// connect vertices
 			try {
 				i1.connectTo(t1, ChannelType.NETWORK);
-				t1.connectTo(t2, ChannelType.INMEMORY);
-				t2.connectTo(o1, ChannelType.INMEMORY);
+				t1.connectTo(t2, ChannelType.IN_MEMORY);
+				t2.connectTo(o1, ChannelType.IN_MEMORY);
 			} catch (JobGraphDefinitionException e) {
 				e.printStackTrace();
 			}
@@ -286,8 +288,8 @@ public class JobManagerITCase {
 			o1.setVertexToShareInstancesWith(i1);
 
 			// connect vertices
-			i1.connectTo(t1, ChannelType.INMEMORY);
-			t1.connectTo(o1, ChannelType.INMEMORY);
+			i1.connectTo(t1, ChannelType.IN_MEMORY);
+			t1.connectTo(o1, ChannelType.IN_MEMORY);
 
 			// add jar
 			jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + exceptionClassName + ".jar")
@@ -297,7 +299,7 @@ public class JobManagerITCase {
 			jobClient = new JobClient(jg, configuration);
 			
 			// deactivate logging of expected test exceptions
-			Logger rtLogger = Logger.getLogger(RuntimeTask.class);
+			Logger rtLogger = Logger.getLogger(Task.class);
 			Level rtLevel = rtLogger.getEffectiveLevel();
 			rtLogger.setLevel(Level.OFF);
 			
@@ -382,8 +384,8 @@ public class JobManagerITCase {
 			o1.setVertexToShareInstancesWith(i1);
 
 			// connect vertices
-			i1.connectTo(t1, ChannelType.INMEMORY);
-			t1.connectTo(o1, ChannelType.INMEMORY);
+			i1.connectTo(t1, ChannelType.IN_MEMORY);
+			t1.connectTo(o1, ChannelType.IN_MEMORY);
 
 			// add jar
 			jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + runtimeExceptionClassName
@@ -492,8 +494,8 @@ public class JobManagerITCase {
 			// connect vertices
 			try {
 				i1.connectTo(t1, ChannelType.NETWORK);
-				t1.connectTo(t2, ChannelType.INMEMORY);
-				t2.connectTo(o1, ChannelType.INMEMORY);
+				t1.connectTo(t2, ChannelType.IN_MEMORY);
+				t2.connectTo(o1, ChannelType.IN_MEMORY);
 			} catch (JobGraphDefinitionException e) {
 				e.printStackTrace();
 			}
@@ -583,9 +585,9 @@ public class JobManagerITCase {
 			o1.setVertexToShareInstancesWith(i1);
 
 			// connect vertices
-			i1.connectTo(t1, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+			i1.connectTo(t1, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 			i1.connectTo(t1, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-			t1.connectTo(o1, ChannelType.INMEMORY);
+			t1.connectTo(o1, ChannelType.IN_MEMORY);
 
 			// add jar
 			jg.addJar(new Path(jarFile.toURI()));
@@ -657,7 +659,7 @@ public class JobManagerITCase {
 			o1.setVertexToShareInstancesWith(i1);
 
 			// connect vertices
-			i1.connectTo(o1, ChannelType.INMEMORY);
+			i1.connectTo(o1, ChannelType.IN_MEMORY);
 
 			// add jar
 			jg.addJar(new Path(jarFile.toURI()));
@@ -751,9 +753,9 @@ public class JobManagerITCase {
 			u1.setVertexToShareInstancesWith(o1);
 
 			// connect vertices
-			i1.connectTo(u1, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
-			i2.connectTo(u1, ChannelType.INMEMORY);
-			u1.connectTo(o1, ChannelType.INMEMORY);
+			i1.connectTo(u1, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
+			i2.connectTo(u1, ChannelType.IN_MEMORY);
+			u1.connectTo(o1, ChannelType.IN_MEMORY);
 
 			// add jar
 			jg.addJar(new Path(jarFile.toURI()));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
index 3b95133..124a24d 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
@@ -14,9 +14,9 @@
 package eu.stratosphere.nephele.jobmanager;
 
 import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.MutableRecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.io.UnionRecordReader;
+import eu.stratosphere.runtime.io.api.MutableRecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.runtime.io.api.UnionRecordReader;
 import eu.stratosphere.nephele.template.AbstractTask;
 
 /**
@@ -41,13 +41,17 @@ public class UnionTask extends AbstractTask {
 		recordReaders[1] = new MutableRecordReader<StringRecord>(this);
 		this.unionReader = new UnionRecordReader<StringRecord>(recordReaders, StringRecord.class);
 		
-		this.writer = new RecordWriter<StringRecord>(this, StringRecord.class);
+		this.writer = new RecordWriter<StringRecord>(this);
 	}
 
 	@Override
 	public void invoke() throws Exception {
+		this.writer.initializeSerializers();
+
 		while (this.unionReader.hasNext()) {
 			this.writer.emit(this.unionReader.next());
 		}
+
+		this.writer.flush();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java
index 9fc4256..f1e3191 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.List;
 
+import eu.stratosphere.runtime.io.api.RecordWriter;
 import org.junit.Test;
 
 import eu.stratosphere.core.io.StringRecord;
@@ -29,9 +30,8 @@ import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
 import eu.stratosphere.nephele.executiongraph.GraphConversionException;
 import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
 import eu.stratosphere.nephele.jobgraph.JobInputVertex;
@@ -57,7 +57,7 @@ public class QueueSchedulerTest {
 		 */
 		@Override
 		public void registerInputOutput() {
-			new RecordWriter<StringRecord>(this, StringRecord.class);
+			new RecordWriter<StringRecord>(this);
 		}
 
 		/**
@@ -145,7 +145,7 @@ public class QueueSchedulerTest {
 		final TestDeploymentManager tdm = new TestDeploymentManager();
 		final QueueScheduler scheduler = new QueueScheduler(tdm, tim);
 
-		final ExecutionGraph executionGraph = createExecutionGraph(ChannelType.INMEMORY, tim);
+		final ExecutionGraph executionGraph = createExecutionGraph(ChannelType.IN_MEMORY, tim);
 
 		try {
 			try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java
index 4fd1ac1..630f365 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java
@@ -19,7 +19,7 @@ import java.util.Iterator;
 
 import org.junit.Test;
 
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.util.ManagementTestUtils;
 
@@ -275,7 +275,7 @@ public class ManagementGraphTest {
 		// Group Edges
 		new ManagementGroupEdge(groupVertex1, 0, groupVertex2, 0, ChannelType.NETWORK);
 		new ManagementGroupEdge(groupVertex2, 0, groupVertex3, 0, ChannelType.NETWORK);
-		new ManagementGroupEdge(groupVertex3, 0, groupVertex4, 0, ChannelType.INMEMORY);
+		new ManagementGroupEdge(groupVertex3, 0, groupVertex4, 0, ChannelType.IN_MEMORY);
 
 		// Edges
 		new ManagementEdge(new ManagementEdgeID(), new ManagementEdgeID(), outputGate1_1, 0, inputGate2_1, 0,
@@ -287,7 +287,7 @@ public class ManagementGraphTest {
 		new ManagementEdge(new ManagementEdgeID(), new ManagementEdgeID(), outputGate2_2, 0, inputGate3_1, 1,
 			ChannelType.NETWORK);
 		new ManagementEdge(new ManagementEdgeID(), new ManagementEdgeID(), outputGate3_1, 0, inputGate4_1, 0,
-			ChannelType.INMEMORY);
+			ChannelType.IN_MEMORY);
 
 		return graph;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java
index dda9491..f1b83a6 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java
@@ -74,7 +74,7 @@ public class InstanceProfilerTest {
 	@Before
 	public void setUp() throws Exception {
 		initMocks(this);
-		when(this.infoMock.getAddress()).thenReturn(this.addressMock);
+		when(this.infoMock.address()).thenReturn(this.addressMock);
 		when(this.addressMock.getHostAddress()).thenReturn("192.168.1.1");
 
 		whenNew(FileReader.class).withArguments(InstanceProfiler.PROC_STAT).thenReturn(this.cpuReaderMock);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializerTest.java
deleted file mode 100644
index 1c9efd5..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializerTest.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.transferenvelope;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-import org.junit.Test;
-
-import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.BufferFactory;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.MemoryBuffer;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferAvailabilityListener;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProviderBroker;
-import eu.stratosphere.nephele.util.BufferPoolConnector;
-import eu.stratosphere.nephele.util.InterruptibleByteChannel;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * This class contains tests covering the deserialization of a byte stream to a transfer envelope.
- * 
- */
-public class DefaultDeserializerTest {
-
-	/**
-	 * The size of the test byte buffers in byte.
-	 */
-	private static final int TEST_BUFFER_CAPACITY = 1024;
-
-	/**
-	 * The sequence number to be used during the tests.
-	 */
-	private static final int SEQUENCE_NUMBER = 0;
-
-	/**
-	 * The job ID to be used during the tests.
-	 */
-	private static final JobID JOB_ID = new JobID();
-
-	/**
-	 * The channel ID to be used during the tests.
-	 */
-	private static final ChannelID CHANNEL_ID = new ChannelID();
-
-	/**
-	 * A dummy implementation of a {@link BufferProvider} which is used in this test.
-	 * <p>
-	 * This class is not thread-safe.
-	 * 
-	 */
-	private static final class TestBufferProvider implements BufferProvider {
-
-		/**
-		 * Stores the available byte buffers.
-		 */
-		private final Queue<MemorySegment> bufferPool;
-
-		/**
-		 * Constructs a new test buffer provider.
-		 * 
-		 * @param numberOfBuffers
-		 *        the number of byte buffers this pool has available.
-		 */
-		private TestBufferProvider(final int numberOfBuffers) {
-
-			this.bufferPool = new ArrayDeque<MemorySegment>(numberOfBuffers);
-			for (int i = 0; i < numberOfBuffers; ++i) {
-				this.bufferPool.add(new MemorySegment(new byte[TEST_BUFFER_CAPACITY]));
-			}
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public Buffer requestEmptyBuffer(final int minimumSizeOfBuffer) throws IOException {
-
-			if (this.bufferPool.isEmpty()) {
-				return null;
-			}
-
-			return BufferFactory.createFromMemory(minimumSizeOfBuffer, this.bufferPool.poll(),
-				new BufferPoolConnector(this.bufferPool));
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public Buffer requestEmptyBufferBlocking(final int minimumSizeOfBuffer) throws IOException,
-				InterruptedException {
-
-			throw new IllegalStateException("requestEmptyBufferBlocking called");
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public int getMaximumBufferSize() {
-
-			throw new IllegalStateException("getMaximumBufferSize called");
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public boolean isShared() {
-
-			throw new IllegalStateException("isShared called");
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void reportAsynchronousEvent() {
-
-			throw new IllegalStateException("reportAsynchronousEvent called");
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public boolean registerBufferAvailabilityListener(final BufferAvailabilityListener bufferAvailabilityListener) {
-
-			throw new IllegalStateException("registerBufferAvailabilityListener called");
-		}
-	}
-
-	/**
-	 * A dummy implementation of a {@link BufferProviderBroker} which is used during this test.
-	 * <p>
-	 * This class is not thread-safe.
-	 * 
-	 */
-	private static final class TestBufferProviderBroker implements BufferProviderBroker {
-
-		private final BufferProvider bufferProvider;
-
-		private TestBufferProviderBroker(final BufferProvider bufferProvider) {
-			this.bufferProvider = bufferProvider;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public BufferProvider getBufferProvider(final JobID jobID, final ChannelID sourceChannelID) throws IOException,
-				InterruptedException {
-
-			return this.bufferProvider;
-		}
-	}
-
-	/**
-	 * Constructs an {@link InterruptibleByteChannel} from which the deserializer to be tested can read its data.
-	 * 
-	 * @param readInterruptPositions
-	 *        the positions after which the byte stream shall be interrupted
-	 * @param testBufferSize
-	 *        the size of the test buffer to create
-	 * @return an {@link InterruptibleByteChannel} holding the serialized data in memory
-	 * @throws IOException
-	 *         thrown if an error occurs while serializing the original data
-	 */
-	private ReadableByteChannel createByteChannel(final int[] readInterruptPositions, final int testBufferSize)
-			throws IOException {
-
-		final TransferEnvelope te = new TransferEnvelope(SEQUENCE_NUMBER, JOB_ID, CHANNEL_ID);
-
-		if (testBufferSize >= 0) {
-
-			if (testBufferSize > 100) {
-				throw new IllegalStateException("Test buffer size can be 100 bytes at most");
-			}
-
-			final Queue<MemorySegment> bufferPool = new ArrayDeque<MemorySegment>();
-			final MemorySegment ms = new MemorySegment(new byte[TEST_BUFFER_CAPACITY]);
-
-			final MemoryBuffer buffer = BufferFactory.createFromMemory(ms.size(), ms, new BufferPoolConnector(bufferPool));
-
-			final ByteBuffer srcBuffer = ByteBuffer.allocate(testBufferSize);
-			for (int i = 0; i < testBufferSize; ++i) {
-				srcBuffer.put((byte) i);
-			}
-			srcBuffer.flip();
-
-			buffer.write(srcBuffer);
-			buffer.flip();
-			te.setBuffer(buffer);
-		}
-
-		final DefaultSerializer ds = new DefaultSerializer();
-		ds.setTransferEnvelope(te);
-
-		final InterruptibleByteChannel ibc = new InterruptibleByteChannel(null, readInterruptPositions);
-
-		while (ds.write(ibc));
-
-		ibc.switchToReadPhase();
-
-		return ibc;
-	}
-
-	/**
-	 * Executes the deserialization method.
-	 * 
-	 * @param rbc
-	 *        the byte channel to read the serialized data from
-	 * @param bpb
-	 *        the buffer provider broker to request empty buffers from
-	 * @return the deserialized transfer envelope
-	 * @throws IOException
-	 *         thrown if an error occurs during the deserialization process
-	 * @throws NoBufferAvailableException
-	 *         thrown if the buffer provider broker could not provide an empty buffer
-	 */
-	private TransferEnvelope executeDeserialization(final ReadableByteChannel rbc, final BufferProviderBroker bpb)
-			throws IOException, NoBufferAvailableException {
-
-		final DefaultDeserializer dd = new DefaultDeserializer(bpb);
-
-		TransferEnvelope te = dd.getFullyDeserializedTransferEnvelope();
-		while (te == null) {
-
-			dd.read(rbc);
-			te = dd.getFullyDeserializedTransferEnvelope();
-		}
-
-		assertEquals(SEQUENCE_NUMBER, te.getSequenceNumber());
-		assertEquals(JOB_ID, te.getJobID());
-		assertEquals(CHANNEL_ID, te.getSource());
-
-		return te;
-	}
-
-	/**
-	 * Tests the deserialization process of a {@link TransferEnvelope} with a buffer when no interruption of the byte
-	 * stream.
-	 */
-	@Test
-	public void testDeserializationWithBufferAndWithoutInterruption() {
-
-		try {
-
-			final ReadableByteChannel rbc = createByteChannel(null, 10);
-
-			final TestBufferProviderBroker tbpb = new TestBufferProviderBroker(new TestBufferProvider(1));
-
-			final TransferEnvelope te = executeDeserialization(rbc, tbpb);
-
-			assertNotNull(te.getBuffer());
-			assertEquals(10, te.getBuffer().size());
-
-		} catch (IOException ioe) {
-			fail(StringUtils.stringifyException(ioe));
-		} catch (NoBufferAvailableException nbae) {
-			fail(StringUtils.stringifyException(nbae));
-		}
-	}
-
-	/**
-	 * Tests the deserialization process of a {@link TransferEnvelope} with a buffer and interruptions of the byte
-	 * stream.
-	 */
-	@Test
-	public void testDeserializationWithBufferAndInterruptions() {
-
-		try {
-
-			final ReadableByteChannel rbc = createByteChannel(new int[] { 3, 7, 24, 52 }, 10);
-
-			final TestBufferProviderBroker tbpb = new TestBufferProviderBroker(new TestBufferProvider(1));
-
-			final TransferEnvelope te = executeDeserialization(rbc, tbpb);
-
-			assertNotNull(te.getBuffer());
-			assertEquals(10, te.getBuffer().size());
-
-		} catch (IOException ioe) {
-			fail(StringUtils.stringifyException(ioe));
-		} catch (NoBufferAvailableException nbae) {
-			fail(StringUtils.stringifyException(nbae));
-		}
-	}
-
-	/**
-	 * Tests the deserialization process of a {@link TransferEnvelope} without a buffer and without interruptions of the
-	 * byte stream.
-	 */
-	@Test
-	public void testDeserializationWithoutBufferAndInterruptions() {
-
-		try {
-
-			final ReadableByteChannel rbc = createByteChannel(null, -1);
-
-			final TestBufferProviderBroker tbpb = new TestBufferProviderBroker(new TestBufferProvider(1));
-
-			final TransferEnvelope te = executeDeserialization(rbc, tbpb);
-
-			assertNull(te.getBuffer());
-
-		} catch (IOException ioe) {
-			fail(StringUtils.stringifyException(ioe));
-		} catch (NoBufferAvailableException nbae) {
-			fail(StringUtils.stringifyException(nbae));
-		}
-	}
-
-	/**
-	 * Tests the deserialization process in case the buffer provide cannot deliver an empty buffer to read the byte
-	 * stream into.
-	 */
-	@Test
-	public void testDeserializationWithNoBufferAvailable() {
-
-		try {
-			final ReadableByteChannel rbc = createByteChannel(null, 10);
-			final TestBufferProviderBroker tbpb = new TestBufferProviderBroker(new TestBufferProvider(0));
-			executeDeserialization(rbc, tbpb);
-
-		} catch (IOException ioe) {
-			fail(StringUtils.stringifyException(ioe));
-		} catch (NoBufferAvailableException nbae) {
-			// Expected exception was successfully caught
-			return;
-		}
-
-		fail("Expected NoBufferAvailableException but has not been thrown");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializerTest.java
deleted file mode 100644
index a50fbe2..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializerTest.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.transferenvelope;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayDeque;
-import java.util.Deque;
-
-import org.junit.Test;
-
-import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.io.AbstractID;
-import eu.stratosphere.nephele.io.channels.BufferFactory;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.MemoryBuffer;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.util.BufferPoolConnector;
-import eu.stratosphere.nephele.util.ServerTestUtils;
-
-/**
- * This class contains tests covering the serialization of transfer envelopes to a byte stream.
- * 
- */
-public class DefaultSerializerTest {
-
-	/**
-	 * The maximum size of the transfer envelope's buffer.
-	 */
-	private static final int BUFFER_SIZE = 4096; // 4 KB;
-
-	/**
-	 * An arbitrarily chosen byte used to fill the transfer envelope's buffer.
-	 */
-	private static final byte BUFFER_CONTENT = 13;
-
-	/**
-	 * The size of a sequence number.
-	 */
-	private static final int SIZE_OF_SEQ_NR = 4;
-
-	/**
-	 * The size of an ID.
-	 */
-	private static final int SIZE_OF_ID = 16;
-
-	/**
-	 * The size of an integer number.
-	 */
-	private static final int SIZE_OF_INTEGER = 4;
-
-	/**
-	 * The job ID used during the serialization process.
-	 */
-	private final JobID jobID = new JobID();
-
-	/**
-	 * The target channel ID used during the serialization process.
-	 */
-	private final ChannelID sourceChannelID = new ChannelID();
-
-	/**
-	 * Auxiliary class to explicitly access the internal buffer of an ID object.
-	 * 
-	 */
-	private static class SerializationTestID extends AbstractID {
-
-		/**
-		 * Constructs a new ID.
-		 * 
-		 * @param content
-		 *        a byte buffer representing the ID
-		 */
-		private SerializationTestID(byte[] content) {
-			super(content);
-		}
-	}
-
-	/**
-	 * This test checks the correctness of the serialization of {@link TransferEnvelope} objects.
-	 */
-	@Test
-	public void testSerialization() {
-
-		try {
-
-			// Generate test file
-			final File testFile = generateDataStream();
-
-			// Analyze the test file
-			analyzeStream(testFile);
-
-			// Delete the test file
-			testFile.delete();
-
-		} catch (IOException e) {
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Generates and serializes a series of {@link TransferEnvelope} objects to a random file.
-	 * 
-	 * @return the file containing the serializes envelopes
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while writing the envelopes
-	 */
-	private File generateDataStream() throws IOException {
-
-		final File outputFile = new File(ServerTestUtils.getTempDir() + File.separator
-			+ ServerTestUtils.getRandomFilename());
-		final FileOutputStream outputStream = new FileOutputStream(outputFile);
-		final FileChannel fileChannel = outputStream.getChannel();
-		final Deque<MemorySegment> recycleQueue = new ArrayDeque<MemorySegment>();
-		final DefaultSerializer serializer = new DefaultSerializer();
-		final MemorySegment byteBuffer = new MemorySegment(new byte[BUFFER_SIZE]);
-		final ByteBuffer initBuffer = ByteBuffer.allocate(1);
-		
-		// The byte buffer is initialized from this buffer
-		initBuffer.put(BUFFER_CONTENT);
-		initBuffer.flip();
-		
-		// Put byte buffer to recycled queue
-		recycleQueue.add(byteBuffer);
-
-		for (int i = 0; i < BUFFER_SIZE; i++) {
-
-			final MemoryBuffer buffer = BufferFactory.createFromMemory(i, recycleQueue.poll(), new BufferPoolConnector(
-				recycleQueue));
-
-			// Initialize buffer
-			for (int j = 0; j < i; j++) {
-				buffer.write(initBuffer);
-				initBuffer.position(0);
-			}
-			buffer.flip();
-
-			final TransferEnvelope transferEnvelope = new TransferEnvelope(i, this.jobID, this.sourceChannelID);
-			transferEnvelope.setBuffer(buffer);
-
-			// set envelope to be serialized and write it to file channel
-			serializer.setTransferEnvelope(transferEnvelope);
-			while (serializer.write(fileChannel));
-
-			// Put buffer back to the recycling queue
-			buffer.recycleBuffer();
-		}
-
-		fileChannel.close();
-		
-		return outputFile;
-	}
-
-	/**
-	 * Analyzes the given test file and checks whether its content matches Nephele's serialization pattern.
-	 * 
-	 * @param testFile
-	 *        the test file to analyze
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while reading the test file
-	 */
-	private void analyzeStream(File testFile) throws IOException {
-
-		FileInputStream fileInputStream = new FileInputStream(testFile);
-
-		for (int i = 0; i < BUFFER_SIZE; i++) {
-
-			readAndCheckSequenceNumber(fileInputStream, i);
-			readAndCheckID(fileInputStream, this.jobID);
-			readAndCheckID(fileInputStream, this.sourceChannelID);
-			readAndCheckNotificationList(fileInputStream);
-			readAndCheckBuffer(fileInputStream, i);
-		}
-
-		fileInputStream.close();
-	}
-
-	/**
-	 * Attempts to read a buffer of the given size from the file stream and checks the buffer's content.
-	 * 
-	 * @param fileInputStream
-	 *        the file stream to read from
-	 * @param expectedBufferSize
-	 *        the expected size of the buffer
-	 * @throws IOException
-	 *         thrown if an error occurs while reading from the file stream
-	 */
-	private static void readAndCheckBuffer(FileInputStream fileInputStream, int expectedBufferSize) throws IOException {
-
-		// Check if buffer exists
-		assertEquals(1L, fileInputStream.read());
-
-		byte[] temp = new byte[SIZE_OF_INTEGER];
-		fileInputStream.read(temp);
-		int bufferSize = bufferToInteger(temp);
-
-		assertEquals(expectedBufferSize, bufferSize);
-
-		byte[] buffer = new byte[bufferSize];
-		int r = fileInputStream.read(buffer);
-		for (int i = 0; i < buffer.length; i++) {
-			assertEquals(BUFFER_CONTENT, buffer[i]);
-		}
-	}
-
-	/**
-	 * Attempts to read an empty notification list from the given file input stream.
-	 * 
-	 * @param fileInputStream
-	 *        the file input stream to read from
-	 * @throws IOException
-	 *         thrown if an I/O occurs while reading data from the stream
-	 */
-	private void readAndCheckNotificationList(FileInputStream fileInputStream) throws IOException {
-
-		if (fileInputStream.read() != 0) {
-
-			byte[] temp = new byte[SIZE_OF_INTEGER];
-
-			fileInputStream.read(temp);
-			final int sizeOfDataBlock = bufferToInteger(temp);
-
-			assertEquals(SIZE_OF_INTEGER, sizeOfDataBlock);
-
-			fileInputStream.read(temp);
-			final int sizeOfNotificationList = bufferToInteger(temp);
-
-			assertEquals(0, sizeOfNotificationList);
-		}
-	}
-
-	/**
-	 * Attempts to read an integer number from the given file input stream and compares it to
-	 * <code>expectedSequenceNumber</code>.
-	 * 
-	 * @param fileInputStream
-	 *        the file input stream to read from
-	 * @param expectedSeqNumber
-	 *        the integer number the read number is expected to match
-	 * @throws IOException
-	 *         thrown if an I/O occurs while reading data from the stream
-	 */
-	private void readAndCheckSequenceNumber(FileInputStream fileInputStream, int expectedSeqNumber) throws IOException {
-
-		byte[] temp = new byte[SIZE_OF_SEQ_NR];
-		fileInputStream.read(temp);
-		int seqNumber = bufferToInteger(temp);
-
-		assertEquals(seqNumber, expectedSeqNumber);
-	}
-
-	/**
-	 * Attempts to read a channel ID from the given file input stream and compares it to <code>expectedChannelID</code>.
-	 * 
-	 * @param fileInputStream
-	 *        the file input stream to read from
-	 * @param expectedID
-	 *        the ID which the read ID is expected to match
-	 * @throws IOException
-	 *         thrown if an I/O occurs while reading data from the stream
-	 */
-	private void readAndCheckID(FileInputStream fileInputStream, AbstractID expectedID) throws IOException {
-
-		byte[] temp = new byte[SIZE_OF_INTEGER];
-		fileInputStream.read(temp);
-
-		final int sizeOfID = bufferToInteger(temp); // ID has fixed size and therefore does not announce its size
-
-		assertEquals(sizeOfID, SIZE_OF_ID);
-
-		byte[] id = new byte[sizeOfID];
-		fileInputStream.read(id);
-
-		final AbstractID channelID = new SerializationTestID(id);
-		assertEquals(expectedID, channelID);
-	}
-
-	/**
-	 * Converts the first four bytes of the provided buffer's content to an integer number.
-	 * 
-	 * @param buffer
-	 *        the buffer to convert
-	 * @return the integer number converted from the first four bytes of the buffer's content
-	 */
-	private static int bufferToInteger(byte[] buffer) {
-
-		int integer = 0;
-
-		for (int i = 0; i < SIZE_OF_INTEGER; ++i) {
-			integer |= (buffer[SIZE_OF_INTEGER - 1 - i] & 0xff) << (i << 3);
-		}
-
-		return integer;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/BufferPoolConnector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/BufferPoolConnector.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/BufferPoolConnector.java
deleted file mode 100644
index 3f2c79e..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/BufferPoolConnector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.util;
-
-import java.util.Queue;
-
-import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.io.channels.MemoryBufferPoolConnector;
-
-/**
- * This is a simple implementation of a {@link MemoryBufferPoolConnector} used for the server unit tests.
- * <p>
- * This class is thread-safe.
- * 
- */
-public final class BufferPoolConnector implements MemoryBufferPoolConnector {
-
-	/**
-	 * Reference to the memory pool the byte buffer was originally taken from.
-	 */
-	private final Queue<MemorySegment> memoryPool;
-
-	/**
-	 * Constructs a new buffer pool connector
-	 * 
-	 * @param bufferPool
-	 *        a reference to the memory pool the byte buffer was originally taken from
-	 */
-	public BufferPoolConnector(final Queue<MemorySegment> bufferPool) {
-		this.memoryPool = bufferPool;
-	}
-
-
-	@Override
-	public void recycle(final MemorySegment memSeg) {
-
-		synchronized (this.memoryPool) {
-			this.memoryPool.add(memSeg);
-			this.memoryPool.notify();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/DiscardingRecycler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/DiscardingRecycler.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/DiscardingRecycler.java
new file mode 100644
index 0000000..9d4d2a5
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/DiscardingRecycler.java
@@ -0,0 +1,24 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.nephele.util;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.BufferRecycler;
+
+public class DiscardingRecycler implements BufferRecycler {
+
+	@Override
+	public void recycle(MemorySegment memSeg) {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java
new file mode 100644
index 0000000..fcb4fa1
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java
@@ -0,0 +1,80 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util;
+
+import java.util.Iterator;
+
+import eu.stratosphere.core.fs.FSDataInputStream;
+import eu.stratosphere.core.fs.FileInputSplit;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.nephele.template.AbstractFileInputTask;
+import eu.stratosphere.runtime.fs.LineReader;
+
+/**
+ * A file line reader reads the associated file input splits line by line and outputs the lines as string records.
+ * 
+ */
+public class FileLineReader extends AbstractFileInputTask {
+
+	private RecordWriter<StringRecord> output = null;
+
+	@Override
+	public void invoke() throws Exception {
+
+		output.initializeSerializers();
+
+		final Iterator<FileInputSplit> splitIterator = getFileInputSplits();
+
+		while (splitIterator.hasNext()) {
+
+			final FileInputSplit split = splitIterator.next();
+
+			long start = split.getStart();
+			long length = split.getLength();
+
+			final FileSystem fs = FileSystem.get(split.getPath().toUri());
+
+			final FSDataInputStream fdis = fs.open(split.getPath());
+
+			final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
+
+			byte[] line = lineReader.readLine();
+
+			while (line != null) {
+
+				// Create a string object from the data read
+				StringRecord str = new StringRecord();
+				str.set(line);
+
+				// Send out string
+				output.emit(str);
+
+				line = lineReader.readLine();
+			}
+
+			// Close the stream;
+			lineReader.close();
+		}
+
+		this.output.flush();
+	}
+
+	@Override
+	public void registerInputOutput() {
+		output = new RecordWriter<StringRecord>(this);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java
new file mode 100644
index 0000000..bc738df
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java
@@ -0,0 +1,75 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util;
+
+import eu.stratosphere.core.fs.FSDataOutputStream;
+import eu.stratosphere.core.fs.FileStatus;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.nephele.template.AbstractFileOutputTask;
+
+/**
+ * A file line writer reads string records its input gate and writes them to the associated output file.
+ * 
+ */
+public class FileLineWriter extends AbstractFileOutputTask {
+
+	/**
+	 * The record reader through which incoming string records are received.
+	 */
+	private RecordReader<StringRecord> input = null;
+
+
+	@Override
+	public void invoke() throws Exception {
+
+		Path outputPath = getFileOutputPath();
+
+		FileSystem fs = FileSystem.get(outputPath.toUri());
+		if (fs.exists(outputPath)) {
+			FileStatus status = fs.getFileStatus(outputPath);
+
+			if (status.isDir()) {
+				outputPath = new Path(outputPath.toUri().toString() + "/file_" + getIndexInSubtaskGroup() + ".txt");
+			}
+		}
+
+		final FSDataOutputStream outputStream = fs.create(outputPath, true);
+
+		while (this.input.hasNext()) {
+
+			StringRecord record = this.input.next();
+			byte[] recordByte = (record.toString() + "\r\n").getBytes();
+			outputStream.write(recordByte, 0, recordByte.length);
+		}
+
+		outputStream.close();
+
+	}
+
+
+	@Override
+	public void registerInputOutput() {
+		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
+	}
+
+
+	@Override
+	public int getMaximumNumberOfSubtasks() {
+		// The default implementation always returns -1
+		return -1;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
new file mode 100644
index 0000000..09b244f
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
@@ -0,0 +1,76 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.nephele.util;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.BufferRecycler;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+
+import java.io.IOException;
+import java.util.Random;
+
+public class TestBufferProvider implements BufferProvider {
+	
+	private final BufferRecycler recycler = new DiscardingRecycler();
+	
+	private final Random rnd = new Random();
+	
+	private final int sizeOfMemorySegments;
+	
+	private final float probabilityForNoneAvailable;
+	
+	
+	public TestBufferProvider(int sizeOfMemorySegments) {
+		this(sizeOfMemorySegments, -1.0f);
+	}
+	
+	public TestBufferProvider(int sizeOfMemorySegments, float probabilityForNoneAvailable) {
+		this.sizeOfMemorySegments = sizeOfMemorySegments;
+		this.probabilityForNoneAvailable = probabilityForNoneAvailable;
+	}
+
+	@Override
+	public Buffer requestBuffer(int sizeOfBuffer) throws IOException {
+		if (rnd.nextFloat() < this.probabilityForNoneAvailable) {
+			return null;
+		} else {
+			MemorySegment segment = new MemorySegment(new byte[this.sizeOfMemorySegments]);
+			return new Buffer(segment, sizeOfBuffer, this.recycler);
+		}
+	}
+
+	@Override
+	public Buffer requestBufferBlocking(int sizeOfBuffer) throws IOException, InterruptedException {
+		MemorySegment segment = new MemorySegment(new byte[this.sizeOfMemorySegments]);
+		return new Buffer(segment, sizeOfBuffer, this.recycler);
+	}
+
+	@Override
+	public int getBufferSize() {
+		return Integer.MAX_VALUE;
+	}
+	
+	@Override
+	public void reportAsynchronousEvent() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener) {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java
index 6c66b78..bfd0d42 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java
@@ -42,8 +42,12 @@ import eu.stratosphere.types.IntValue;
 import eu.stratosphere.types.Key;
 import eu.stratosphere.types.Record;
 
-public class DataSinkTaskTest extends TaskTestBase
-{
+public class DataSinkTaskTest extends TaskTestBase {
+
+	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
+
+	private static final int NETWORK_BUFFER_SIZE = 1024;
+
 	private static final Log LOG = LogFactory.getLog(DataSinkTaskTest.class);
 	
 	private final String tempTestPath = Path.constructTestPath("dst_test");
@@ -61,8 +65,8 @@ public class DataSinkTaskTest extends TaskTestBase
 
 		int keyCnt = 100;
 		int valCnt = 20;
-		
-		super.initEnvironment(1024 * 1024);
+
+		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 		
 		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
@@ -127,8 +131,8 @@ public class DataSinkTaskTest extends TaskTestBase
 
 		int keyCnt = 100;
 		int valCnt = 20;
-		
-		super.initEnvironment(1024 * 1024);
+
+		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 0, 0, false), 0);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt, 0, false), 0);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt*2, 0, false), 0);
@@ -197,8 +201,8 @@ public class DataSinkTaskTest extends TaskTestBase
 
 		int keyCnt = 100;
 		int valCnt = 20;
-		
-		super.initEnvironment(1024 * 1024 * 4);
+
+		super.initEnvironment(MEMORY_MANAGER_SIZE * 4, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
 		
 		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
@@ -275,8 +279,8 @@ public class DataSinkTaskTest extends TaskTestBase
 
 		int keyCnt = 100;
 		int valCnt = 20;
-		
-		super.initEnvironment(1024 * 1024);
+
+		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 
 		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
@@ -307,8 +311,8 @@ public class DataSinkTaskTest extends TaskTestBase
 
 		int keyCnt = 100;
 		int valCnt = 20;
-		
-		super.initEnvironment(4 * 1024 * 1024);
+
+		super.initEnvironment(MEMORY_MANAGER_SIZE * 4, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
 
 		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
@@ -343,8 +347,8 @@ public class DataSinkTaskTest extends TaskTestBase
 	
 	@Test
 	public void testCancelDataSinkTask() {
-		
-		super.initEnvironment(1024 * 1024);
+
+		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new InfiniteInputIterator(), 0);
 		
 		final DataSinkTask<Record> testTask = new DataSinkTask<Record>();
@@ -385,8 +389,8 @@ public class DataSinkTaskTest extends TaskTestBase
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testCancelSortingDataSinkTask() {
-		
-		super.initEnvironment(4 * 1024 * 1024);
+
+		super.initEnvironment(MEMORY_MANAGER_SIZE * 4, NETWORK_BUFFER_SIZE);
 		super.addInput(new InfiniteInputIterator(), 0);
 		
 		final DataSinkTask<Record> testTask = new DataSinkTask<Record>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java
index 0198db2..732cf8d 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java
@@ -37,8 +37,12 @@ import eu.stratosphere.types.IntValue;
 import eu.stratosphere.types.Record;
 import eu.stratosphere.util.MutableObjectIterator;
 
-public class DataSourceTaskTest extends TaskTestBase
-{
+public class DataSourceTaskTest extends TaskTestBase {
+
+	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
+
+	private static final int NETWORK_BUFFER_SIZE = 1024;
+
 	private List<Record> outList;
 	
 	private String tempTestPath = Path.constructTestPath("dst_test");
@@ -50,11 +54,9 @@ public class DataSourceTaskTest extends TaskTestBase
 			tempTestFile.delete();
 		}
 	}
-
 	
 	@Test
 	public void testDataSourceTask() {
-
 		int keyCnt = 100;
 		int valCnt = 20;
 		
@@ -67,7 +69,7 @@ public class DataSourceTaskTest extends TaskTestBase
 			Assert.fail("Unable to set-up test input file");
 		}
 		
-		super.initEnvironment(1024 * 1024);
+		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addOutput(this.outList);
 		
 		DataSourceTask<Record> testTask = new DataSourceTask<Record>();
@@ -110,7 +112,6 @@ public class DataSourceTaskTest extends TaskTestBase
 	
 	@Test
 	public void testFailingDataSourceTask() {
-
 		int keyCnt = 20;
 		int valCnt = 10;
 		
@@ -122,8 +123,8 @@ public class DataSourceTaskTest extends TaskTestBase
 		} catch (IOException e1) {
 			Assert.fail("Unable to set-up test input file");
 		}
-		
-		super.initEnvironment(1024 * 1024);
+
+		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addOutput(this.outList);
 		
 		DataSourceTask<Record> testTask = new DataSourceTask<Record>();
@@ -148,11 +149,10 @@ public class DataSourceTaskTest extends TaskTestBase
 	
 	@Test
 	public void testCancelDataSourceTask() {
-		
 		int keyCnt = 20;
 		int valCnt = 4;
-		
-		super.initEnvironment(1024 * 1024);
+
+		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addOutput(new NirvanaOutputList());
 		
 		try {
@@ -184,7 +184,7 @@ public class DataSourceTaskTest extends TaskTestBase
 		
 		try {
 			tct.join();
-			taskRunner.join();		
+			taskRunner.join();
 		} catch(InterruptedException ie) {
 			Assert.fail("Joining threads failed");
 		}
@@ -192,12 +192,10 @@ public class DataSourceTaskTest extends TaskTestBase
 		// assert that temp file was created
 		File tempTestFile = new File(this.tempTestPath);
 		Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
-				
 	}
 
 	
-	private static class InputFilePreparator
-	{
+	private static class InputFilePreparator {
 		public static void prepareInputFile(MutableObjectIterator<Record> inIt, String inputFilePath, boolean insertInvalidData)
 		throws IOException
 		{

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java
index 100bf7b..dda215e 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java
@@ -43,7 +43,11 @@ import eu.stratosphere.util.LogUtils;
 
 
 public class ChainTaskTest extends TaskTestBase {
-	
+
+	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
+
+	private static final int NETWORK_BUFFER_SIZE = 1024;
+
 	private final List<Record> outList = new ArrayList<Record>();
 	
 	@SuppressWarnings("unchecked")
@@ -65,9 +69,8 @@ public class ChainTaskTest extends TaskTestBase {
 		final int valCnt = 20;
 		
 		try {
-		
 			// environment
-			initEnvironment(3*1024*1024);
+			super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 			addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 			addOutput(this.outList);
 			
@@ -123,7 +126,7 @@ public class ChainTaskTest extends TaskTestBase {
 		
 		try {
 			// environment
-			initEnvironment(3*1024*1024);
+			super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 			addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 			addOutput(this.outList);
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
index cc00387..c66d821 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.junit.Test;
 
 import eu.stratosphere.api.common.typeutils.TypeComparator;
+import eu.stratosphere.runtime.io.api.ChannelSelector;
 import eu.stratosphere.api.common.typeutils.base.IntSerializer;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparatorFactory;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
index 8afc78f..0b968d8 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
@@ -27,6 +27,8 @@ import org.junit.Test;
 
 import eu.stratosphere.api.common.distributions.DataDistribution;
 import eu.stratosphere.api.common.distributions.UniformIntegerDistribution;
+import eu.stratosphere.runtime.io.api.ChannelSelector;
+import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordComparator;
 import eu.stratosphere.nephele.io.ChannelSelector;
 import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
 import eu.stratosphere.pact.runtime.shipping.RecordOutputEmitter;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
index 8e25082..a397312 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
@@ -13,38 +13,40 @@
 
 package eu.stratosphere.pact.runtime.test.util;
 
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.FutureTask;
-
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.memory.MemorySegment;
 import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.nephele.io.ChannelSelector;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.InputChannelResult;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.MutableRecordDeserializerFactory;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.RecordAvailabilityListener;
-import eu.stratosphere.nephele.io.RecordDeserializerFactory;
-import eu.stratosphere.nephele.io.RuntimeInputGate;
-import eu.stratosphere.nephele.io.RuntimeOutputGate;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.gates.InputChannelResult;
+import eu.stratosphere.runtime.io.gates.RecordAvailabilityListener;
+import eu.stratosphere.runtime.io.serialization.AdaptiveSpanningRecordDeserializer;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.gates.InputGate;
+import eu.stratosphere.runtime.io.gates.OutputGate;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
 import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
+import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
 import eu.stratosphere.nephele.template.InputSplitProvider;
+import eu.stratosphere.runtime.io.serialization.RecordDeserializer;
+import eu.stratosphere.runtime.io.serialization.RecordDeserializer.DeserializationResult;
 import eu.stratosphere.types.Record;
 import eu.stratosphere.util.MutableObjectIterator;
 
-public class MockEnvironment implements Environment {
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+public class MockEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner {
 	
 	private final MemoryManager memManager;
 
@@ -56,21 +58,24 @@ public class MockEnvironment implements Environment {
 
 	private final Configuration taskConfiguration;
 
-	private final List<RuntimeInputGate<Record>> inputs;
+	private final List<InputGate<Record>> inputs;
 
-	private final List<RuntimeOutputGate<Record>> outputs;
+	private final List<OutputGate> outputs;
 
 	private final JobID jobID = new JobID();
 
-	public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider) {
+	private final Buffer mockBuffer;
+
+	public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this.jobConfiguration = new Configuration();
 		this.taskConfiguration = new Configuration();
-		this.inputs = new LinkedList<RuntimeInputGate<Record>>();
-		this.outputs = new LinkedList<RuntimeOutputGate<Record>>();
+		this.inputs = new LinkedList<InputGate<Record>>();
+		this.outputs = new LinkedList<OutputGate>();
 
 		this.memManager = new DefaultMemoryManager(memorySize);
 		this.ioManager = new IOManager(System.getProperty("java.io.tmpdir"));
 		this.inputSplitProvider = inputSplitProvider;
+		this.mockBuffer = new Buffer(new MemorySegment(new byte[bufferSize]), bufferSize, null);
 	}
 
 	public void addInput(MutableObjectIterator<Record> inputIterator) {
@@ -103,13 +108,62 @@ public class MockEnvironment implements Environment {
 		return this.jobID;
 	}
 
+	@Override
+	public Buffer requestBuffer(int minBufferSize) throws IOException {
+		return mockBuffer;
+	}
+
+	@Override
+	public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
+		return mockBuffer;
+	}
+
+	@Override
+	public int getBufferSize() {
+		return this.mockBuffer.size();
+	}
+
+	@Override
+	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+		return false;
+	}
+
+	@Override
+	public int getNumberOfChannels() {
+		return 1;
+	}
+
+	@Override
+	public void setDesignatedNumberOfBuffers(int numBuffers) {
+
+	}
+
+	@Override
+	public void clearLocalBufferPool() {
+
+	}
+
+	@Override
+	public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
+
+	}
+
+	@Override
+	public void logBufferUtilization() {
+
+	}
+
+	@Override
+	public void reportAsynchronousEvent() {
 
-	private static class MockInputGate extends RuntimeInputGate<Record> {
+	}
+
+	private static class MockInputGate extends InputGate<Record> {
 		
 		private MutableObjectIterator<Record> it;
 
 		public MockInputGate(int id, MutableObjectIterator<Record> it) {
-			super(new JobID(), new GateID(), MutableRecordDeserializerFactory.<Record>get(), id);
+			super(new JobID(), new GateID(), id);
 			this.it = it;
 		}
 
@@ -132,18 +186,43 @@ public class MockEnvironment implements Environment {
 		}
 	}
 
-	private static class MockOutputGate extends RuntimeOutputGate<Record> {
+	private class MockOutputGate extends OutputGate {
 		
 		private List<Record> out;
 
+		private RecordDeserializer<Record> deserializer;
+
+		private Record record;
+
 		public MockOutputGate(int index, List<Record> outList) {
-			super(new JobID(), new GateID(), Record.class, index, null, false);
+			super(new JobID(), new GateID(), index);
 			this.out = outList;
+			this.deserializer = new AdaptiveSpanningRecordDeserializer<Record>();
+			this.record = new Record();
 		}
 
 		@Override
-		public void writeRecord(Record record) throws IOException, InterruptedException {
-			out.add(record.createCopy());
+		public void sendBuffer(Buffer buffer, int targetChannel) throws IOException, InterruptedException {
+
+			this.deserializer.setNextMemorySegment(MockEnvironment.this.mockBuffer.getMemorySegment(), MockEnvironment.this.mockBuffer.size());
+
+			while (this.deserializer.hasUnfinishedData()) {
+				DeserializationResult result = this.deserializer.getNextRecord(this.record);
+
+				if (result.isFullRecord()) {
+					this.out.add(this.record.createCopy());
+				}
+
+				if (result == DeserializationResult.LAST_RECORD_FROM_BUFFER ||
+					result == DeserializationResult.PARTIAL_RECORD) {
+					break;
+				}
+			}
+		}
+
+		@Override
+		public int getNumChannels() {
+			return 1;
 		}
 	}
 
@@ -188,11 +267,6 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public GateID getNextUnboundOutputGateID() {
-		return null;
-	}
-
-	@Override
 	public int getNumberOfOutputGates() {
 		return this.outputs.size();
 	}
@@ -203,16 +277,6 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public void registerOutputGate(final OutputGate<? extends IOReadableWritable> outputGate) {
-		// Nothing to do here
-	}
-
-	@Override
-	public void registerInputGate(final InputGate<? extends IOReadableWritable> inputGate) {
-		// Nothing to do here
-	}
-
-	@Override
 	public Set<ChannelID> getOutputChannelIDs() {
 		throw new IllegalStateException("getOutputChannelIDs called on MockEnvironment");
 	}
@@ -242,18 +306,14 @@ public class MockEnvironment implements Environment {
 		throw new IllegalStateException("getInputChannelIDsOfGate called on MockEnvironment");
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
-	public <T extends IOReadableWritable> OutputGate<T> createOutputGate(GateID gateID, Class<T> outputClass,
-			ChannelSelector<T> selector, boolean isBroadcast)
+	public OutputGate createAndRegisterOutputGate()
 	{
-		return (OutputGate<T>) this.outputs.remove(0);
+		return this.outputs.remove(0);
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
-	public <T extends IOReadableWritable> InputGate<T> createInputGate(GateID gateID,
-			RecordDeserializerFactory<T> deserializerFactory)
+	public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate()
 	{
 		return (InputGate<T>) this.inputs.remove(0);
 	}
@@ -275,8 +335,7 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public 	Map<String, FutureTask<Path>> getCopyTask() {
-		return null;
+	public BufferProvider getOutputBufferProvider() {
+		return this;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
index 826113c..a60b479 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
@@ -49,10 +49,10 @@ public abstract class TaskTestBase {
 
 	protected MockEnvironment mockEnv;
 
-	public void initEnvironment(long memorySize) {
+	public void initEnvironment(long memorySize, int bufferSize) {
 		this.memorySize = memorySize;
 		this.inputSplitProvider = new MockInputSplitProvider();
-		this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider);
+		this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider, bufferSize);
 	}
 
 	public void addInput(MutableObjectIterator<Record> input, int groupId) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/LineReaderTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/LineReaderTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/LineReaderTest.java
new file mode 100644
index 0000000..af46689
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/LineReaderTest.java
@@ -0,0 +1,78 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.fs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.PrintWriter;
+
+import org.junit.Test;
+
+import eu.stratosphere.core.fs.FSDataInputStream;
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.core.fs.local.LocalFileSystem;
+import eu.stratosphere.nephele.util.CommonTestUtils;
+
+/**
+ * This class tests the functionality of the LineReader class using a local filesystem.
+ * 
+ */
+
+public class LineReaderTest {
+
+	/**
+	 * This test tests the LineReader. So far only under usual conditions.
+	 */
+	@Test
+	public void testLineReader() {
+		final File testfile = new File(CommonTestUtils.getTempDir() + File.separator
+			+ CommonTestUtils.getRandomFilename());
+		final Path pathtotestfile = new Path(testfile.toURI().getPath());
+
+		try {
+			PrintWriter pw = new PrintWriter(testfile, "UTF8");
+
+			for (int i = 0; i < 100; i++) {
+				pw.append("line\n");
+			}
+			pw.close();
+
+			LocalFileSystem lfs = new LocalFileSystem();
+			FSDataInputStream fis = lfs.open(pathtotestfile);
+
+			// first, we test under "usual" conditions
+			final LineReader lr = new LineReader(fis, 0, testfile.length(), 256);
+
+			byte[] buffer;
+			int linecount = 0;
+			while ((buffer = lr.readLine()) != null) {
+				assertEquals(new String(buffer, "UTF8"), "line");
+				linecount++;
+			}
+			assertEquals(linecount, 100);
+
+			// the linereader can not handle situations with larger length than the total file...
+
+		} catch (Exception e) {
+			fail(e.toString());
+			e.printStackTrace();
+		} finally {
+			testfile.delete();
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/s3/S3FileSystemTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/s3/S3FileSystemTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/s3/S3FileSystemTest.java
new file mode 100644
index 0000000..30b5219
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/s3/S3FileSystemTest.java
@@ -0,0 +1,460 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.fs.s3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.configuration.GlobalConfiguration;
+import eu.stratosphere.core.fs.BlockLocation;
+import eu.stratosphere.core.fs.FSDataInputStream;
+import eu.stratosphere.core.fs.FSDataOutputStream;
+import eu.stratosphere.core.fs.FileStatus;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.fs.Path;
+
+/**
+ * This test checks the S3 implementation of the {@link FileSystem} interface.
+ * 
+ */
+public class S3FileSystemTest {
+
+	/**
+	 * The length of the bucket/object names used in this test.
+	 */
+	private static final int NAME_LENGTH = 32;
+
+	/**
+	 * The alphabet to generate the random bucket/object names from.
+	 */
+	private static final char[] ALPHABET = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o',
+		'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' };
+
+	/**
+	 * The size of the byte buffer used during the tests in bytes.
+	 */
+	private static final int TEST_BUFFER_SIZE = 128;
+
+	/**
+	 * The size of the small test file in bytes.
+	 */
+	private static final int SMALL_FILE_SIZE = 512;
+
+	/**
+	 * The size of the large test file in bytes.
+	 */
+	private static final int LARGE_FILE_SIZE = 1024 * 1024 * 12; // 12 MB
+
+	/**
+	 * The modulus to be used when generating the test data. Must not be larger than 128.
+	 */
+	private static final int MODULUS = 128;
+
+	private static final String S3_BASE_URI = "s3:///";
+
+	/**
+	 * Tries to read the AWS access key and the AWS secret key from the environments variables. If accessing these keys
+	 * fails, all tests will be skipped and marked as successful.
+	 */
+	@Before
+	public void initKeys() {
+		final String accessKey = System.getenv("AK");
+		final String secretKey = System.getenv("SK");
+		
+		if (accessKey != null || secretKey != null) {
+			Configuration conf = new Configuration();
+			if (accessKey != null) {
+				conf.setString(S3FileSystem.S3_ACCESS_KEY_KEY, accessKey);
+			}
+			if (secretKey != null) {
+				conf.setString(S3FileSystem.S3_SECRET_KEY_KEY, secretKey);
+			}
+			GlobalConfiguration.includeConfiguration(conf);
+		}
+	}
+
+	/**
+	 * This test creates and deletes a bucket inside S3 and checks it is correctly displayed inside the directory
+	 * listing.
+	 */
+	@Test
+	public void createAndDeleteBucketTest() {
+
+		if (!testActivated()) {
+			return;
+		}
+
+		final String bucketName = getRandomName();
+		final Path bucketPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR);
+
+		try {
+
+			final FileSystem fs = bucketPath.getFileSystem();
+
+			// Create directory
+			fs.mkdirs(bucketPath);
+
+			// Check if directory is correctly displayed in file system hierarchy
+			final FileStatus[] content = fs.listStatus(new Path(S3_BASE_URI));
+			boolean entryFound = false;
+			for (final FileStatus entry : content) {
+				if (bucketPath.equals(entry.getPath())) {
+					entryFound = true;
+					break;
+				}
+			}
+
+			if (!entryFound) {
+				fail("Cannot find entry " + bucketName + " in directory " + S3_BASE_URI);
+			}
+
+			// Check the concrete directory file status
+			try {
+				final FileStatus directoryFileStatus = fs.getFileStatus(bucketPath);
+				assertTrue(directoryFileStatus.isDir());
+				assertEquals(0L, directoryFileStatus.getAccessTime());
+				assertTrue(directoryFileStatus.getModificationTime() > 0L);
+
+			} catch (FileNotFoundException e) {
+				fail(e.getMessage());
+			}
+
+			// Delete the bucket
+			fs.delete(bucketPath, true);
+
+			// Make sure the bucket no longer exists
+			try {
+				fs.getFileStatus(bucketPath);
+				fail("Expected FileNotFoundException for " + bucketPath.toUri());
+			} catch (FileNotFoundException e) {
+				// This is an expected exception
+			}
+
+		} catch (IOException ioe) {
+			fail(ioe.getMessage());
+		}
+	}
+
+	/**
+	 * Creates and reads the a larger test file in S3. The test file is generated according to a specific pattern.
+	 * During the read phase the incoming data stream is also checked against this pattern.
+	 */
+	@Test
+	public void createAndReadLargeFileTest() {
+
+		try {
+			createAndReadFileTest(LARGE_FILE_SIZE);
+		} catch (IOException ioe) {
+			fail(ioe.getMessage());
+		}
+	}
+
+	/**
+	 * Creates and reads the a small test file in S3. The test file is generated according to a specific pattern.
+	 * During the read phase the incoming data stream is also checked against this pattern.
+	 */
+	@Test
+	public void createAndReadSmallFileTest() {
+
+		try {
+			createAndReadFileTest(SMALL_FILE_SIZE);
+		} catch (IOException ioe) {
+			fail(ioe.getMessage());
+		}
+	}
+
+	/**
+	 * The tests checks the mapping of the file system directory structure to the underlying bucket/object model of
+	 * Amazon S3.
+	 */
+	@Test
+	public void multiLevelDirectoryTest() {
+
+		if (!testActivated()) {
+			return;
+		}
+
+		final String dirName = getRandomName();
+		final String subdirName = getRandomName();
+		final String subsubdirName = getRandomName();
+		final String fileName = getRandomName();
+		final Path dir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR);
+		final Path subdir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR);
+		final Path subsubdir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR
+			+ subsubdirName + Path.SEPARATOR);
+		final Path file = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR + fileName);
+
+		try {
+
+			final FileSystem fs = dir.getFileSystem();
+
+			fs.mkdirs(subsubdir);
+
+			final OutputStream os = fs.create(file, true);
+			generateTestData(os, SMALL_FILE_SIZE);
+			os.close();
+
+			// On this directory levels there should only be one subdirectory
+			FileStatus[] list = fs.listStatus(dir);
+			int numberOfDirs = 0;
+			int numberOfFiles = 0;
+			for (final FileStatus entry : list) {
+
+				if (entry.isDir()) {
+					++numberOfDirs;
+					assertEquals(subdir, entry.getPath());
+				} else {
+					fail(entry.getPath() + " is a file which must not appear on this directory level");
+				}
+			}
+
+			assertEquals(1, numberOfDirs);
+			assertEquals(0, numberOfFiles);
+
+			list = fs.listStatus(subdir);
+			numberOfDirs = 0;
+
+			for (final FileStatus entry : list) {
+				if (entry.isDir()) {
+					assertEquals(subsubdir, entry.getPath());
+					++numberOfDirs;
+				} else {
+					assertEquals(file, entry.getPath());
+					++numberOfFiles;
+				}
+			}
+
+			assertEquals(1, numberOfDirs);
+			assertEquals(1, numberOfFiles);
+
+			fs.delete(dir, true);
+
+		} catch (IOException ioe) {
+			fail(ioe.getMessage());
+		}
+	}
+
+	/**
+	 * This test checks the S3 implementation of the file system method to retrieve the block locations of a file.
+	 */
+	@Test
+	public void blockLocationTest() {
+
+		if (!testActivated()) {
+			return;
+		}
+
+		final String dirName = getRandomName();
+		final String fileName = getRandomName();
+		final Path dir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR);
+		final Path file = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + fileName);
+
+		try {
+
+			final FileSystem fs = dir.getFileSystem();
+
+			fs.mkdirs(dir);
+
+			final OutputStream os = fs.create(file, true);
+			generateTestData(os, SMALL_FILE_SIZE);
+			os.close();
+
+			final FileStatus fileStatus = fs.getFileStatus(file);
+			assertNotNull(fileStatus);
+
+			BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, 0, SMALL_FILE_SIZE + 1);
+			assertNull(blockLocations);
+
+			blockLocations = fs.getFileBlockLocations(fileStatus, 0, SMALL_FILE_SIZE);
+			assertEquals(1, blockLocations.length);
+
+			final BlockLocation bl = blockLocations[0];
+			assertNotNull(bl.getHosts());
+			assertEquals(1, bl.getHosts().length);
+			assertEquals(SMALL_FILE_SIZE, bl.getLength());
+			assertEquals(0, bl.getOffset());
+			final URI s3Uri = fs.getUri();
+			assertNotNull(s3Uri);
+			assertEquals(s3Uri.getHost(), bl.getHosts()[0]);
+
+			fs.delete(dir, true);
+
+		} catch (IOException ioe) {
+			fail(ioe.getMessage());
+		}
+	}
+
+	/**
+	 * Creates and reads a file with the given size in S3. The test file is generated according to a specific pattern.
+	 * During the read phase the incoming data stream is also checked against this pattern.
+	 * 
+	 * @param fileSize
+	 *        the size of the file to be generated in bytes
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while writing or reading the test file
+	 */
+	private void createAndReadFileTest(final int fileSize) throws IOException {
+
+		if (!testActivated()) {
+			return;
+		}
+
+		final String bucketName = getRandomName();
+		final String objectName = getRandomName();
+		final Path bucketPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR);
+		final Path objectPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR + objectName);
+
+		FileSystem fs = bucketPath.getFileSystem();
+
+		// Create test bucket
+		fs.mkdirs(bucketPath);
+
+		// Write test file to S3
+		final FSDataOutputStream outputStream = fs.create(objectPath, false);
+		generateTestData(outputStream, fileSize);
+		outputStream.close();
+
+		// Now read the same file back from S3
+		final FSDataInputStream inputStream = fs.open(objectPath);
+		testReceivedData(inputStream, fileSize);
+		inputStream.close();
+
+		// Delete test bucket
+		fs.delete(bucketPath, true);
+	}
+
+	/**
+	 * Receives test data from the given input stream and checks the size of the data as well as the pattern inside the
+	 * received data.
+	 * 
+	 * @param inputStream
+	 *        the input stream to read the test data from
+	 * @param expectedSize
+	 *        the expected size of the data to be read from the input stream in bytes
+	 * @throws IOException
+	 *         thrown if an error occurs while reading the data
+	 */
+	private void testReceivedData(final InputStream inputStream, final int expectedSize) throws IOException {
+
+		final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
+
+		int totalBytesRead = 0;
+		int nextExpectedNumber = 0;
+		while (true) {
+
+			final int bytesRead = inputStream.read(testBuffer);
+			if (bytesRead < 0) {
+				break;
+			}
+
+			totalBytesRead += bytesRead;
+
+			for (int i = 0; i < bytesRead; ++i) {
+				if (testBuffer[i] != nextExpectedNumber) {
+					throw new IOException("Read number " + testBuffer[i] + " but expected " + nextExpectedNumber);
+				}
+
+				++nextExpectedNumber;
+
+				if (nextExpectedNumber == MODULUS) {
+					nextExpectedNumber = 0;
+				}
+			}
+		}
+
+		if (totalBytesRead != expectedSize) {
+			throw new IOException("Expected to read " + expectedSize + " bytes but only received " + totalBytesRead);
+		}
+	}
+
+	/**
+	 * Generates test data of the given size according to some specific pattern and writes it to the provided output
+	 * stream.
+	 * 
+	 * @param outputStream
+	 *        the output stream to write the data to
+	 * @param size
+	 *        the size of the test data to be generated in bytes
+	 * @throws IOException
+	 *         thrown if an error occurs while writing the data
+	 */
+	private void generateTestData(final OutputStream outputStream, final int size) throws IOException {
+
+		final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
+		for (int i = 0; i < testBuffer.length; ++i) {
+			testBuffer[i] = (byte) (i % MODULUS);
+		}
+
+		int bytesWritten = 0;
+		while (bytesWritten < size) {
+
+			final int diff = size - bytesWritten;
+			if (diff < testBuffer.length) {
+				outputStream.write(testBuffer, 0, diff);
+				bytesWritten += diff;
+			} else {
+				outputStream.write(testBuffer);
+				bytesWritten += testBuffer.length;
+			}
+		}
+	}
+
+	/**
+	 * Generates a random name.
+	 * 
+	 * @return a random name
+	 */
+	private String getRandomName() {
+
+		final StringBuilder stringBuilder = new StringBuilder();
+		for (int i = 0; i < NAME_LENGTH; ++i) {
+			final char c = ALPHABET[(int) (Math.random() * (double) ALPHABET.length)];
+			stringBuilder.append(c);
+		}
+
+		return stringBuilder.toString();
+	}
+
+	/**
+	 * Checks whether the AWS access key and the AWS secret keys have been successfully loaded from the configuration
+	 * and whether the S3 tests shall be performed.
+	 * 
+	 * @return <code>true</code> if the tests shall be performed, <code>false</code> if the tests shall be skipped
+	 *         because at least one AWS key is missing
+	 */
+	private boolean testActivated() {
+
+		final String accessKey = GlobalConfiguration.getString(S3FileSystem.S3_ACCESS_KEY_KEY, null);
+		final String secretKey = GlobalConfiguration.getString(S3FileSystem.S3_SECRET_KEY_KEY, null);
+
+		if (accessKey != null && secretKey != null) {
+			return true;
+		}
+
+		return false;
+	}
+}