You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:44 UTC
[09/30] Offer buffer-oriented API for I/O (#25)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/DefaultChannelSelectorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/DefaultChannelSelectorTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/DefaultChannelSelectorTest.java
new file mode 100644
index 0000000..890b34f
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/DefaultChannelSelectorTest.java
@@ -0,0 +1,47 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io;
+
+import static org.junit.Assert.assertEquals;
+
+import eu.stratosphere.runtime.io.api.RoundRobinChannelSelector;
+import org.junit.Test;
+
+import eu.stratosphere.core.io.StringRecord;
+
+/**
+ * This class checks the functionality of the {@link RoundRobinChannelSelector} class.
+ *
+ */
+public class DefaultChannelSelectorTest {
+
+ /**
+ * This test checks the channel selection
+ */
+ @Test
+ public void channelSelect() {
+
+ final StringRecord dummyRecord = new StringRecord("abc");
+ final RoundRobinChannelSelector<StringRecord> selector = new RoundRobinChannelSelector<StringRecord>();
+ // Test with two channels
+ final int numberOfOutputChannels = 2;
+ int[] selectedChannels = selector.selectChannels(dummyRecord, numberOfOutputChannels);
+ assertEquals(1, selectedChannels.length);
+ assertEquals(1, selectedChannels[0]);
+ selectedChannels = selector.selectChannels(dummyRecord, numberOfOutputChannels);
+ assertEquals(1, selectedChannels.length);
+ assertEquals(0, selectedChannels[0]);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java
new file mode 100644
index 0000000..17c2f58
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java
@@ -0,0 +1,136 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.library;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import java.io.File;
+import java.io.IOException;
+
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.nephele.util.FileLineReader;
+import eu.stratosphere.nephele.util.FileLineWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.fs.FileInputSplit;
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.execution.Environment;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.nephele.template.InputSplitProvider;
+
+/**
+ * This class checks the functionality of the {@link eu.stratosphere.nephele.util.FileLineReader} and the {@link eu.stratosphere.nephele.util.FileLineWriter} class.
+ *
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FileLineReader.class)
+public class FileLineReadWriteTest {
+
+ @Mock
+ private Environment environment;
+
+ @Mock
+ private Configuration conf;
+
+ @Mock
+ private RecordReader<StringRecord> recordReader;
+
+ @Mock
+ private RecordWriter<StringRecord> recordWriter;
+
+ @Mock
+ private InputSplitProvider inputSplitProvider;
+
+ private File file = new File("./tmp");
+
+ /**
+ * Set up mocks
+ *
+ * @throws IOException
+ */
+ @Before
+ public void before() throws Exception {
+
+ MockitoAnnotations.initMocks(this);
+ }
+
+ /**
+ * remove the temporary file
+ */
+ @After
+ public void after() {
+ this.file.delete();
+ }
+
+ /**
+ * Tests the read and write methods
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testReadWrite() throws Exception {
+
+ this.file.createNewFile();
+ FileLineWriter writer = new FileLineWriter();
+ Whitebox.setInternalState(writer, "environment", this.environment);
+ Whitebox.setInternalState(writer, "input", this.recordReader);
+ when(this.environment.getTaskConfiguration()).thenReturn(this.conf);
+
+ when(this.conf.getString("outputPath", null)).thenReturn(this.file.toURI().toString());
+ when(this.recordReader.hasNext()).thenReturn(true, true, true, false);
+ StringRecord in = new StringRecord("abc");
+ try {
+ when(this.recordReader.next()).thenReturn(in);
+ } catch (IOException e) {
+ fail();
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ fail();
+ e.printStackTrace();
+ }
+ writer.invoke();
+
+ final FileInputSplit split = new FileInputSplit(0, new Path(this.file.toURI().toString()), 0,
+ this.file.length(), null);
+ when(this.environment.getInputSplitProvider()).thenReturn(this.inputSplitProvider);
+ when(this.inputSplitProvider.getNextInputSplit()).thenReturn(split, (FileInputSplit) null);
+
+ FileLineReader reader = new FileLineReader();
+ Whitebox.setInternalState(reader, "environment", this.environment);
+ Whitebox.setInternalState(reader, "output", this.recordWriter);
+ StringRecord record = mock(StringRecord.class);
+
+ whenNew(StringRecord.class).withNoArguments().thenReturn(record);
+
+ reader.invoke();
+
+ // verify the correct bytes have been written and read
+ verify(record, times(3)).set(in.getBytes());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java
new file mode 100644
index 0000000..b8914a8
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java
@@ -0,0 +1,394 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.envelope;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.nephele.util.DiscardingRecycler;
+import eu.stratosphere.nephele.util.TestBufferProvider;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
+import eu.stratosphere.runtime.io.BufferRecycler;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+public class EnvelopeReaderWriterTest {
+
+ private final long RANDOM_SEED = 520346508276087l;
+
+ private static final int BUFFER_SIZE = 32768;
+
+ private static final byte BUFFER_CONTENT = 13;
+
+ private final int[] BUFFER_SIZES = { 0, 2, BUFFER_SIZE, 3782, 88, 0, 23};
+
+ private final AbstractEvent[][] EVENT_LISTS = {
+ {},
+ {},
+ {},
+ { new TestEvent1(34872527) },
+ { new TestEvent1(8749653), new TestEvent1(365345) },
+ { new TestEvent2(34563456), new TestEvent1(598432), new TestEvent2(976293845) },
+ {}
+ };
+
+ @Test
+ public void testWriteAndRead() {
+
+ Assert.assertTrue("Test broken.", BUFFER_SIZES.length == EVENT_LISTS.length);
+
+ File testFile = null;
+ RandomAccessFile raf = null;
+ try {
+ testFile = File.createTempFile("envelopes", ".tmp");
+ raf = new RandomAccessFile(testFile, "rw");
+
+ // write
+ FileChannel c = raf.getChannel();
+ writeEnvelopes(c);
+
+ // read
+ c.position(0);
+ readEnvelopes(c, -1.0f);
+ c.close();
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ finally {
+ if (raf != null)
+ try { raf.close(); } catch (Throwable t) {}
+
+ if (testFile != null)
+ testFile.delete();
+ }
+ }
+
+ @Test
+ public void testWriteAndReadChunked() {
+
+ Assert.assertTrue("Test broken.", BUFFER_SIZES.length == EVENT_LISTS.length);
+
+ File testFile = null;
+ RandomAccessFile raf = null;
+ try {
+ testFile = File.createTempFile("envelopes", ".tmp");
+ raf = new RandomAccessFile(testFile, "rw");
+
+ // write
+ FileChannel c = raf.getChannel();
+ writeEnvelopes(new ChunkedWriteableChannel(c));
+
+ // read
+ c.position(0);
+ readEnvelopes(new ChunkedReadableChannel(c), 0.75f);
+ c.close();
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ finally {
+ if (raf != null)
+ try { raf.close(); } catch (Throwable t) {}
+
+ if (testFile != null)
+ testFile.delete();
+ }
+ }
+
+ private void writeEnvelopes(WritableByteChannel channel) throws IOException {
+
+ final BufferRecycler recycler = new DiscardingRecycler();
+ final Random rand = new Random(RANDOM_SEED);
+
+ final EnvelopeWriter serializer = new EnvelopeWriter();
+
+ final int NUM_ENVS = BUFFER_SIZES.length;
+
+ for (int i = 0; i < NUM_ENVS; i++) {
+ int seqNum = Math.abs(rand.nextInt());
+ JobID jid = new JobID(rand.nextLong(), rand.nextLong());
+ ChannelID sid = new ChannelID(rand.nextLong(), rand.nextLong());
+
+ Envelope env = new Envelope(seqNum, jid, sid);
+ if (EVENT_LISTS[i].length > 0) {
+ env.serializeEventList(Arrays.asList(EVENT_LISTS[i]));
+ }
+
+ int bufferSize = BUFFER_SIZES[i];
+ if (bufferSize > 0) {
+ MemorySegment ms = new MemorySegment(new byte[BUFFER_SIZE]);
+ for (int x = 0; x < bufferSize; x++) {
+ ms.put(x, BUFFER_CONTENT);
+ }
+
+ Buffer mb = new Buffer(ms, bufferSize, recycler);
+ env.setBuffer(mb);
+ }
+
+ serializer.setEnvelopeForWriting(env);
+
+ while (serializer.writeNextChunk(channel));
+ }
+ }
+
+ private void readEnvelopes(ReadableByteChannel channel, float probabilityForNoBufferCurrently) throws IOException {
+
+ final Random rand = new Random(RANDOM_SEED);
+
+ final EnvelopeReader reader = new EnvelopeReader(new OneForAllBroker(BUFFER_SIZE, probabilityForNoBufferCurrently));
+
+ final int NUM_ENVS = BUFFER_SIZES.length;
+
+ for (int i = 0; i < NUM_ENVS; i++) {
+ int expectedSeqNum = Math.abs(rand.nextInt());
+ JobID expectedJid = new JobID(rand.nextLong(), rand.nextLong());
+ ChannelID expectedSid = new ChannelID(rand.nextLong(), rand.nextLong());
+
+ // read the next envelope
+ while (reader.readNextChunk(channel) != EnvelopeReader.DeserializationState.COMPLETE);
+ Envelope env = reader.getFullyDeserializedTransferEnvelope();
+
+ // check the basic fields from the header
+ Assert.assertEquals(expectedSeqNum, env.getSequenceNumber());
+ Assert.assertEquals(expectedJid, env.getJobID());
+ Assert.assertEquals(expectedSid, env.getSource());
+
+ // check the events
+ List<? extends AbstractEvent> events = env.deserializeEvents();
+ Assert.assertEquals(EVENT_LISTS[i].length, events.size());
+
+ for (int n = 0; n < EVENT_LISTS[i].length; n++) {
+ AbstractEvent expectedEvent = EVENT_LISTS[i][n];
+ AbstractEvent actualEvent = events.get(n);
+
+ Assert.assertEquals(expectedEvent.getClass(), actualEvent.getClass());
+ Assert.assertEquals(expectedEvent, actualEvent);
+ }
+
+ // check the buffer
+ Buffer buf = env.getBuffer();
+ if (buf == null) {
+ Assert.assertTrue(BUFFER_SIZES[i] == 0);
+ } else {
+ Assert.assertEquals(BUFFER_SIZES[i], buf.size());
+ for (int k = 0; k < BUFFER_SIZES[i]; k++) {
+ Assert.assertEquals(BUFFER_CONTENT, buf.getMemorySegment().get(k));
+ }
+ }
+
+ reader.reset();
+ }
+
+ }
+
+
+ public static final class TestEvent1 extends AbstractEvent {
+
+ private long id;
+
+ public TestEvent1() {}
+
+ public TestEvent1(long id) {
+ this.id = id;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(id);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ id = in.readLong();
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == TestEvent1.class) {
+ return ((TestEvent1) obj).id == this.id;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return ((int) id) ^ ((int) (id >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return "TestEvent1 (" + id + ")";
+ }
+ }
+
+ public static final class TestEvent2 extends AbstractEvent {
+
+ private long id;
+
+ public TestEvent2() {}
+
+ public TestEvent2(long id) {
+ this.id = id;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(id);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ id = in.readLong();
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == TestEvent2.class) {
+ return ((TestEvent2) obj).id == this.id;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return ((int) id) ^ ((int) (id >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return "TestEvent2 (" + id + ")";
+ }
+ }
+
+ private static final class ChunkedWriteableChannel implements WritableByteChannel {
+
+ private final WritableByteChannel delegate;
+
+ private final Random rnd;
+
+ private ChunkedWriteableChannel(WritableByteChannel delegate) {
+ this.delegate = delegate;
+ this.rnd = new Random();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return this.delegate.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.delegate.close();
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ final int available = src.remaining();
+ final int oldLimit = src.limit();
+
+ int toWrite = rnd.nextInt(available) + 1;
+ toWrite = Math.min(Math.max(toWrite, 8), available);
+
+ src.limit(src.position() + toWrite);
+
+ int written = this.delegate.write(src);
+
+ src.limit(oldLimit);
+
+ return written;
+ }
+ }
+
+ private static final class ChunkedReadableChannel implements ReadableByteChannel {
+
+ private final ReadableByteChannel delegate;
+
+ private final Random rnd;
+
+ private ChunkedReadableChannel(ReadableByteChannel delegate) {
+ this.delegate = delegate;
+ this.rnd = new Random();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return this.delegate.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.delegate.close();
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ final int available = dst.remaining();
+ final int oldLimit = dst.limit();
+
+ int toRead = rnd.nextInt(available) + 1;
+ toRead = Math.min(Math.max(toRead, 8), available);
+
+ dst.limit(dst.position() + toRead);
+
+ int read = this.delegate.read(dst);
+
+ dst.limit(oldLimit);
+
+ return read;
+ }
+ }
+
+ private static final class OneForAllBroker implements BufferProviderBroker {
+
+ private final TestBufferProvider provider;
+
+ private OneForAllBroker(int sizeOfMemorySegments) {
+ this.provider = new TestBufferProvider(sizeOfMemorySegments);
+ }
+
+ private OneForAllBroker(int sizeOfMemorySegments, float probabilityForNoBufferCurrently) {
+ this.provider = new TestBufferProvider(sizeOfMemorySegments, probabilityForNoBufferCurrently);
+ }
+
+ @Override
+ public BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) {
+ return this.provider;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/DataInputOutputSerializerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/DataInputOutputSerializerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/DataInputOutputSerializerTest.java
new file mode 100644
index 0000000..55c0243
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/DataInputOutputSerializerTest.java
@@ -0,0 +1,115 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestType;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestTypeFactory;
+import eu.stratosphere.runtime.io.serialization.types.Util;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+
+public class DataInputOutputSerializerTest {
+
+ @Test
+ public void testWrapAsByteBuffer() {
+ SerializationTestType randomInt = Util.randomRecord(SerializationTestTypeFactory.INT);
+
+ DataOutputSerializer serializer = new DataOutputSerializer(randomInt.length());
+ MemorySegment segment = new MemorySegment(new byte[randomInt.length()]);
+
+ try {
+ // empty buffer, read buffer should be empty
+ ByteBuffer wrapper = serializer.wrapAsByteBuffer();
+
+ Assert.assertEquals(0, wrapper.position());
+ Assert.assertEquals(0, wrapper.limit());
+
+ // write to data output, read buffer should still be empty
+ randomInt.write(serializer);
+
+ Assert.assertEquals(0, wrapper.position());
+ Assert.assertEquals(0, wrapper.limit());
+
+ // get updated read buffer, read buffer should contain written data
+ wrapper = serializer.wrapAsByteBuffer();
+
+ Assert.assertEquals(0, wrapper.position());
+ Assert.assertEquals(randomInt.length(), wrapper.limit());
+
+ // clear data output, read buffer should still contain written data
+ serializer.clear();
+
+ Assert.assertEquals(0, wrapper.position());
+ Assert.assertEquals(randomInt.length(), wrapper.limit());
+
+ // get updated read buffer, should be empty
+ wrapper = serializer.wrapAsByteBuffer();
+
+ Assert.assertEquals(0, wrapper.position());
+ Assert.assertEquals(0, wrapper.limit());
+
+ // write to data output and read back to memory
+ randomInt.write(serializer);
+ wrapper = serializer.wrapAsByteBuffer();
+
+ segment.put(0, wrapper, randomInt.length());
+
+ Assert.assertEquals(randomInt.length(), wrapper.position());
+ Assert.assertEquals(randomInt.length(), wrapper.limit());
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ @Test
+ public void testRandomValuesWriteRead() {
+ final int numElements = 100000;
+ final ArrayDeque<SerializationTestType> reference = new ArrayDeque<SerializationTestType>();
+
+ DataOutputSerializer serializer = new DataOutputSerializer(1);
+
+ for (SerializationTestType value : Util.randomRecords(numElements)) {
+ reference.add(value);
+
+ try {
+ value.write(serializer);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ DataInputDeserializer deserializer = new DataInputDeserializer(serializer.wrapAsByteBuffer());
+
+ for (SerializationTestType expected : reference) {
+ try {
+ SerializationTestType actual = expected.getClass().newInstance();
+ actual.read(deserializer);
+
+ Assert.assertEquals(expected, actual);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ reference.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
new file mode 100644
index 0000000..817c0e6
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/PagedViewsTest.java
@@ -0,0 +1,160 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.nephele.services.memorymanager.AbstractPagedInputView;
+import eu.stratosphere.nephele.services.memorymanager.AbstractPagedOutputView;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestType;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestTypeFactory;
+import eu.stratosphere.runtime.io.serialization.types.Util;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class PagedViewsTest {
+
+ @Test
+ public void testSequenceOfIntegersWithAlignedBuffers() {
+ try {
+ final int NUM_INTS = 1000000;
+
+ testSequenceOfTypes(Util.randomRecords(NUM_INTS, SerializationTestTypeFactory.INT), 2048);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ @Test
+ public void testSequenceOfIntegersWithUnalignedBuffers() {
+ try {
+ final int NUM_INTS = 1000000;
+
+ testSequenceOfTypes(Util.randomRecords(NUM_INTS, SerializationTestTypeFactory.INT), 2047);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ @Test
+ public void testRandomTypes() {
+ try {
+ final int NUM_TYPES = 100000;
+
+ // test with an odd buffer size to force many unaligned cases
+ testSequenceOfTypes(Util.randomRecords(NUM_TYPES), 57);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ private static void testSequenceOfTypes(Iterable<SerializationTestType> sequence, int segmentSize) throws Exception {
+
+ List<SerializationTestType> elements = new ArrayList<SerializationTestType>(512);
+ TestOutputView outView = new TestOutputView(segmentSize);
+
+ // write
+ for (SerializationTestType type : sequence) {
+ // serialize the record
+ type.write(outView);
+ elements.add(type);
+ }
+ outView.close();
+
+ // check the records
+ TestInputView inView = new TestInputView(outView.segments);
+
+ for (SerializationTestType reference : elements) {
+ SerializationTestType result = reference.getClass().newInstance();
+ result.read(inView);
+ assertEquals(reference, result);
+ }
+ }
+
+ // ============================================================================================
+
+ private static final class SegmentWithPosition {
+
+ private final MemorySegment segment;
+ private final int position;
+
+ public SegmentWithPosition(MemorySegment segment, int position) {
+ this.segment = segment;
+ this.position = position;
+ }
+ }
+
+ private static final class TestOutputView extends AbstractPagedOutputView {
+
+ private final List<SegmentWithPosition> segments = new ArrayList<SegmentWithPosition>();
+
+ private final int segmentSize;
+
+ private TestOutputView(int segmentSize) {
+ super(new MemorySegment(new byte[segmentSize]), segmentSize, 0);
+
+ this.segmentSize = segmentSize;
+ }
+
+ @Override
+ protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException {
+ segments.add(new SegmentWithPosition(current, positionInCurrent));
+ return new MemorySegment(new byte[segmentSize]);
+ }
+
+ public void close() {
+ segments.add(new SegmentWithPosition(getCurrentSegment(), getCurrentPositionInSegment()));
+ }
+ }
+
+ private static final class TestInputView extends AbstractPagedInputView {
+
+ private final List<SegmentWithPosition> segments;
+
+ private int num;
+
+
+ private TestInputView(List<SegmentWithPosition> segments) {
+ super(segments.get(0).segment, segments.get(0).position, 0);
+
+ this.segments = segments;
+ this.num = 0;
+ }
+
+ @Override
+ protected MemorySegment nextSegment(MemorySegment current) throws IOException {
+ num++;
+ if (num < segments.size()) {
+ return segments.get(num).segment;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ protected int getLimitForSegment(MemorySegment segment) {
+ return segments.get(num).position;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializationTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializationTest.java
new file mode 100644
index 0000000..094e597
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializationTest.java
@@ -0,0 +1,164 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.serialization.RecordDeserializer.DeserializationResult;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestType;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestTypeFactory;
+import eu.stratosphere.runtime.io.serialization.types.Util;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+
+public class SpanningRecordSerializationTest {
+
+ @Test
+ public void testIntRecordsSpanningMultipleSegments() {
+ final int SEGMENT_SIZE = 1;
+ final int NUM_VALUES = 10;
+
+ try {
+ test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ @Test
+ public void testIntRecordsWithAlignedBuffers () {
+ final int SEGMENT_SIZE = 64;
+ final int NUM_VALUES = 64;
+
+ try {
+ test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ @Test
+ public void testIntRecordsWithUnalignedBuffers () {
+ final int SEGMENT_SIZE = 31;
+ final int NUM_VALUES = 248;
+
+ try {
+ test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ @Test
+ public void testRandomRecords () {
+ final int SEGMENT_SIZE = 127;
+ final int NUM_VALUES = 10000;
+
+ try {
+ test(Util.randomRecords(NUM_VALUES), SEGMENT_SIZE);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+
+ /**
+ * Iterates over the provided records and tests whether {@link SpanningRecordSerializer} and {@link AdaptiveSpanningRecordDeserializer}
+ * interact as expected.
+ * <p>
+ * Only a single {@link MemorySegment} will be allocated.
+ *
+ * @param records records to test
+ * @param segmentSize size for the {@link MemorySegment}
+ */
+ private void test (Util.MockRecords records, int segmentSize) throws Exception {
+ final int SERIALIZATION_OVERHEAD = 4; // length encoding
+
+ final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+ final RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<SerializationTestType>();
+
+ final Buffer buffer = new Buffer(new MemorySegment(new byte[segmentSize]), segmentSize, null);
+
+ final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<SerializationTestType>();
+
+ // -------------------------------------------------------------------------------------------------------------
+
+ serializer.setNextBuffer(buffer);
+
+ int numBytes = 0;
+ int numRecords = 0;
+ for (SerializationTestType record : records) {
+
+ serializedRecords.add(record);
+
+ numRecords++;
+ numBytes += record.length() + SERIALIZATION_OVERHEAD;
+
+ // serialize record
+ if (serializer.addRecord(record).isFullBuffer()) {
+ // buffer is full => start deserializing
+ deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize);
+
+ while (!serializedRecords.isEmpty()) {
+ SerializationTestType expected = serializedRecords.poll();
+ SerializationTestType actual = expected.getClass().newInstance();
+
+ if (deserializer.getNextRecord(actual).isFullRecord()) {
+ Assert.assertEquals(expected, actual);
+ numRecords--;
+ } else {
+ serializedRecords.addFirst(expected);
+ break;
+ }
+ }
+
+ while (serializer.setNextBuffer(buffer).isFullBuffer()) {
+ deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize);
+ }
+
+
+
+ }
+ }
+
+ // deserialize left over records
+ deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), (numBytes % segmentSize));
+
+ serializer.clear();
+
+ while (!serializedRecords.isEmpty()) {
+ SerializationTestType expected = serializedRecords.poll();
+
+ SerializationTestType actual = expected.getClass().newInstance();
+ DeserializationResult result = deserializer.getNextRecord(actual);
+
+ Assert.assertTrue(result.isFullRecord());
+ Assert.assertEquals(expected, actual);
+ numRecords--;
+ }
+
+
+ // assert that all records have been serialized and deserialized
+ Assert.assertEquals(0, numRecords);
+ Assert.assertFalse(serializer.hasData());
+ Assert.assertFalse(deserializer.hasUnfinishedData());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializerTest.java
new file mode 100644
index 0000000..637b7d5
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializerTest.java
@@ -0,0 +1,219 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.serialization.RecordSerializer.SerializationResult;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestType;
+import eu.stratosphere.runtime.io.serialization.types.SerializationTestTypeFactory;
+import eu.stratosphere.runtime.io.serialization.types.Util;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class SpanningRecordSerializerTest {
+
+ @Test
+ public void testHasData() {
+ final int SEGMENT_SIZE = 16;
+
+ final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+ final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), SEGMENT_SIZE, null);
+ final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
+
+ Assert.assertFalse(serializer.hasData());
+
+ try {
+ serializer.addRecord(randomIntRecord);
+ Assert.assertTrue(serializer.hasData());
+
+ serializer.setNextBuffer(buffer);
+ Assert.assertTrue(serializer.hasData());
+
+ serializer.clear();
+ Assert.assertFalse(serializer.hasData());
+
+ serializer.setNextBuffer(buffer);
+
+ serializer.addRecord(randomIntRecord);
+ Assert.assertTrue(serializer.hasData());
+
+ serializer.addRecord(randomIntRecord);
+ Assert.assertTrue(serializer.hasData());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ @Test
+ public void testEmptyRecords() {
+ final int SEGMENT_SIZE = 11;
+
+ final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+ final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), SEGMENT_SIZE, null);
+
+ try {
+ Assert.assertEquals(SerializationResult.FULL_RECORD, serializer.setNextBuffer(buffer));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ try {
+ SerializationTestType emptyRecord = new SerializationTestType() {
+ @Override
+ public SerializationTestType getRandom(Random rnd) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int length() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ }
+
+ @Override
+ public int hashCode() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ SerializationResult result = serializer.addRecord(emptyRecord);
+ Assert.assertEquals(SerializationResult.FULL_RECORD, result);
+
+ result = serializer.addRecord(emptyRecord);
+ Assert.assertEquals(SerializationResult.FULL_RECORD, result);
+
+ result = serializer.addRecord(emptyRecord);
+ Assert.assertEquals(SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
+
+ result = serializer.setNextBuffer(buffer);
+ Assert.assertEquals(SerializationResult.FULL_RECORD, result);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testIntRecordsSpanningMultipleSegments() {
+ final int SEGMENT_SIZE = 1;
+ final int NUM_VALUES = 10;
+
+ try {
+ test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ @Test
+ public void testIntRecordsWithAlignedSegments() {
+ final int SEGMENT_SIZE = 64;
+ final int NUM_VALUES = 64;
+
+ try {
+ test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ @Test
+ public void testIntRecordsWithUnalignedSegments() {
+ final int SEGMENT_SIZE = 31;
+ final int NUM_VALUES = 248; // least common multiple => last record should align
+
+ try {
+ test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ @Test
+ public void testRandomRecords() {
+ final int SEGMENT_SIZE = 127;
+ final int NUM_VALUES = 100000;
+
+ try {
+ test(Util.randomRecords(NUM_VALUES), SEGMENT_SIZE);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+
+ /**
+ * Iterates over the provided records and tests whether the {@link SpanningRecordSerializer} returns the expected
+ * {@link SerializationResult} values.
+ * <p>
+ * Only a single {@link MemorySegment} will be allocated.
+ *
+ * @param records records to test
+ * @param segmentSize size for the {@link MemorySegment}
+ */
+ private void test(Util.MockRecords records, int segmentSize) throws Exception {
+ final int SERIALIZATION_OVERHEAD = 4; // length encoding
+
+ final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+ final Buffer buffer = new Buffer(new MemorySegment(new byte[segmentSize]), segmentSize, null);
+
+ // -------------------------------------------------------------------------------------------------------------
+
+ serializer.setNextBuffer(buffer);
+
+ int numBytes = 0;
+ for (SerializationTestType record : records) {
+ SerializationResult result = serializer.addRecord(record);
+ numBytes += record.length() + SERIALIZATION_OVERHEAD;
+
+ if (numBytes < segmentSize) {
+ Assert.assertEquals(SerializationResult.FULL_RECORD, result);
+ } else if (numBytes == segmentSize) {
+ Assert.assertEquals(SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result);
+ serializer.setNextBuffer(buffer);
+ numBytes = 0;
+ } else {
+ Assert.assertEquals(SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
+
+ while (result.isFullBuffer()) {
+ numBytes -= segmentSize;
+ result = serializer.setNextBuffer(buffer);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/AsciiStringType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/AsciiStringType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/AsciiStringType.java
new file mode 100644
index 0000000..7aadc7c
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/AsciiStringType.java
@@ -0,0 +1,77 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class AsciiStringType implements SerializationTestType {
+
+ private static final int MAX_LEN = 1500;
+
+ public String value;
+
+ public AsciiStringType() {
+ this.value = "";
+ }
+
+ private AsciiStringType(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public AsciiStringType getRandom(Random rnd) {
+ final StringBuilder bld = new StringBuilder();
+ final int len = rnd.nextInt(MAX_LEN + 1);
+
+ for (int i = 0; i < len; i++) {
+ // 1--127
+ bld.append((char) (rnd.nextInt(126) + 1));
+ }
+
+ return new AsciiStringType(bld.toString());
+ }
+
+ @Override
+ public int length() {
+ return value.getBytes().length + 2;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(this.value);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.value = in.readUTF();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof AsciiStringType) {
+ AsciiStringType other = (AsciiStringType) obj;
+ return this.value.equals(other.value);
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/BooleanType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/BooleanType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/BooleanType.java
new file mode 100644
index 0000000..32b2ba3
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/BooleanType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class BooleanType implements SerializationTestType {
+
+ private boolean value;
+
+ public BooleanType() {
+ this.value = false;
+ }
+
+ private BooleanType(boolean value) {
+ this.value = value;
+ }
+
+ @Override
+ public BooleanType getRandom(Random rnd) {
+ return new BooleanType(rnd.nextBoolean());
+ }
+
+ @Override
+ public int length() {
+ return 1;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(this.value);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.value = in.readBoolean();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value ? 1 : 0;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof BooleanType) {
+ BooleanType other = (BooleanType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteArrayType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteArrayType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteArrayType.java
new file mode 100644
index 0000000..cb29a5c
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteArrayType.java
@@ -0,0 +1,76 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+public class ByteArrayType implements SerializationTestType {
+
+ private static final int MAX_LEN = 512 * 15;
+
+ private byte[] data;
+
+ public ByteArrayType() {
+ this.data = new byte[0];
+ }
+
+ public ByteArrayType(byte[] data) {
+ this.data = data;
+ }
+
+ @Override
+ public ByteArrayType getRandom(Random rnd) {
+ final int len = rnd.nextInt(MAX_LEN) + 1;
+ final byte[] data = new byte[len];
+ rnd.nextBytes(data);
+ return new ByteArrayType(data);
+ }
+
+ @Override
+ public int length() {
+ return data.length + 4;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(this.data.length);
+ out.write(this.data);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ final int len = in.readInt();
+ this.data = new byte[len];
+ in.readFully(this.data);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(this.data);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ByteArrayType) {
+ ByteArrayType other = (ByteArrayType) obj;
+ return Arrays.equals(this.data, other.data);
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteSubArrayType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteSubArrayType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteSubArrayType.java
new file mode 100644
index 0000000..2b683d2
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteSubArrayType.java
@@ -0,0 +1,91 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+public class ByteSubArrayType implements SerializationTestType {
+
+ private static final int MAX_LEN = 512;
+
+ private final byte[] data;
+
+ private int len;
+
+ public ByteSubArrayType() {
+ this.data = new byte[MAX_LEN];
+ this.len = 0;
+ }
+
+ @Override
+ public ByteSubArrayType getRandom(Random rnd) {
+ final int len = rnd.nextInt(MAX_LEN) + 1;
+ final ByteSubArrayType t = new ByteSubArrayType();
+ t.len = len;
+
+ final byte[] data = t.data;
+ for (int i = 0; i < len; i++) {
+ data[i] = (byte) rnd.nextInt(256);
+ }
+
+ return t;
+ }
+
+ @Override
+ public int length() {
+ return len + 4;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(this.len);
+ out.write(this.data, 0, this.len);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.len = in.readInt();
+ in.readFully(this.data, 0, this.len);
+ }
+
+ @Override
+ public int hashCode() {
+ final byte[] copy = new byte[this.len];
+ System.arraycopy(this.data, 0, copy, 0, this.len);
+ return Arrays.hashCode(copy);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ByteSubArrayType) {
+ ByteSubArrayType other = (ByteSubArrayType) obj;
+ if (this.len == other.len) {
+ for (int i = 0; i < this.len; i++) {
+ if (this.data[i] != other.data[i]) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteType.java
new file mode 100644
index 0000000..52abdcb
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ByteType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class ByteType implements SerializationTestType {
+
+ private byte value;
+
+ public ByteType() {
+ this.value = (byte) 0;
+ }
+
+ private ByteType(byte value) {
+ this.value = value;
+ }
+
+ @Override
+ public ByteType getRandom(Random rnd) {
+ return new ByteType((byte) rnd.nextInt(256));
+ }
+
+ @Override
+ public int length() {
+ return 1;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeByte(this.value);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.value = in.readByte();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ByteType) {
+ ByteType other = (ByteType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/CharType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/CharType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/CharType.java
new file mode 100644
index 0000000..25df737
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/CharType.java
@@ -0,0 +1,68 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class CharType implements SerializationTestType {
+
+ private char value;
+
+ public CharType() {
+ this.value = 0;
+ }
+
+ private CharType(char value) {
+ this.value = value;
+ }
+
+ @Override
+ public CharType getRandom(Random rnd) {
+ return new CharType((char) rnd.nextInt(10000));
+ }
+
+ @Override
+ public int length() {
+ return 2;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeChar(this.value);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.value = in.readChar();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof CharType) {
+ CharType other = (CharType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/DoubleType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/DoubleType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/DoubleType.java
new file mode 100644
index 0000000..1046e75
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/DoubleType.java
@@ -0,0 +1,68 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class DoubleType implements SerializationTestType {
+
+ private double value;
+
+ public DoubleType() {
+ this.value = 0;
+ }
+
+ private DoubleType(double value) {
+ this.value = value;
+ }
+
+ @Override
+ public DoubleType getRandom(Random rnd) {
+ return new DoubleType(rnd.nextDouble());
+ }
+
+ @Override
+ public int length() {
+ return 8;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeDouble(this.value);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.value = in.readDouble();
+ }
+
+ @Override
+ public int hashCode() {
+ final long l = Double.doubleToLongBits(this.value);
+ return (int) (l ^ l >>> 32);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof DoubleType) {
+ DoubleType other = (DoubleType) obj;
+ return Double.doubleToLongBits(this.value) == Double.doubleToLongBits(other.value);
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/FloatType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/FloatType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/FloatType.java
new file mode 100644
index 0000000..4a50873
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/FloatType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class FloatType implements SerializationTestType {
+
+ private float value;
+
+ public FloatType() {
+ this.value = 0;
+ }
+
+ private FloatType(float value) {
+ this.value = value;
+ }
+
+ @Override
+ public FloatType getRandom(Random rnd) {
+ return new FloatType(rnd.nextFloat());
+ }
+
+ @Override
+ public int length() {
+ return 4;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeFloat(this.value);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.value = in.readFloat();
+ }
+
+ @Override
+ public int hashCode() {
+ return Float.floatToIntBits(this.value);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof FloatType) {
+ FloatType other = (FloatType) obj;
+ return Float.floatToIntBits(this.value) == Float.floatToIntBits(other.value);
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/IntType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/IntType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/IntType.java
new file mode 100644
index 0000000..50a3546
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/IntType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class IntType implements SerializationTestType {
+
+ private int value;
+
+ public IntType() {
+ this.value = 0;
+ }
+
+ public IntType(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public IntType getRandom(Random rnd) {
+ return new IntType(rnd.nextInt());
+ }
+
+ @Override
+ public int length() {
+ return 4;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(this.value);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.value = in.readInt();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof IntType) {
+ IntType other = (IntType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/LongType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/LongType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/LongType.java
new file mode 100644
index 0000000..1402fb5
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/LongType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class LongType implements SerializationTestType {
+
+ private long value;
+
+ public LongType() {
+ this.value = 0;
+ }
+
+ private LongType(long value) {
+ this.value = value;
+ }
+
+ @Override
+ public LongType getRandom(Random rnd) {
+ return new LongType(rnd.nextLong());
+ }
+
+ @Override
+ public int length() {
+ return 8;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(this.value);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.value = in.readLong();
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (this.value ^ this.value >>> 32);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof LongType) {
+ LongType other = (LongType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestType.java
new file mode 100644
index 0000000..a827b07
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestType.java
@@ -0,0 +1,26 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.util.Random;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+
+public interface SerializationTestType extends IOReadableWritable {
+
+ public SerializationTestType getRandom(Random rnd);
+
+ public int length();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestTypeFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestTypeFactory.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestTypeFactory.java
new file mode 100644
index 0000000..127a0ec
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/SerializationTestTypeFactory.java
@@ -0,0 +1,40 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+public enum SerializationTestTypeFactory {
+ BOOLEAN(new BooleanType()),
+ BYTE_ARRAY(new ByteArrayType()),
+ BYTE_SUB_ARRAY(new ByteSubArrayType()),
+ BYTE(new ByteType()),
+ CHAR(new CharType()),
+ DOUBLE(new DoubleType()),
+ FLOAT(new FloatType()),
+ INT(new IntType()),
+ LONG(new LongType()),
+ SHORT(new ShortType()),
+ UNSIGNED_BYTE(new UnsignedByteType()),
+ UNSIGNED_SHORT(new UnsignedShortType()),
+ STRING(new AsciiStringType());
+
+ private final SerializationTestType factory;
+
+ SerializationTestTypeFactory(SerializationTestType type) {
+ this.factory = type;
+ }
+
+ public SerializationTestType factory() {
+ return this.factory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ShortType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ShortType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ShortType.java
new file mode 100644
index 0000000..7711e88
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/ShortType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class ShortType implements SerializationTestType {
+
+ private short value;
+
+ public ShortType() {
+ this.value = (short) 0;
+ }
+
+ private ShortType(short value) {
+ this.value = value;
+ }
+
+ @Override
+ public ShortType getRandom(Random rnd) {
+ return new ShortType((short) rnd.nextInt(65536));
+ }
+
+ @Override
+ public int length() {
+ return 2;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeShort(this.value);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.value = in.readShort();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ShortType) {
+ ShortType other = (ShortType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedByteType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedByteType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedByteType.java
new file mode 100644
index 0000000..9a1f1fb
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedByteType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class UnsignedByteType implements SerializationTestType {
+
+ private int value;
+
+ public UnsignedByteType() {
+ this.value = 0;
+ }
+
+ private UnsignedByteType(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public UnsignedByteType getRandom(Random rnd) {
+ return new UnsignedByteType(rnd.nextInt(128) + 128);
+ }
+
+ @Override
+ public int length() {
+ return 1;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeByte(this.value);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.value = in.readUnsignedByte();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof UnsignedByteType) {
+ UnsignedByteType other = (UnsignedByteType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedShortType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedShortType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedShortType.java
new file mode 100644
index 0000000..ac80ef7
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/UnsignedShortType.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+public class UnsignedShortType implements SerializationTestType {
+
+ private int value;
+
+ public UnsignedShortType() {
+ this.value = 0;
+ }
+
+ private UnsignedShortType(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public UnsignedShortType getRandom(Random rnd) {
+ return new UnsignedShortType(rnd.nextInt(32768) + 32768);
+ }
+
+ @Override
+ public int length() {
+ return 2;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeShort(this.value);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.value = in.readUnsignedShort();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof UnsignedShortType) {
+ UnsignedShortType other = (UnsignedShortType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/Util.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/Util.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/Util.java
new file mode 100644
index 0000000..ef14f7f
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/serialization/types/Util.java
@@ -0,0 +1,90 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+package eu.stratosphere.runtime.io.serialization.types;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+public class Util {
+
+ private static final long SEED = 64871654635745873L;
+
+ private static Random random = new Random(SEED);
+
+ public static SerializationTestType randomRecord(SerializationTestTypeFactory type) {
+ return type.factory().getRandom(Util.random);
+ }
+
+ public static MockRecords randomRecords(final int numElements, final SerializationTestTypeFactory type) {
+
+ return new MockRecords(numElements) {
+ @Override
+ protected SerializationTestType getRecord() {
+ return type.factory().getRandom(Util.random);
+ }
+ };
+ }
+
+ public static MockRecords randomRecords(final int numElements) {
+
+ return new MockRecords(numElements) {
+ @Override
+ protected SerializationTestType getRecord() {
+ // select random test type factory
+ SerializationTestTypeFactory[] types = SerializationTestTypeFactory.values();
+ int i = Util.random.nextInt(types.length);
+
+ return types[i].factory().getRandom(Util.random);
+ }
+ };
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ public abstract static class MockRecords implements Iterable<SerializationTestType> {
+
+ private int numRecords;
+
+ public MockRecords(int numRecords) {
+ this.numRecords = numRecords;
+ }
+
+ @Override
+ public Iterator<SerializationTestType> iterator() {
+ return new Iterator<SerializationTestType>() {
+ @Override
+ public boolean hasNext() {
+ return numRecords > 0;
+ }
+
+ @Override
+ public SerializationTestType next() {
+ if (numRecords > 0) {
+ numRecords--;
+
+ return getRecord();
+ }
+
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ abstract protected SerializationTestType getRecord();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java
index db843dd..b80810b 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java
@@ -21,6 +21,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import eu.stratosphere.test.util.RecordAPITestBase;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import org.junit.Assert;
import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
@@ -31,8 +33,6 @@ import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;