You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:44 UTC

[09/30] Offer buffer-oriented API for I/O (#25)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/DefaultChannelSelectorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/DefaultChannelSelectorTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/DefaultChannelSelectorTest.java
new file mode 100644
index 0000000..890b34f
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/DefaultChannelSelectorTest.java
@@ -0,0 +1,47 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io;
+
+import static org.junit.Assert.assertEquals;
+
+import eu.stratosphere.runtime.io.api.RoundRobinChannelSelector;
+import org.junit.Test;
+
+import eu.stratosphere.core.io.StringRecord;
+
+/**
+ * Unit test for the {@link RoundRobinChannelSelector} class.
+ */
+public class DefaultChannelSelectorTest {
+
+	/**
+	 * Asks a fresh selector twice for a target among two output channels and checks that each call returns
+	 * exactly one channel index: 1 on the first call and 0 on the second.
+	 */
+	@Test
+	public void channelSelect() {
+		final int channelCount = 2;
+		final StringRecord record = new StringRecord("abc");
+		final RoundRobinChannelSelector<StringRecord> roundRobin = new RoundRobinChannelSelector<StringRecord>();
+
+		// first selection
+		final int[] first = roundRobin.selectChannels(record, channelCount);
+		assertEquals(1, first.length);
+		assertEquals(1, first[0]);
+
+		// second selection
+		final int[] second = roundRobin.selectChannels(record, channelCount);
+		assertEquals(1, second.length);
+		assertEquals(0, second[0]);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java
new file mode 100644
index 0000000..17c2f58
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java
@@ -0,0 +1,136 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.library;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import java.io.File;
+import java.io.IOException;
+
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.nephele.util.FileLineReader;
+import eu.stratosphere.nephele.util.FileLineWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.fs.FileInputSplit;
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.execution.Environment;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.nephele.template.InputSplitProvider;
+
+/**
+ * This class checks the functionality of the {@link eu.stratosphere.nephele.util.FileLineReader} and the
+ * {@link eu.stratosphere.nephele.util.FileLineWriter} classes.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FileLineReader.class)
+public class FileLineReadWriteTest {
+
+	@Mock
+	private Environment environment;
+
+	@Mock
+	private Configuration conf;
+
+	@Mock
+	private RecordReader<StringRecord> recordReader;
+
+	@Mock
+	private RecordWriter<StringRecord> recordWriter;
+
+	@Mock
+	private InputSplitProvider inputSplitProvider;
+
+	/**
+	 * Scratch file that the writer writes to and the reader reads from. It is created with a unique name in the
+	 * system temp directory for every test, so that no pre-existing entry in the working directory can be
+	 * clobbered or deleted by the test.
+	 */
+	private File file;
+
+	/**
+	 * Initializes the {@code @Mock} fields and creates the scratch file.
+	 * 
+	 * @throws Exception
+	 *         if the scratch file cannot be created
+	 */
+	@Before
+	public void before() throws Exception {
+
+		MockitoAnnotations.initMocks(this);
+		this.file = File.createTempFile("fileLineReadWriteTest", ".tmp");
+	}
+
+	/**
+	 * Removes the scratch file (best effort; the result of the deletion is not checked).
+	 */
+	@After
+	public void after() {
+		// before() may have failed prior to creating the file
+		if (this.file != null) {
+			this.file.delete();
+		}
+	}
+
+	/**
+	 * Lets a {@link FileLineWriter} write three copies of one record to the scratch file, then lets a
+	 * {@link FileLineReader} read the file back and verifies that the record instance used by the reader was
+	 * set three times with the bytes of the written record.
+	 * 
+	 * @throws Exception
+	 *         if the writer, the reader or the mocking setup fails unexpectedly
+	 */
+	@Test
+	public void testReadWrite() throws Exception {
+
+		FileLineWriter writer = new FileLineWriter();
+		Whitebox.setInternalState(writer, "environment", this.environment);
+		Whitebox.setInternalState(writer, "input", this.recordReader);
+		when(this.environment.getTaskConfiguration()).thenReturn(this.conf);
+
+		when(this.conf.getString("outputPath", null)).thenReturn(this.file.toURI().toString());
+		// the mocked reader offers exactly three records
+		when(this.recordReader.hasNext()).thenReturn(true, true, true, false);
+		StringRecord in = new StringRecord("abc");
+		// pure stubbing call; the checked exceptions declared by next() are covered by "throws Exception"
+		when(this.recordReader.next()).thenReturn(in);
+		writer.invoke();
+
+		// hand the written file to the reader as a single input split, followed by "no more splits"
+		final FileInputSplit split = new FileInputSplit(0, new Path(this.file.toURI().toString()), 0,
+			this.file.length(), null);
+		when(this.environment.getInputSplitProvider()).thenReturn(this.inputSplitProvider);
+		when(this.inputSplitProvider.getNextInputSplit()).thenReturn(split, (FileInputSplit) null);
+
+		FileLineReader reader = new FileLineReader();
+		Whitebox.setInternalState(reader, "environment", this.environment);
+		Whitebox.setInternalState(reader, "output", this.recordWriter);
+		StringRecord record = mock(StringRecord.class);
+
+		// make "new StringRecord()" inside the prepared FileLineReader class return the mock
+		whenNew(StringRecord.class).withNoArguments().thenReturn(record);
+
+		reader.invoke();
+
+		// verify the correct bytes have been written and read
+		verify(record, times(3)).set(in.getBytes());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java
new file mode 100644
index 0000000..b8914a8
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java
@@ -0,0 +1,394 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.envelope;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.nephele.util.DiscardingRecycler;
+import eu.stratosphere.nephele.util.TestBufferProvider;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
+import eu.stratosphere.runtime.io.BufferRecycler;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+public class EnvelopeReaderWriterTest {
+	
+	/** Fixed seed; writer and reader both create a Random from it and so derive the same header values. */
+	private static final long RANDOM_SEED = 520346508276087L;
+
+	/** Size in bytes of the memory segments backing the test buffers. */
+	private static final int BUFFER_SIZE = 32768;
+	
+	/** Byte value every payload position is filled with and checked against. */
+	private static final byte BUFFER_CONTENT = 13;
+	
+	/** Payload size per test envelope; for an entry of 0 no buffer is attached to the envelope. */
+	private static final int[] BUFFER_SIZES = { 0, 2, BUFFER_SIZE, 3782, 88, 0, 23 };
+	
+	/** Events per test envelope; the tests assert that this has as many entries as BUFFER_SIZES. */
+	private static final AbstractEvent[][] EVENT_LISTS = {
+		{},
+		{},
+		{},
+		{ new TestEvent1(34872527) },
+		{ new TestEvent1(8749653), new TestEvent1(365345) },
+		{ new TestEvent2(34563456), new TestEvent1(598432), new TestEvent2(976293845) },
+		{}
+	};
+
+	/**
+	 * Writes all test envelopes to a temporary file through a plain {@link FileChannel}, rewinds the channel and
+	 * reads the envelopes back. Passes -1.0f as probabilityForNoBufferCurrently (presumably this means a buffer
+	 * is always available; confirm against TestBufferProvider).
+	 * 
+	 * @throws Exception
+	 *         if writing or reading fails unexpectedly; the exception is propagated so that the test report
+	 *         contains its type, message and stack trace
+	 */
+	@Test
+	public void testWriteAndRead() throws Exception {
+		
+		Assert.assertEquals("Test broken.", BUFFER_SIZES.length, EVENT_LISTS.length);
+
+		File testFile = null;
+		RandomAccessFile raf = null;
+		try {
+			testFile = File.createTempFile("envelopes", ".tmp");
+			raf = new RandomAccessFile(testFile, "rw");
+			
+			// write
+			FileChannel c = raf.getChannel();
+			writeEnvelopes(c);
+			
+			// read
+			c.position(0);
+			readEnvelopes(c, -1.0f);
+			c.close();
+		}
+		finally {
+			if (raf != null) {
+				try {
+					raf.close();
+				} catch (IOException ignored) {
+					// best-effort cleanup; must not mask a failure from the test body
+				}
+			}
+			
+			if (testFile != null) {
+				testFile.delete();
+			}
+		}
+	}
+	
+	/**
+	 * Same round trip as the plain write/read test, but the file channel is wrapped in channels that transfer
+	 * only a randomly sized part of the buffer per call, and 0.75f is passed as probabilityForNoBufferCurrently.
+	 * 
+	 * @throws Exception
+	 *         if writing or reading fails unexpectedly; the exception is propagated so that the test report
+	 *         contains its type, message and stack trace
+	 */
+	@Test
+	public void testWriteAndReadChunked() throws Exception {
+		
+		Assert.assertEquals("Test broken.", BUFFER_SIZES.length, EVENT_LISTS.length);
+
+		File testFile = null;
+		RandomAccessFile raf = null;
+		try {
+			testFile = File.createTempFile("envelopes", ".tmp");
+			raf = new RandomAccessFile(testFile, "rw");
+			
+			// write
+			FileChannel c = raf.getChannel();
+			writeEnvelopes(new ChunkedWriteableChannel(c));
+			
+			// read
+			c.position(0);
+			readEnvelopes(new ChunkedReadableChannel(c), 0.75f);
+			c.close();
+		}
+		finally {
+			if (raf != null) {
+				try {
+					raf.close();
+				} catch (IOException ignored) {
+					// best-effort cleanup; must not mask a failure from the test body
+				}
+			}
+			
+			if (testFile != null) {
+				testFile.delete();
+			}
+		}
+	}
+
+	/**
+	 * Serializes one envelope per entry of BUFFER_SIZES into the given channel. Sequence number, job ID and
+	 * source channel ID are drawn from a Random seeded with RANDOM_SEED; events come from EVENT_LISTS and the
+	 * payload, if any, is filled with BUFFER_CONTENT.
+	 * 
+	 * @param channel
+	 *        the channel the envelopes are written to
+	 * @throws IOException
+	 *         if serializing or writing an envelope fails
+	 */
+	private void writeEnvelopes(WritableByteChannel channel) throws IOException {
+
+		final Random headerValues = new Random(RANDOM_SEED);
+		final BufferRecycler discarding = new DiscardingRecycler();
+		final EnvelopeWriter writer = new EnvelopeWriter();
+
+		for (int envIndex = 0; envIndex < BUFFER_SIZES.length; envIndex++) {
+			// the draw order (int, then four longs) must match the order used when reading
+			final int sequenceNumber = Math.abs(headerValues.nextInt());
+			final JobID jobId = new JobID(headerValues.nextLong(), headerValues.nextLong());
+			final ChannelID sourceId = new ChannelID(headerValues.nextLong(), headerValues.nextLong());
+			final Envelope envelope = new Envelope(sequenceNumber, jobId, sourceId);
+
+			final AbstractEvent[] events = EVENT_LISTS[envIndex];
+			if (events.length > 0) {
+				envelope.serializeEventList(Arrays.asList(events));
+			}
+
+			final int payloadSize = BUFFER_SIZES[envIndex];
+			if (payloadSize > 0) {
+				final MemorySegment segment = new MemorySegment(new byte[BUFFER_SIZE]);
+				for (int offset = 0; offset < payloadSize; offset++) {
+					segment.put(offset, BUFFER_CONTENT);
+				}
+				envelope.setBuffer(new Buffer(segment, payloadSize, discarding));
+			}
+
+			writer.setEnvelopeForWriting(envelope);
+
+			// keep writing chunks for as long as writeNextChunk returns true
+			boolean moreToWrite;
+			do {
+				moreToWrite = writer.writeNextChunk(channel);
+			} while (moreToWrite);
+		}
+	}
+	
+	/**
+	 * Reads one envelope per entry of BUFFER_SIZES from the given channel and compares header fields, events and
+	 * payload with the values that a Random seeded with RANDOM_SEED, EVENT_LISTS and BUFFER_SIZES prescribe.
+	 * 
+	 * @param channel
+	 *        the channel the envelopes are read from
+	 * @param probabilityForNoBufferCurrently
+	 *        forwarded unchanged to the buffer provider used by the reader
+	 * @throws IOException
+	 *         if reading or deserializing an envelope fails
+	 */
+	private void readEnvelopes(ReadableByteChannel channel, float probabilityForNoBufferCurrently) throws IOException {
+
+		final Random headerValues = new Random(RANDOM_SEED);
+		final OneForAllBroker broker = new OneForAllBroker(BUFFER_SIZE, probabilityForNoBufferCurrently);
+		final EnvelopeReader reader = new EnvelopeReader(broker);
+
+		for (int envIndex = 0; envIndex < BUFFER_SIZES.length; envIndex++) {
+			// the draw order (int, then four longs) must match the order used when writing
+			final int expectedSeqNum = Math.abs(headerValues.nextInt());
+			final JobID expectedJid = new JobID(headerValues.nextLong(), headerValues.nextLong());
+			final ChannelID expectedSid = new ChannelID(headerValues.nextLong(), headerValues.nextLong());
+
+			while (reader.readNextChunk(channel) != EnvelopeReader.DeserializationState.COMPLETE) {
+				// keep reading chunks until the reader reports a complete envelope
+			}
+			final Envelope envelope = reader.getFullyDeserializedTransferEnvelope();
+
+			// header fields
+			Assert.assertEquals(expectedSeqNum, envelope.getSequenceNumber());
+			Assert.assertEquals(expectedJid, envelope.getJobID());
+			Assert.assertEquals(expectedSid, envelope.getSource());
+
+			// events: same count, same classes, equal content, same order
+			final List<? extends AbstractEvent> actualEvents = envelope.deserializeEvents();
+			final AbstractEvent[] expectedEvents = EVENT_LISTS[envIndex];
+			Assert.assertEquals(expectedEvents.length, actualEvents.size());
+
+			int eventIndex = 0;
+			for (AbstractEvent expectedEvent : expectedEvents) {
+				final AbstractEvent actualEvent = actualEvents.get(eventIndex++);
+
+				Assert.assertEquals(expectedEvent.getClass(), actualEvent.getClass());
+				Assert.assertEquals(expectedEvent, actualEvent);
+			}
+
+			// payload: a missing buffer is only acceptable for an expected size of zero
+			final Buffer payload = envelope.getBuffer();
+			if (payload != null) {
+				Assert.assertEquals(BUFFER_SIZES[envIndex], payload.size());
+				for (int offset = 0; offset < BUFFER_SIZES[envIndex]; offset++) {
+					Assert.assertEquals(BUFFER_CONTENT, payload.getMemorySegment().get(offset));
+				}
+			} else {
+				Assert.assertTrue(BUFFER_SIZES[envIndex] == 0);
+			}
+
+			reader.reset();
+		}
+	}
+	
+	
+	/**
+	 * Test event whose only state is a {@code long} id. Two instances are equal if both are exactly of class
+	 * TestEvent1 and carry the same id.
+	 */
+	public static final class TestEvent1 extends AbstractEvent {
+
+		private long id;
+		
+		/** No-argument constructor (presumably required to instantiate the event on deserialization; confirm). */
+		public TestEvent1() {}
+		
+		public TestEvent1(long id) {
+			this.id = id;
+		}
+		
+		/** Writes the id as a single long. */
+		@Override
+		public void write(DataOutput out) throws IOException {
+			out.writeLong(id);
+		}
+
+		/** Reads the id as a single long. */
+		@Override
+		public void read(DataInput in) throws IOException {
+			id = in.readLong();
+		}
+		
+
+		@Override
+		public boolean equals(Object obj) {
+			// null check first: equals(null) has to return false instead of throwing a NullPointerException
+			if (obj != null && obj.getClass() == TestEvent1.class) {
+				return ((TestEvent1) obj).id == this.id;
+			} else {
+				return false;
+			}
+		}
+		
+		/** Folds the lower and the upper 32 bits of the id into one int. */
+		@Override
+		public int hashCode() {
+			return ((int) id) ^ ((int) (id >>> 32));
+		}
+		
+		@Override
+		public String toString() {
+			return "TestEvent1 (" + id + ")";
+		}
+	}
+	
+	/**
+	 * Second test event type with the same layout as the first one: a single {@code long} id. Two instances are
+	 * equal if both are exactly of class TestEvent2 and carry the same id.
+	 */
+	public static final class TestEvent2 extends AbstractEvent {
+
+		private long id;
+		
+		/** No-argument constructor (presumably required to instantiate the event on deserialization; confirm). */
+		public TestEvent2() {}
+		
+		public TestEvent2(long id) {
+			this.id = id;
+		}
+		
+		/** Writes the id as a single long. */
+		@Override
+		public void write(DataOutput out) throws IOException {
+			out.writeLong(id);
+		}
+
+		/** Reads the id as a single long. */
+		@Override
+		public void read(DataInput in) throws IOException {
+			id = in.readLong();
+		}
+		
+
+		@Override
+		public boolean equals(Object obj) {
+			// null check first: equals(null) has to return false instead of throwing a NullPointerException
+			if (obj != null && obj.getClass() == TestEvent2.class) {
+				return ((TestEvent2) obj).id == this.id;
+			} else {
+				return false;
+			}
+		}
+		
+		/** Folds the lower and the upper 32 bits of the id into one int. */
+		@Override
+		public int hashCode() {
+			return ((int) id) ^ ((int) (id >>> 32));
+		}
+		
+		@Override
+		public String toString() {
+			return "TestEvent2 (" + id + ")";
+		}
+	}
+	
+	/**
+	 * Channel wrapper that offers only a randomly sized prefix of the source buffer to the delegate on each
+	 * call to {@link #write(ByteBuffer)}, so that callers have to cope with partial writes.
+	 */
+	private static final class ChunkedWriteableChannel implements WritableByteChannel {
+		
+		private final WritableByteChannel delegate;
+		
+		private final Random rnd;
+		
+		private ChunkedWriteableChannel(WritableByteChannel delegate) {
+			this.delegate = delegate;
+			this.rnd = new Random();
+		}
+
+		@Override
+		public boolean isOpen() {
+			return this.delegate.isOpen();
+		}
+
+		@Override
+		public void close() throws IOException {
+			this.delegate.close();
+		}
+
+		/**
+		 * Passes at most a random number of the remaining bytes on to the delegate. The number is drawn from
+		 * [1, remaining], raised to at least 8 and capped at the remaining bytes.
+		 * 
+		 * @param src
+		 *        the buffer to write from; its limit is the same after the call as before
+		 * @return the value returned by the delegate's write
+		 * @throws IOException
+		 *         if the delegate fails
+		 */
+		@Override
+		public int write(ByteBuffer src) throws IOException {
+			final int available = src.remaining();
+			if (available == 0) {
+				// nothing to chunk, and Random.nextInt(0) would throw an IllegalArgumentException
+				return this.delegate.write(src);
+			}
+			final int oldLimit = src.limit();
+			
+			int toWrite = rnd.nextInt(available) + 1;
+			toWrite = Math.min(Math.max(toWrite, 8), available);
+			
+			src.limit(src.position() + toWrite);
+			try {
+				return this.delegate.write(src);
+			} finally {
+				// hand the buffer back with its original limit, even if the delegate throws
+				src.limit(oldLimit);
+			}
+		}
+	}
+	
+	/**
+	 * Channel wrapper that exposes only a randomly sized part of the destination buffer's free space to the
+	 * delegate on each call to {@link #read(ByteBuffer)}, so that callers have to cope with partial reads.
+	 */
+	private static final class ChunkedReadableChannel implements ReadableByteChannel {
+		
+		private final ReadableByteChannel delegate;
+		
+		private final Random rnd;
+		
+		private ChunkedReadableChannel(ReadableByteChannel delegate) {
+			this.delegate = delegate;
+			this.rnd = new Random();
+		}
+
+		@Override
+		public boolean isOpen() {
+			return this.delegate.isOpen();
+		}
+
+		@Override
+		public void close() throws IOException {
+			this.delegate.close();
+		}
+
+		/**
+		 * Lets the delegate fill at most a random number of the free bytes of the buffer. The number is drawn
+		 * from [1, remaining], raised to at least 8 and capped at the remaining bytes.
+		 * 
+		 * @param dst
+		 *        the buffer to read into; its limit is the same after the call as before
+		 * @return the value returned by the delegate's read
+		 * @throws IOException
+		 *         if the delegate fails
+		 */
+		@Override
+		public int read(ByteBuffer dst) throws IOException {
+			final int available = dst.remaining();
+			if (available == 0) {
+				// nothing to chunk, and Random.nextInt(0) would throw an IllegalArgumentException
+				return this.delegate.read(dst);
+			}
+			final int oldLimit = dst.limit();
+			
+			int toRead = rnd.nextInt(available) + 1;
+			toRead = Math.min(Math.max(toRead, 8), available);
+			
+			dst.limit(dst.position() + toRead);
+			try {
+				return this.delegate.read(dst);
+			} finally {
+				// hand the buffer back with its original limit, even if the delegate throws
+				dst.limit(oldLimit);
+			}
+		}
+	}
+	
+	/**
+	 * Broker that returns one and the same {@link TestBufferProvider} for every job ID and channel ID.
+	 */
+	private static final class OneForAllBroker implements BufferProviderBroker {
+
+		private final TestBufferProvider sharedProvider;
+
+		/** Creates the shared provider from the segment size only. */
+		private OneForAllBroker(int sizeOfMemorySegments) {
+			sharedProvider = new TestBufferProvider(sizeOfMemorySegments);
+		}
+
+		/** Creates the shared provider from the segment size and the given probability value. */
+		private OneForAllBroker(int sizeOfMemorySegments, float probabilityForNoBufferCurrently) {
+			sharedProvider = new TestBufferProvider(sizeOfMemorySegments, probabilityForNoBufferCurrently);
+		}
+
+		@Override
+		public BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) {
+			// both arguments are ignored on purpose
+			return sharedProvider;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/DataInputOutputSerializerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/DataInputOutputSerializerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/DataInputOutputSerializerTest.java
new file mode 100644
index 0000000..55c0243
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/DataInputOutputSerializerTest.java
@@ -0,0 +1,115 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestType;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestTypeFactory;
+import eu.stratosphere.runtime.io.serialization.types.Util;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+
+public class DataInputOutputSerializerTest {
+
+	/**
+	 * Steps a {@link DataOutputSerializer} through write and clear calls and checks position and limit of the
+	 * byte buffers obtained from {@code wrapAsByteBuffer()} after every step.
+	 */
+	@Test
+	public void testWrapAsByteBuffer() {
+		final SerializationTestType record = Util.randomRecord(SerializationTestTypeFactory.INT);
+
+		final DataOutputSerializer out = new DataOutputSerializer(record.length());
+		final MemorySegment target = new MemorySegment(new byte[record.length()]);
+
+		try {
+			// nothing written yet: the view is empty
+			ByteBuffer view = out.wrapAsByteBuffer();
+			assertPositionAndLimit(view, 0, 0);
+
+			// writing leaves a previously obtained view untouched
+			record.write(out);
+			assertPositionAndLimit(view, 0, 0);
+
+			// a newly obtained view covers the written record
+			view = out.wrapAsByteBuffer();
+			assertPositionAndLimit(view, 0, record.length());
+
+			// clearing leaves a previously obtained view untouched
+			out.clear();
+			assertPositionAndLimit(view, 0, record.length());
+
+			// a newly obtained view after clearing is empty again
+			view = out.wrapAsByteBuffer();
+			assertPositionAndLimit(view, 0, 0);
+
+			// write once more and copy the view into the memory segment, which consumes the view
+			record.write(out);
+			view = out.wrapAsByteBuffer();
+			target.put(0, view, record.length());
+			assertPositionAndLimit(view, record.length(), record.length());
+		} catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail("Test encountered an unexpected exception.");
+		}
+	}
+
+	/** Asserts that the given buffer has the expected position and the expected limit, in that order. */
+	private static void assertPositionAndLimit(ByteBuffer buffer, int expectedPosition, int expectedLimit) {
+		Assert.assertEquals(expectedPosition, buffer.position());
+		Assert.assertEquals(expectedLimit, buffer.limit());
+	}
+
+	/**
+	 * Writes 100000 random records into a {@link DataOutputSerializer} created with an initial size of 1, then
+	 * reads them back through a {@link DataInputDeserializer} and compares each with the written record.
+	 */
+	@Test
+	public void testRandomValuesWriteRead() {
+		final int numElements = 100000;
+		final ArrayDeque<SerializationTestType> written = new ArrayDeque<SerializationTestType>();
+		final DataOutputSerializer out = new DataOutputSerializer(1);
+
+		// serialize all records, remembering them in write order
+		try {
+			for (SerializationTestType value : Util.randomRecords(numElements)) {
+				written.add(value);
+				value.write(out);
+			}
+		} catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail("Test encountered an unexpected exception.");
+		}
+
+		final DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer());
+
+		// deserialize into fresh instances of the same classes and compare
+		try {
+			for (SerializationTestType expected : written) {
+				final SerializationTestType actual = expected.getClass().newInstance();
+				actual.read(in);
+
+				Assert.assertEquals(expected, actual);
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test encountered an unexpected exception.");
+		}
+
+		written.clear();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
new file mode 100644
index 0000000..817c0e6
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
@@ -0,0 +1,160 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.nephele.services.memorymanager.AbstractPagedInputView;
+import eu.stratosphere.nephele.services.memorymanager.AbstractPagedOutputView;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestType;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestTypeFactory;
+import eu.stratosphere.runtime.io.serialization.types.Util;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class PagedViewsTest {
+
+	/**
+	 * Round-trips 1000000 random INT records through the paged views using a segment size of 2048.
+	 */
+	@Test
+	public void testSequenceOfIntegersWithAlignedBuffers() {
+		final int recordCount = 1000000;
+		final int segmentSize = 2048;
+
+		try {
+			testSequenceOfTypes(Util.randomRecords(recordCount, SerializationTestTypeFactory.INT), segmentSize);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail("Test encountered an unexpected exception.");
+		}
+	}
+
+	/**
+	 * Round-trips 1000000 random INT records through the paged views using a segment size of 2047.
+	 */
+	@Test
+	public void testSequenceOfIntegersWithUnalignedBuffers() {
+		final int recordCount = 1000000;
+		final int segmentSize = 2047;
+
+		try {
+			testSequenceOfTypes(Util.randomRecords(recordCount, SerializationTestTypeFactory.INT), segmentSize);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail("Test encountered an unexpected exception.");
+		}
+	}
+
+	/**
+	 * Round-trips 100000 random records of random types through the paged views.
+	 */
+	@Test
+	public void testRandomTypes() {
+		final int recordCount = 100000;
+		// an odd segment size, so that many records do not end on a segment boundary
+		final int segmentSize = 57;
+
+		try {
+			testSequenceOfTypes(Util.randomRecords(recordCount), segmentSize);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail("Test encountered an unexpected exception.");
+		}
+	}
+
+	/**
+	 * Writes every record of the sequence into a paged output view with the given segment size, then reads the
+	 * records back from a paged input view over the same segments and compares each with the original.
+	 * 
+	 * @param sequence
+	 *        the records to round-trip
+	 * @param segmentSize
+	 *        the size in bytes of every memory segment used by the output view
+	 * @throws Exception
+	 *         if writing, instantiating or reading a record fails
+	 */
+	private static void testSequenceOfTypes(Iterable<SerializationTestType> sequence, int segmentSize) throws Exception {
+
+		final TestOutputView writeView = new TestOutputView(segmentSize);
+		final List<SerializationTestType> written = new ArrayList<SerializationTestType>(512);
+
+		// serialize every record and remember it for the comparison below
+		final Iterator<SerializationTestType> records = sequence.iterator();
+		while (records.hasNext()) {
+			final SerializationTestType record = records.next();
+			record.write(writeView);
+			written.add(record);
+		}
+		writeView.close();
+
+		// deserialize into fresh instances of the same classes and compare in write order
+		final TestInputView readView = new TestInputView(writeView.segments);
+
+		for (int i = 0; i < written.size(); i++) {
+			final SerializationTestType expected = written.get(i);
+			final SerializationTestType actual = expected.getClass().newInstance();
+			actual.read(readView);
+			assertEquals(expected, actual);
+		}
+	}
+
+	// ============================================================================================
+
+	/** Immutable pair of a memory segment and a position within it. */
+	private static final class SegmentWithPosition {
+
+		private final int position;
+		private final MemorySegment segment;
+
+		public SegmentWithPosition(MemorySegment segment, int position) {
+			this.position = position;
+			this.segment = segment;
+		}
+	}
+
+	/**
+	 * Paged output view that backs every page with a new heap byte array and records each page it is done
+	 * with, together with the position reached in it.
+	 */
+	private static final class TestOutputView extends AbstractPagedOutputView {
+
+		private final int segmentSize;
+
+		private final List<SegmentWithPosition> segments = new ArrayList<SegmentWithPosition>();
+
+		private TestOutputView(int segmentSize) {
+			super(new MemorySegment(new byte[segmentSize]), segmentSize, 0);
+			this.segmentSize = segmentSize;
+		}
+
+		/** Records the given segment with its position and returns a new segment of the configured size. */
+		@Override
+		protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException {
+			final SegmentWithPosition finished = new SegmentWithPosition(current, positionInCurrent);
+			segments.add(finished);
+
+			final byte[] backing = new byte[segmentSize];
+			return new MemorySegment(backing);
+		}
+
+		/** Records the current segment with the current position, so that the last page is in the list as well. */
+		public void close() {
+			final MemorySegment last = getCurrentSegment();
+			final int lastPosition = getCurrentPositionInSegment();
+			segments.add(new SegmentWithPosition(last, lastPosition));
+		}
+	}
+
+	/**
+	 * Paged input view that walks through a list of recorded segments in order, using the recorded position of
+	 * each segment as its limit.
+	 */
+	private static final class TestInputView extends AbstractPagedInputView {
+
+		private final List<SegmentWithPosition> segments;
+
+		/** Index of the segment currently being read. */
+		private int num;
+
+		private TestInputView(List<SegmentWithPosition> segments) {
+			super(segments.get(0).segment, segments.get(0).position, 0);
+			this.segments = segments;
+			this.num = 0;
+		}
+
+		/**
+		 * Advances to the following recorded segment. Returns null once the list is exhausted.
+		 * NOTE(review): whether the base class expects an exception instead of null here cannot be told from
+		 * this file; confirm against AbstractPagedInputView.
+		 */
+		@Override
+		protected MemorySegment nextSegment(MemorySegment current) throws IOException {
+			num += 1;
+			return num < segments.size() ? segments.get(num).segment : null;
+		}
+
+		/** Returns the recorded position of the segment at the current index; the argument is not consulted. */
+		@Override
+		protected int getLimitForSegment(MemorySegment segment) {
+			final SegmentWithPosition entry = segments.get(num);
+			return entry.position;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializationTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializationTest.java
new file mode 100644
index 0000000..094e597
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializationTest.java
@@ -0,0 +1,164 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.serialization.RecordDeserializer.DeserializationResult;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestType;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestTypeFactory;
+import eu.stratosphere.runtime.io.serialization.types.Util;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+
+public class SpanningRecordSerializationTest {
+
+	/**
+	 * Round-trips ten random int records through segments of a single byte.
+	 */
+	@Test
+	public void testIntRecordsSpanningMultipleSegments() {
+		final int segmentSize = 1;
+		final int numValues = 10;
+
+		try {
+			final Util.MockRecords records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+			test(records, segmentSize);
+		} catch (Exception ex) {
+			ex.printStackTrace();
+			Assert.fail("Test encountered an unexpected exception.");
+		}
+	}
+
+	/**
+	 * Round-trips 64 random int records through 64-byte segments.
+	 */
+	@Test
+	public void testIntRecordsWithAlignedBuffers () {
+		final int segmentSize = 64;
+		final int numValues = 64;
+
+		try {
+			final Util.MockRecords records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+			test(records, segmentSize);
+		} catch (Exception ex) {
+			ex.printStackTrace();
+			Assert.fail("Test encountered an unexpected exception.");
+		}
+	}
+
+	/**
+	 * Round-trips 248 random int records through 31-byte segments.
+	 */
+	@Test
+	public void testIntRecordsWithUnalignedBuffers () {
+		final int segmentSize = 31;
+		final int numValues = 248;
+
+		try {
+			final Util.MockRecords records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+			test(records, segmentSize);
+		} catch (Exception ex) {
+			ex.printStackTrace();
+			Assert.fail("Test encountered an unexpected exception.");
+		}
+	}
+
+	/**
+	 * Round-trips 10000 random records, with no record type specified, through 127-byte segments.
+	 */
+	@Test
+	public void testRandomRecords () {
+		final int segmentSize = 127;
+		final int numValues = 10000;
+
+		try {
+			final Util.MockRecords records = Util.randomRecords(numValues);
+			test(records, segmentSize);
+		} catch (Exception ex) {
+			ex.printStackTrace();
+			Assert.fail("Test encountered an unexpected exception.");
+		}
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Iterates over the provided records and tests whether {@link SpanningRecordSerializer} and {@link AdaptiveSpanningRecordDeserializer}
+	 * interact as expected.
+	 * <p>
+	 * Records are serialized one by one. Whenever the serializer reports a full buffer, the buffer's memory segment is
+	 * handed to the deserializer and as many complete records as possible are read back and compared with the
+	 * originals. Records that are still pending after the last input record are deserialized at the end.
+	 * <p>
+	 * Only a single {@link MemorySegment} will be allocated.
+	 *
+	 * @param records records to test
+	 * @param segmentSize size for the {@link MemorySegment}
+	 * @throws Exception if serialization, deserialization, or the reflective record instantiation fails
+	 */
+	private void test (Util.MockRecords records, int segmentSize) throws Exception {
+		final int SERIALIZATION_OVERHEAD = 4; // length encoding
+
+		final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+		final RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<SerializationTestType>();
+
+		// the one buffer that is handed to the serializer again and again
+		final Buffer buffer = new Buffer(new MemorySegment(new byte[segmentSize]), segmentSize, null);
+
+		// records that have been serialized but not yet read back, in serialization order
+		final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<SerializationTestType>();
+
+		// -------------------------------------------------------------------------------------------------------------
+
+		serializer.setNextBuffer(buffer);
+
+		// numBytes: total bytes produced so far (payload plus length encoding);
+		// numRecords: records serialized minus records successfully deserialized
+		int numBytes = 0;
+		int numRecords = 0;
+		for (SerializationTestType record : records) {
+
+			serializedRecords.add(record);
+
+			numRecords++;
+			numBytes += record.length() + SERIALIZATION_OVERHEAD;
+
+			// serialize record
+			if (serializer.addRecord(record).isFullBuffer()) {
+				// buffer is full => start deserializing
+				deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize);
+
+				while (!serializedRecords.isEmpty()) {
+					SerializationTestType expected = serializedRecords.poll();
+					// fresh instance of the same concrete type to deserialize into
+					SerializationTestType actual = expected.getClass().newInstance();
+
+					if (deserializer.getNextRecord(actual).isFullRecord()) {
+						Assert.assertEquals(expected, actual);
+						numRecords--;
+					} else {
+						// record is not complete yet: put it back at the head and wait for more data
+						serializedRecords.addFirst(expected);
+						break;
+					}
+				}
+
+				// re-offer the buffer; every time the serializer reports it full again, pass the segment on
+				// NOTE(review): presumably the serializer writes the remainder of a spanning record here -- confirm
+				while (serializer.setNextBuffer(buffer).isFullBuffer()) {
+					deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize);
+				}
+
+
+
+			}
+		}
+
+		// deserialize left over records
+		// the last segment is only partially used: its fill level is the total byte count modulo the segment size
+		deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), (numBytes % segmentSize));
+
+		serializer.clear();
+
+		while (!serializedRecords.isEmpty()) {
+			SerializationTestType expected = serializedRecords.poll();
+
+			SerializationTestType actual = expected.getClass().newInstance();
+			DeserializationResult result = deserializer.getNextRecord(actual);
+
+			// at this point every remaining record must be completely available
+			Assert.assertTrue(result.isFullRecord());
+			Assert.assertEquals(expected, actual);
+			numRecords--;
+		}
+
+
+		// assert that all records have been serialized and deserialized
+		Assert.assertEquals(0, numRecords);
+		Assert.assertFalse(serializer.hasData());
+		Assert.assertFalse(deserializer.hasUnfinishedData());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializerTest.java
new file mode 100644
index 0000000..637b7d5
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializerTest.java
@@ -0,0 +1,219 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.serialization.RecordSerializer.SerializationResult;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestType;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestTypeFactory;
+import eu.stratosphere.runtime.io.serialization.types.Util;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class SpanningRecordSerializerTest {
+
+	@Test
+	public void testHasData() {
+		final int SEGMENT_SIZE = 16;
+
+		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+		final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), SEGMENT_SIZE, null);
+		final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
+
+		Assert.assertFalse(serializer.hasData());
+
+		try {
+			serializer.addRecord(randomIntRecord);
+			Assert.assertTrue(serializer.hasData());
+
+			serializer.setNextBuffer(buffer);
+			Assert.assertTrue(serializer.hasData());
+
+			serializer.clear();
+			Assert.assertFalse(serializer.hasData());
+
+			serializer.setNextBuffer(buffer);
+
+			serializer.addRecord(randomIntRecord);
+			Assert.assertTrue(serializer.hasData());
+
+			serializer.addRecord(randomIntRecord);
+			Assert.assertTrue(serializer.hasData());
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+
+	}
+
+	@Test
+	public void testEmptyRecords() {
+		final int SEGMENT_SIZE = 11;
+
+		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+		final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), SEGMENT_SIZE, null);
+
+		try {
+			Assert.assertEquals(SerializationResult.FULL_RECORD, serializer.setNextBuffer(buffer));
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+
+		try {
+			SerializationTestType emptyRecord = new SerializationTestType() {
+				@Override
+				public SerializationTestType getRandom(Random rnd) {
+					throw new UnsupportedOperationException();
+				}
+
+				@Override
+				public int length() {
+					throw new UnsupportedOperationException();
+				}
+
+				@Override
+				public void write(DataOutput out) throws IOException {
+				}
+
+				@Override
+				public void read(DataInput in) throws IOException {
+				}
+
+				@Override
+				public int hashCode() {
+					throw new UnsupportedOperationException();
+				}
+
+				@Override
+				public boolean equals(Object obj) {
+					throw new UnsupportedOperationException();
+				}
+			};
+
+			SerializationResult result = serializer.addRecord(emptyRecord);
+			Assert.assertEquals(SerializationResult.FULL_RECORD, result);
+
+			result = serializer.addRecord(emptyRecord);
+			Assert.assertEquals(SerializationResult.FULL_RECORD, result);
+
+			result = serializer.addRecord(emptyRecord);
+			Assert.assertEquals(SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
+
+			result = serializer.setNextBuffer(buffer);
+			Assert.assertEquals(SerializationResult.FULL_RECORD, result);
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * Checks the serializer results for ten random int records written into segments of a single byte.
+	 */
+	@Test
+	public void testIntRecordsSpanningMultipleSegments() {
+		final int segmentSize = 1;
+		final int numValues = 10;
+
+		try {
+			final Util.MockRecords records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+			test(records, segmentSize);
+		} catch (Exception ex) {
+			ex.printStackTrace();
+			Assert.fail("Test encountered an unexpected exception.");
+		}
+	}
+
+	/**
+	 * Checks the serializer results for 64 random int records written into 64-byte segments.
+	 */
+	@Test
+	public void testIntRecordsWithAlignedSegments() {
+		final int segmentSize = 64;
+		final int numValues = 64;
+
+		try {
+			final Util.MockRecords records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+			test(records, segmentSize);
+		} catch (Exception ex) {
+			ex.printStackTrace();
+			Assert.fail("Test encountered an unexpected exception.");
+		}
+	}
+
+	/**
+	 * Checks the serializer results for 248 random int records written into 31-byte segments.
+	 */
+	@Test
+	public void testIntRecordsWithUnalignedSegments() {
+		final int segmentSize = 31;
+		final int numValues = 248; // least common multiple => last record should align
+
+		try {
+			final Util.MockRecords records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+			test(records, segmentSize);
+		} catch (Exception ex) {
+			ex.printStackTrace();
+			Assert.fail("Test encountered an unexpected exception.");
+		}
+	}
+
+	/**
+	 * Checks the serializer results for 100000 random records, with no record type specified,
+	 * written into 127-byte segments.
+	 */
+	@Test
+	public void testRandomRecords() {
+		final int segmentSize = 127;
+		final int numValues = 100000;
+
+		try {
+			final Util.MockRecords records = Util.randomRecords(numValues);
+			test(records, segmentSize);
+		} catch (Exception ex) {
+			ex.printStackTrace();
+			Assert.fail("Test encountered an unexpected exception.");
+		}
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Feeds every record to a {@link SpanningRecordSerializer} and verifies that each call reports the
+	 * {@link SerializationResult} implied by the number of bytes accounted to the current segment.
+	 * <p>
+	 * A single {@link MemorySegment} is allocated up front and reused for every buffer switch.
+	 *
+	 * @param records records to test
+	 * @param segmentSize size for the {@link MemorySegment}
+	 */
+	private void test(Util.MockRecords records, int segmentSize) throws Exception {
+		final int lengthHeaderBytes = 4; // length encoding
+
+		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+		final Buffer reusedBuffer = new Buffer(new MemorySegment(new byte[segmentSize]), segmentSize, null);
+
+		// -------------------------------------------------------------------------------------------------------------
+
+		serializer.setNextBuffer(reusedBuffer);
+
+		// bytes accounted to the current segment, including any part that spills over its end
+		int pendingBytes = 0;
+		for (SerializationTestType record : records) {
+			SerializationResult outcome = serializer.addRecord(record);
+			pendingBytes += record.length() + lengthHeaderBytes;
+
+			if (pendingBytes > segmentSize) {
+				// record did not fit: keep switching buffers until the serializer no longer reports a full one
+				Assert.assertEquals(SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, outcome);
+
+				while (outcome.isFullBuffer()) {
+					pendingBytes -= segmentSize;
+					outcome = serializer.setNextBuffer(reusedBuffer);
+				}
+				continue;
+			}
+
+			if (pendingBytes == segmentSize) {
+				// record ends exactly at the segment boundary: start over with an empty buffer
+				Assert.assertEquals(SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, outcome);
+				serializer.setNextBuffer(reusedBuffer);
+				pendingBytes = 0;
+				continue;
+			}
+
+			// record fit and there is still room left
+			Assert.assertEquals(SerializationResult.FULL_RECORD, outcome);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/AsciiStringType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/AsciiStringType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/AsciiStringType.java
new file mode 100644
index 0000000..7aadc7c
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/AsciiStringType.java
@@ -0,0 +1,77 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class AsciiStringType implements SerializationTestType {
+
+	private static final int MAX_LEN = 1500;
+
+	public String value;
+
+	public AsciiStringType() {
+		this.value = "";
+	}
+
+	private AsciiStringType(String value) {
+		this.value = value;
+	}
+
+	@Override
+	public AsciiStringType getRandom(Random rnd) {
+		final StringBuilder bld = new StringBuilder();
+		final int len = rnd.nextInt(MAX_LEN + 1);
+
+		for (int i = 0; i < len; i++) {
+			// 1--127
+			bld.append((char) (rnd.nextInt(126) + 1));
+		}
+
+		return new AsciiStringType(bld.toString());
+	}
+
+	@Override
+	public int length() {
+		return value.getBytes().length + 2;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeUTF(this.value);
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		this.value = in.readUTF();
+	}
+
+	@Override
+	public int hashCode() {
+		return this.value.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof AsciiStringType) {
+			AsciiStringType other = (AsciiStringType) obj;
+			return this.value.equals(other.value);
+		} else {
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/BooleanType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/BooleanType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/BooleanType.java
new file mode 100644
index 0000000..32b2ba3
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/BooleanType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Test record wrapping a single {@code boolean}, serialized as one byte.
+ */
+public class BooleanType implements SerializationTestType {
+
+	private boolean value;
+
+	/** Creates a record holding {@code false}. */
+	public BooleanType() {
+		this(false);
+	}
+
+	private BooleanType(boolean value) {
+		this.value = value;
+	}
+
+	/** Returns a new record holding the next random boolean drawn from {@code rnd}. */
+	@Override
+	public BooleanType getRandom(Random rnd) {
+		final boolean randomValue = rnd.nextBoolean();
+		return new BooleanType(randomValue);
+	}
+
+	/** Returns the serialized size in bytes. */
+	@Override
+	public int length() {
+		return 1;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeBoolean(value);
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		value = in.readBoolean();
+	}
+
+	@Override
+	public int hashCode() {
+		if (value) {
+			return 1;
+		}
+		return 0;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof BooleanType)) {
+			return false;
+		}
+		final BooleanType that = (BooleanType) obj;
+		return that.value == value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteArrayType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteArrayType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteArrayType.java
new file mode 100644
index 0000000..cb29a5c
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteArrayType.java
@@ -0,0 +1,76 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * Test record wrapping a byte array, serialized as a four-byte length followed by the array contents.
+ */
+public class ByteArrayType implements SerializationTestType {
+
+	private static final int MAX_LEN = 512 * 15;
+
+	private byte[] data;
+
+	/** Creates a record holding an empty array. */
+	public ByteArrayType() {
+		this(new byte[0]);
+	}
+
+	/** Creates a record that uses the given array directly (no copy is made). */
+	public ByteArrayType(byte[] data) {
+		this.data = data;
+	}
+
+	/** Returns a new record holding 1 to {@code MAX_LEN} random bytes. */
+	@Override
+	public ByteArrayType getRandom(Random rnd) {
+		final byte[] randomBytes = new byte[rnd.nextInt(MAX_LEN) + 1];
+		rnd.nextBytes(randomBytes);
+		return new ByteArrayType(randomBytes);
+	}
+
+	/** Returns the serialized size: the array length plus four bytes for the length field. */
+	@Override
+	public int length() {
+		return 4 + data.length;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		final byte[] bytes = this.data;
+		out.writeInt(bytes.length);
+		out.write(bytes);
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		final byte[] bytes = new byte[in.readInt()];
+		this.data = bytes;
+		in.readFully(bytes);
+	}
+
+	@Override
+	public int hashCode() {
+		return Arrays.hashCode(data);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof ByteArrayType)) {
+			return false;
+		}
+		final ByteArrayType that = (ByteArrayType) obj;
+		return Arrays.equals(data, that.data);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteSubArrayType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteSubArrayType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteSubArrayType.java
new file mode 100644
index 0000000..2b683d2
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteSubArrayType.java
@@ -0,0 +1,91 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * Test record backed by a fixed-capacity byte array of which only the first {@code len} bytes are
+ * significant. Serialized as a four-byte length followed by those bytes.
+ */
+public class ByteSubArrayType implements SerializationTestType {
+
+	private static final int MAX_LEN = 512;
+
+	/** Backing storage; always {@code MAX_LEN} bytes long. */
+	private final byte[] data;
+
+	/** Number of significant bytes at the start of {@link #data}. */
+	private int len;
+
+	/** Creates an empty record (zero significant bytes). */
+	public ByteSubArrayType() {
+		this.data = new byte[MAX_LEN];
+		this.len = 0;
+	}
+
+	/** Returns a new record whose first 1 to {@code MAX_LEN} bytes are filled with random values. */
+	@Override
+	public ByteSubArrayType getRandom(Random rnd) {
+		final int randomLen = rnd.nextInt(MAX_LEN) + 1;
+		final ByteSubArrayType result = new ByteSubArrayType();
+		result.len = randomLen;
+
+		for (int pos = 0; pos < randomLen; pos++) {
+			result.data[pos] = (byte) rnd.nextInt(256);
+		}
+
+		return result;
+	}
+
+	/** Returns the serialized size: the significant bytes plus four bytes for the length field. */
+	@Override
+	public int length() {
+		return 4 + len;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(len);
+		out.write(data, 0, len);
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		len = in.readInt();
+		in.readFully(data, 0, len);
+	}
+
+	/** Hashes only the significant prefix of the backing array. */
+	@Override
+	public int hashCode() {
+		final byte[] significant = new byte[len];
+		System.arraycopy(data, 0, significant, 0, len);
+		return Arrays.hashCode(significant);
+	}
+
+	/** Two records are equal if they have the same length and identical significant bytes. */
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof ByteSubArrayType)) {
+			return false;
+		}
+
+		final ByteSubArrayType that = (ByteSubArrayType) obj;
+		if (len != that.len) {
+			return false;
+		}
+
+		for (int pos = 0; pos < len; pos++) {
+			if (data[pos] != that.data[pos]) {
+				return false;
+			}
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteType.java
new file mode 100644
index 0000000..52abdcb
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Test record wrapping a single {@code byte}.
+ */
+public class ByteType implements SerializationTestType {
+
+	private byte value;
+
+	/** Creates a record holding zero. */
+	public ByteType() {
+		this((byte) 0);
+	}
+
+	private ByteType(byte value) {
+		this.value = value;
+	}
+
+	/** Returns a new record holding a random byte drawn from {@code rnd}. */
+	@Override
+	public ByteType getRandom(Random rnd) {
+		final byte randomValue = (byte) rnd.nextInt(256);
+		return new ByteType(randomValue);
+	}
+
+	/** Returns the serialized size in bytes. */
+	@Override
+	public int length() {
+		return 1;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeByte(value);
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		value = in.readByte();
+	}
+
+	@Override
+	public int hashCode() {
+		return value;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof ByteType)) {
+			return false;
+		}
+		final ByteType that = (ByteType) obj;
+		return that.value == value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/CharType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/CharType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/CharType.java
new file mode 100644
index 0000000..25df737
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/CharType.java
@@ -0,0 +1,68 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Test record wrapping a single {@code char}, serialized as two bytes.
+ */
+public class CharType implements SerializationTestType {
+
+	private char value;
+
+	/** Creates a record holding the character with code 0. */
+	public CharType() {
+		this((char) 0);
+	}
+
+	private CharType(char value) {
+		this.value = value;
+	}
+
+	/** Returns a new record holding a random character with a code in the range 0..9999. */
+	@Override
+	public CharType getRandom(Random rnd) {
+		final char randomValue = (char) rnd.nextInt(10000);
+		return new CharType(randomValue);
+	}
+
+	/** Returns the serialized size in bytes. */
+	@Override
+	public int length() {
+		return 2;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeChar(value);
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		value = in.readChar();
+	}
+
+	@Override
+	public int hashCode() {
+		return value;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof CharType)) {
+			return false;
+		}
+		final CharType that = (CharType) obj;
+		return that.value == value;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/DoubleType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/DoubleType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/DoubleType.java
new file mode 100644
index 0000000..1046e75
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/DoubleType.java
@@ -0,0 +1,68 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+/** Test record wrapping a single {@code double}, written with writeDouble and read back with readDouble. */
+public class DoubleType implements SerializationTestType {
+	private double value;
+
+	public DoubleType() {
+		this(0.0);
+	}
+
+	private DoubleType(double value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance holding the next value of {@code rnd.nextDouble()}. */
+	@Override
+	public DoubleType getRandom(Random rnd) {
+		final double randomValue = rnd.nextDouble();
+		return new DoubleType(randomValue);
+	}
+
+	/** Number of bytes {@link #write(DataOutput)} emits for one double. */
+	@Override
+	public int length() {
+		return 8;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeDouble(value);
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		value = in.readDouble();
+	}
+
+	@Override
+	public int hashCode() {
+		final long bits = Double.doubleToLongBits(value);
+		return (int) (bits ^ (bits >>> 32));
+	}
+
+	/** Compares the bit patterns, so NaN equals NaN while 0.0 and -0.0 differ. */
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof DoubleType
+			&& Double.doubleToLongBits(((DoubleType) obj).value) == Double.doubleToLongBits(value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/FloatType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/FloatType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/FloatType.java
new file mode 100644
index 0000000..4a50873
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/FloatType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+/** Test record wrapping a single {@code float}, written with writeFloat and read back with readFloat. */
+public class FloatType implements SerializationTestType {
+	private float value;
+
+	public FloatType() {
+		this(0.0f);
+	}
+
+	private FloatType(float value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance holding the next value of {@code rnd.nextFloat()}. */
+	@Override
+	public FloatType getRandom(Random rnd) {
+		final float randomValue = rnd.nextFloat();
+		return new FloatType(randomValue);
+	}
+
+	/** Number of bytes {@link #write(DataOutput)} emits for one float. */
+	@Override
+	public int length() {
+		return 4;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeFloat(value);
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		value = in.readFloat();
+	}
+
+	@Override
+	public int hashCode() {
+		return Float.floatToIntBits(value);
+	}
+
+	/** Compares the bit patterns, so NaN equals NaN while 0.0f and -0.0f differ. */
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof FloatType
+			&& Float.floatToIntBits(((FloatType) obj).value) == Float.floatToIntBits(value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/IntType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/IntType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/IntType.java
new file mode 100644
index 0000000..50a3546
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/IntType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+/** Test record wrapping a single {@code int}, written with writeInt and read back with readInt. */
+public class IntType implements SerializationTestType {
+	private int value;
+
+	/** Creates an instance holding 0. */
+	public IntType() {
+		this(0);
+	}
+
+	public IntType(int value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance holding the next value of {@code rnd.nextInt()}. */
+	@Override
+	public IntType getRandom(Random rnd) {
+		final int randomValue = rnd.nextInt();
+		return new IntType(randomValue);
+	}
+
+	/** Number of bytes {@link #write(DataOutput)} emits for one int. */
+	@Override
+	public int length() {
+		return 4;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(value);
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		value = in.readInt();
+	}
+
+	@Override
+	public int hashCode() {
+		return value;
+	}
+
+	/** Two instances are equal exactly when they hold the same int. */
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof IntType && ((IntType) obj).value == value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/LongType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/LongType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/LongType.java
new file mode 100644
index 0000000..1402fb5
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/LongType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+/** Test record wrapping a single {@code long}, written with writeLong and read back with readLong. */
+public class LongType implements SerializationTestType {
+	private long value;
+
+	/** Creates an instance holding 0. */
+	public LongType() {
+		this(0L);
+	}
+
+	private LongType(long value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance holding the next value of {@code rnd.nextLong()}. */
+	@Override
+	public LongType getRandom(Random rnd) {
+		final long randomValue = rnd.nextLong();
+		return new LongType(randomValue);
+	}
+
+	/** Number of bytes {@link #write(DataOutput)} emits for one long. */
+	@Override
+	public int length() {
+		return 8;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeLong(value);
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		value = in.readLong();
+	}
+
+	@Override
+	public int hashCode() {
+		return (int) (value ^ (value >>> 32));
+	}
+
+	/** Two instances are equal exactly when they hold the same long. */
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof LongType && ((LongType) obj).value == value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestType.java
new file mode 100644
index 0000000..a827b07
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestType.java
@@ -0,0 +1,26 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.util.Random;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+
+public interface SerializationTestType extends IOReadableWritable {
+	/** Creates a new instance of the implementing type, using {@code rnd} as the source of randomness. */
+	SerializationTestType getRandom(Random rnd);
+
+	/** Returns the length of this record; NOTE(review): presumably the serialized size in bytes, confirm. */
+	int length();
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestTypeFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestTypeFactory.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestTypeFactory.java
new file mode 100644
index 0000000..127a0ec
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestTypeFactory.java
@@ -0,0 +1,40 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+public enum SerializationTestTypeFactory {
+	BOOLEAN(new BooleanType()),
+	BYTE_ARRAY(new ByteArrayType()),
+	BYTE_SUB_ARRAY(new ByteSubArrayType()),
+	BYTE(new ByteType()),
+	CHAR(new CharType()),
+	DOUBLE(new DoubleType()),
+	FLOAT(new FloatType()),
+	INT(new IntType()),
+	LONG(new LongType()),
+	SHORT(new ShortType()),
+	UNSIGNED_BYTE(new UnsignedByteType()),
+	UNSIGNED_SHORT(new UnsignedShortType()),
+	STRING(new AsciiStringType());
+
+	private final SerializationTestType prototype; // the instance handed out by factory()
+
+	SerializationTestTypeFactory(SerializationTestType prototype) {
+		this.prototype = prototype;
+	}
+	/** Returns the test type instance this constant was constructed with. */
+	public SerializationTestType factory() {
+		return prototype;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ShortType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ShortType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ShortType.java
new file mode 100644
index 0000000..7711e88
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ShortType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+/** Test record wrapping a single {@code short}, written with writeShort and read back with readShort. */
+public class ShortType implements SerializationTestType {
+	private short value;
+
+	/** Creates an instance holding 0. */
+	public ShortType() {
+		this((short) 0);
+	}
+
+	private ShortType(short value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance holding an int drawn from [0, 65536) narrowed to {@code short}. */
+	@Override
+	public ShortType getRandom(Random rnd) {
+		final short randomValue = (short) rnd.nextInt(65536);
+		return new ShortType(randomValue);
+	}
+
+	/** Number of bytes {@link #write(DataOutput)} emits for one short. */
+	@Override
+	public int length() {
+		return 2;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeShort(value);
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		value = in.readShort();
+	}
+
+	@Override
+	public int hashCode() {
+		return value;
+	}
+
+	/** Two instances are equal exactly when they hold the same short. */
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof ShortType && ((ShortType) obj).value == value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedByteType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedByteType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedByteType.java
new file mode 100644
index 0000000..9a1f1fb
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedByteType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+/** Test record keeping an unsigned byte in an {@code int}; written with writeByte, read with readUnsignedByte. */
+public class UnsignedByteType implements SerializationTestType {
+	private int value;
+
+	/** Creates an instance holding 0. */
+	public UnsignedByteType() {
+		this(0);
+	}
+
+	private UnsignedByteType(int value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance holding a value drawn from [128, 256). */
+	@Override
+	public UnsignedByteType getRandom(Random rnd) {
+		final int randomValue = 128 + rnd.nextInt(128);
+		return new UnsignedByteType(randomValue);
+	}
+
+	/** Number of bytes {@link #write(DataOutput)} emits for one byte. */
+	@Override
+	public int length() {
+		return 1;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeByte(value);
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		value = in.readUnsignedByte();
+	}
+
+	@Override
+	public int hashCode() {
+		return value;
+	}
+
+	/** Two instances are equal exactly when they hold the same value. */
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof UnsignedByteType && ((UnsignedByteType) obj).value == value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedShortType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedShortType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedShortType.java
new file mode 100644
index 0000000..ac80ef7
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedShortType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+/** Test record keeping an unsigned short in an {@code int}; written with writeShort, read with readUnsignedShort. */
+public class UnsignedShortType implements SerializationTestType {
+	private int value;
+
+	/** Creates an instance holding 0. */
+	public UnsignedShortType() {
+		this(0);
+	}
+
+	private UnsignedShortType(int value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance holding a value drawn from [32768, 65536). */
+	@Override
+	public UnsignedShortType getRandom(Random rnd) {
+		final int randomValue = 32768 + rnd.nextInt(32768);
+		return new UnsignedShortType(randomValue);
+	}
+
+	/** Number of bytes {@link #write(DataOutput)} emits for one short. */
+	@Override
+	public int length() {
+		return 2;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeShort(value);
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		value = in.readUnsignedShort();
+	}
+
+	@Override
+	public int hashCode() {
+		return value;
+	}
+
+	/** Two instances are equal exactly when they hold the same value. */
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof UnsignedShortType && ((UnsignedShortType) obj).value == value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/Util.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/Util.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/Util.java
new file mode 100644
index 0000000..ef14f7f
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/Util.java
@@ -0,0 +1,90 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+/** Helpers producing pseudo-random {@link SerializationTestType} records from one fixed-seed generator. */
+public class Util {
+	private static final long SEED = 64871654635745873L;
+	// Shared by every helper below, so the generated values depend on the overall order of calls.
+	private static final Random random = new Random(SEED);
+
+	/** Returns one random record of the given type. */
+	public static SerializationTestType randomRecord(SerializationTestTypeFactory type) {
+		return type.factory().getRandom(Util.random);
+	}
+
+	/** Returns an iterable over {@code numElements} random records, all of the given type. */
+	public static MockRecords randomRecords(final int numElements, final SerializationTestTypeFactory type) {
+		return new MockRecords(numElements) {
+			@Override
+			protected SerializationTestType getRecord() {
+				return Util.randomRecord(type);
+			}
+		};
+	}
+
+	/** Returns an iterable over {@code numElements} random records, each of a randomly selected type. */
+	public static MockRecords randomRecords(final int numElements) {
+		return new MockRecords(numElements) {
+			@Override
+			protected SerializationTestType getRecord() {
+				final SerializationTestTypeFactory[] types = SerializationTestTypeFactory.values();
+				return Util.randomRecord(types[Util.random.nextInt(types.length)]);
+			}
+		};
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	/** Iterable over {@code numRecords} records from {@link #getRecord()}; every iterator counts down on its own. */
+	public abstract static class MockRecords implements Iterable<SerializationTestType> {
+		private final int numRecords;
+
+		public MockRecords(int numRecords) {
+			this.numRecords = numRecords;
+		}
+
+		@Override
+		public Iterator<SerializationTestType> iterator() {
+			return new Iterator<SerializationTestType>() {
+				// Per-iterator budget: iterating never consumes the iterable itself, so it can be traversed again.
+				private int remaining = numRecords;
+
+				@Override
+				public boolean hasNext() {
+					return remaining > 0;
+				}
+
+				@Override
+				public SerializationTestType next() {
+					if (remaining <= 0) {
+						throw new NoSuchElementException();
+					}
+					remaining--;
+					return getRecord();
+				}
+
+				@Override
+				public void remove() {
+					throw new UnsupportedOperationException();
+				}
+			};
+		}
+
+		/** Produces the next record; called once for every successful {@code next()} of an iterator. */
+		protected abstract SerializationTestType getRecord();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java
index db843dd..b80810b 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java
@@ -21,6 +21,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import eu.stratosphere.test.util.RecordAPITestBase;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import org.junit.Assert;
 
 import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
@@ -31,8 +33,6 @@ import eu.stratosphere.api.java.record.io.CsvInputFormat;
 import eu.stratosphere.api.java.record.io.CsvOutputFormat;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
 import eu.stratosphere.nephele.jobgraph.JobInputVertex;