You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:39 UTC

[04/30] Replace custom Java NIO TCP/IP code with Netty 4 library

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
new file mode 100644
index 0000000..40aefd3
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
@@ -0,0 +1,857 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.BufferRecycler;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider.BufferAvailabilityRegistration;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
+import eu.stratosphere.runtime.io.network.Envelope;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import junit.framework.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class InboundEnvelopeDecoderTest {
+
+	@Mock
+	private BufferProvider bufferProvider;
+
+	@Mock
+	private BufferProviderBroker bufferProviderBroker;
+
+	/**
+	 * Initializes the {@code @Mock}-annotated fields of this test instance before each test.
+	 */
+	@Before
+	public void initMocks() throws IOException {
+		MockitoAnnotations.initMocks(this);
+	}
+
+	/**
+	 * Feeds the decoder three slices of an encoded envelope stream while the mocked buffer
+	 * provider initially refuses buffer requests, and checks the channel's auto-read flag and
+	 * the slices' reference counts after every step.
+	 */
+	@Test
+	public void testBufferStaging() throws Exception {
+		final InboundEnvelopeDecoder envelopeDecoder = new InboundEnvelopeDecoder(this.bufferProviderBroker);
+		final EmbeddedChannel channel = new EmbeddedChannel(new OutboundEnvelopeEncoder(), envelopeDecoder);
+
+		when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
+				.thenReturn(this.bufferProvider);
+
+		// three envelopes with buffers, encoded back-to-back
+		Envelope[] testEnvelopes = nextEnvelopes(3, true);
+		ByteBuf encoded = encode(channel, testEnvelopes);
+
+		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
+				.thenReturn(BufferAvailabilityRegistration.REGISTERED);
+
+		Buffer thirdEnvelopeBuffer = allocBuffer(testEnvelopes[2].getBuffer().size());
+
+		// consecutive answers: no buffer, no buffer, a buffer, then no buffer again
+		when(this.bufferProvider.requestBuffer(anyInt()))
+				.thenReturn(null, null, thirdEnvelopeBuffer, null);
+
+		// slices: [0] => full envelope, [1] => half envelope, [2] => remaining half + full envelope
+		int firstSliceSize = OutboundEnvelopeEncoder.HEADER_SIZE + testEnvelopes[0].getBuffer().size();
+		int secondSliceSize = OutboundEnvelopeEncoder.HEADER_SIZE + testEnvelopes[1].getBuffer().size() / 2;
+		ByteBuf[] parts = slice(encoded, firstSliceSize, secondSliceSize);
+
+		// 1. no buffer available, incoming slice contains all data
+		int initialRefCnt = parts[0].refCnt();
+
+		decodeAndVerify(channel, parts[0]);
+
+		Assert.assertEquals(initialRefCnt + 1, parts[0].refCnt());
+		Assert.assertFalse(channel.config().isAutoRead());
+
+		// the bufferAvailable() callback hands over a buffer of the current network buffer size
+		// (here: twice the required size); the decoder needs to adjust it to the requested size
+		envelopeDecoder.bufferAvailable(allocBuffer(testEnvelopes[0].getBuffer().size() * 2));
+		channel.runPendingTasks();
+
+		Assert.assertEquals(initialRefCnt - 1, parts[0].refCnt());
+		Assert.assertTrue(channel.config().isAutoRead());
+
+		decodeAndVerify(channel, testEnvelopes[0]);
+
+		// 2. no buffer available, incoming slice does NOT contain all data
+		initialRefCnt = parts[1].refCnt();
+
+		decodeAndVerify(channel, parts[1]);
+
+		Assert.assertEquals(initialRefCnt + 1, parts[1].refCnt());
+		Assert.assertFalse(channel.config().isAutoRead());
+
+		envelopeDecoder.bufferAvailable(allocBuffer());
+		channel.runPendingTasks();
+
+		Assert.assertEquals(initialRefCnt - 1, parts[1].refCnt());
+		Assert.assertTrue(channel.config().isAutoRead());
+
+		decodeAndVerify(channel);
+
+		// 3. buffer available
+		initialRefCnt = parts[2].refCnt();
+
+		decodeAndVerify(channel, parts[2], testEnvelopes[1], testEnvelopes[2]);
+
+		Assert.assertEquals(initialRefCnt - 1, parts[2].refCnt());
+		Assert.assertTrue(channel.config().isAutoRead());
+
+		// only the reference held by this test is left
+		Assert.assertEquals(1, encoded.refCnt());
+		encoded.release();
+	}
+
+	/**
+	 * Expects an {@link IllegalStateException} when further data is written to the channel
+	 * while the decoder is still waiting for a buffer for the previously written data.
+	 */
+	@Test
+	public void testBufferStagingStagedBufferException() throws Exception {
+		final EmbeddedChannel channel = new EmbeddedChannel(
+				new OutboundEnvelopeEncoder(),
+				new InboundEnvelopeDecoder(this.bufferProviderBroker));
+
+		when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
+				.thenReturn(this.bufferProvider);
+
+		ByteBuf encoded = encode(channel, nextEnvelope(true));
+
+		// never hand out a buffer, but accept the availability listener
+		when(this.bufferProvider.requestBuffer(anyInt()))
+				.thenReturn(null);
+
+		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
+				.thenReturn(BufferAvailabilityRegistration.REGISTERED);
+
+		int initialRefCnt = encoded.refCnt();
+
+		// first write: nothing is decoded, auto-read is switched off, one more reference is held
+		decodeAndVerify(channel, encoded);
+
+		Assert.assertFalse(channel.config().isAutoRead());
+		Assert.assertEquals(initialRefCnt + 1, encoded.refCnt());
+
+		// second write while the first one is still pending
+		try {
+			decodeAndVerify(channel, encoded);
+			Assert.fail("Expected IllegalStateException not thrown");
+		} catch (IllegalStateException e) {
+			// expected exception
+		}
+
+		encoded.release();
+	}
+
+	/**
+	 * Covers the case where the first buffer request fails, the listener registration reports
+	 * {@code NOT_REGISTERED_BUFFER_AVAILABLE}, and the following buffer request succeeds: both
+	 * envelopes are expected to be decoded from a single write.
+	 */
+	@Test
+	public void testBufferAvailabilityRegistrationBufferAvailable() throws Exception {
+		final EmbeddedChannel channel = new EmbeddedChannel(
+				new OutboundEnvelopeEncoder(),
+				new InboundEnvelopeDecoder(this.bufferProviderBroker));
+
+		when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
+				.thenReturn(this.bufferProvider);
+
+		// first envelope carries a buffer, second one does not
+		Envelope[] testEnvelopes = new Envelope[]{nextEnvelope(true), nextEnvelope()};
+
+		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
+				.thenReturn(BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_AVAILABLE);
+
+		Buffer secondAttemptBuffer = allocBuffer(testEnvelopes[0].getBuffer().size());
+
+		// consecutive answers: no buffer on the first request, a buffer afterwards
+		when(this.bufferProvider.requestBuffer(anyInt()))
+				.thenReturn(null, secondAttemptBuffer);
+
+		ByteBuf encoded = encode(channel, testEnvelopes);
+
+		decodeAndVerify(channel, encoded, testEnvelopes);
+		Assert.assertEquals(0, encoded.refCnt());
+	}
+
+	@Test
+	public void testBufferAvailabilityRegistrationBufferPoolDestroyedSkipBytes() throws Exception {
+		final EmbeddedChannel ch = new EmbeddedChannel(
+				new OutboundEnvelopeEncoder(),
+				new InboundEnvelopeDecoder(this.bufferProviderBroker));
+
+		when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
+				.thenReturn(this.bufferProvider);
+
+		when(this.bufferProvider.requestBuffer(anyInt()))
+				.thenReturn(null);
+
+		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
+				.thenReturn(BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED);
+
+		// --------------------------------------------------------------------
+
+		Envelope[] envelopes = new Envelope[]{nextEnvelope(true), nextEnvelope(), nextEnvelope()};
+		Envelope[] expectedEnvelopes = new Envelope[]{envelopes[1], envelopes[2]};
+
+		ByteBuf buf = encode(ch, envelopes);
+
+		int bufferSize = envelopes[0].getBuffer().size();
+
+		// --------------------------------------------------------------------
+		// 1) skip in current buffer only
+		// --------------------------------------------------------------------
+		{
+			// skip last bytes in current buffer
+			ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize);
+
+			int refCount = slices[0].refCnt();
+			decodeAndVerify(ch, slices[0]);
+			Assert.assertEquals(refCount - 1, slices[0].refCnt());
+
+			refCount = slices[1].refCnt();
+			decodeAndVerify(ch, slices[1], expectedEnvelopes);
+			Assert.assertEquals(refCount - 1, slices[1].refCnt());
+		}
+
+		{
+			// skip bytes in current buffer, leave last 16 bytes from next envelope
+			ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize + 16);
+
+			int refCount = slices[0].refCnt();
+			decodeAndVerify(ch, slices[0]);
+			Assert.assertEquals(refCount - 1, slices[0].refCnt());
+
+			refCount = slices[1].refCnt();
+			decodeAndVerify(ch, slices[1], expectedEnvelopes);
+			Assert.assertEquals(refCount - 1, slices[1].refCnt());
+		}
+
+		{
+			// skip bytes in current buffer, then continue with full envelope from same buffer
+			ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize + OutboundEnvelopeEncoder.HEADER_SIZE);
+
+			int refCount = slices[0].refCnt();
+			decodeAndVerify(ch, slices[0], expectedEnvelopes[0]);
+			Assert.assertEquals(refCount - 1, slices[0].refCnt());
+
+			refCount = slices[1].refCnt();
+			decodeAndVerify(ch, slices[1], expectedEnvelopes[1]);
+			Assert.assertEquals(refCount - 1, slices[1].refCnt());
+		}
+
+		// --------------------------------------------------------------------
+		// 2) skip in current and next buffer
+		// --------------------------------------------------------------------
+
+		{
+			// skip bytes in current buffer, then continue to skip last 32 bytes in next buffer
+			ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize - 32);
+
+			int refCount = slices[0].refCnt();
+			decodeAndVerify(ch, slices[0]);
+			Assert.assertEquals(refCount - 1, slices[0].refCnt());
+
+			refCount = slices[1].refCnt();
+			decodeAndVerify(ch, slices[1], expectedEnvelopes);
+			Assert.assertEquals(refCount - 1, slices[1].refCnt());
+		}
+
+		{
+			// skip bytes in current buffer, then continue to skip in next two buffers
+			ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize - 32, 16);
+
+			int refCount = slices[0].refCnt();
+			decodeAndVerify(ch, slices[0]);
+			Assert.assertEquals(refCount - 1, slices[0].refCnt());
+
+			refCount = slices[1].refCnt();
+			decodeAndVerify(ch, slices[1]);
+			Assert.assertEquals(refCount - 1, slices[1].refCnt());
+
+			refCount = slices[2].refCnt();
+			decodeAndVerify(ch, slices[2], expectedEnvelopes);
+			Assert.assertEquals(refCount - 1, slices[2].refCnt());
+		}
+
+		// ref count should be 1, because slices shared the ref count
+		Assert.assertEquals(1, buf.refCnt());
+	}
+
+	/**
+	 * Encodes a fixed set of envelopes (with and without buffers and events) and decodes them
+	 * again, first from the complete encoded buffer and then from random slices of it. Every
+	 * buffer request is fulfilled with a buffer of the requested size.
+	 */
+	@Test
+	public void testEncodeDecode() throws Exception {
+		final EmbeddedChannel channel = new EmbeddedChannel(
+				new OutboundEnvelopeEncoder(), new InboundEnvelopeDecoder(this.bufferProviderBroker));
+
+		when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
+				.thenReturn(this.bufferProvider);
+
+		when(this.bufferProvider.requestBuffer(anyInt())).thenAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocation) throws Throwable {
+				// hand out a buffer of exactly the requested size
+				Integer requestedSize = (Integer) invocation.getArguments()[0];
+				return allocBuffer(requestedSize);
+			}
+		});
+
+		Envelope[] testEnvelopes = new Envelope[]{
+				nextEnvelope(0),
+				nextEnvelope(2),
+				nextEnvelope(32768),
+				nextEnvelope(3782, new TestEvent1(34872527)),
+				nextEnvelope(88, new TestEvent1(8749653), new TestEvent1(365345)),
+				nextEnvelope(0, new TestEvent2(34563456), new TestEvent1(598432), new TestEvent2(976293845)),
+				nextEnvelope(23)
+		};
+
+		ByteBuf encoded = encode(channel, testEnvelopes);
+
+		// 1. complete ByteBuf as input (retained once more, so it can be reused for step 2)
+		encoded.retain();
+		int retainedRefCnt = encoded.refCnt();
+
+		decodeAndVerify(channel, encoded, testEnvelopes);
+		Assert.assertEquals(retainedRefCnt - 1, encoded.refCnt());
+
+		// 2. random slices
+		encoded.readerIndex(0);
+		ByteBuf[] parts = randomSlices(encoded);
+
+		channel.writeInbound(parts);
+
+		for (ByteBuf part : parts) {
+			Assert.assertEquals(1, part.refCnt());
+		}
+
+		decodeAndVerify(channel, testEnvelopes);
+
+		encoded.release();
+	}
+
+	/**
+	 * Randomized round trip: encodes 1024 random envelopes, feeds the encoded bytes to the
+	 * decoder in random slices, and lets the buffer provider mock answer buffer requests and
+	 * listener registrations randomly (driven by a {@link Random} seeded with
+	 * {@code RANDOM_SEED}).
+	 * <p/>
+	 * Whenever the registration answer reports a registered listener, the test plays the role of
+	 * the buffer provider and calls {@code bufferAvailable()} on the decoder.
+	 */
+	@Test
+	public void testEncodeDecodeRandomEnvelopes() throws Exception {
+		final InboundEnvelopeDecoder decoder = new InboundEnvelopeDecoder(this.bufferProviderBroker);
+		final EmbeddedChannel ch = new EmbeddedChannel(
+				new OutboundEnvelopeEncoder(), decoder);
+
+		when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
+				.thenReturn(this.bufferProvider);
+
+		// both answers draw from the same random source
+		Random randomAnswerSource = new Random(RANDOM_SEED);
+
+		RandomBufferRequestAnswer randomBufferRequestAnswer = new RandomBufferRequestAnswer(randomAnswerSource);
+
+		RandomBufferAvailabilityRegistrationAnswer randomBufferAvailabilityRegistrationAnswer =
+				new RandomBufferAvailabilityRegistrationAnswer(randomAnswerSource, randomBufferRequestAnswer);
+
+		when(this.bufferProvider.requestBuffer(anyInt())).thenAnswer(randomBufferRequestAnswer);
+
+		when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
+				.thenAnswer(randomBufferAvailabilityRegistrationAnswer);
+
+		// --------------------------------------------------------------------
+
+		Envelope[] envelopes = nextRandomEnvelopes(1024);
+
+		ByteBuf buf = encode(ch, envelopes);
+
+		ByteBuf[] slices = randomSlices(buf);
+
+		for (ByteBuf slice : slices) {
+			int refCount = slice.refCnt();
+			ch.writeInbound(slice);
+
+			// registered BufferAvailabilityListener => call bufferAvailable(buffer)
+			while (randomBufferAvailabilityRegistrationAnswer.isRegistered()) {
+				randomBufferAvailabilityRegistrationAnswer.unregister();
+
+				Assert.assertFalse(ch.config().isAutoRead());
+				Assert.assertEquals(refCount + 1, slice.refCnt());
+
+				// return a buffer of max size => decoder needs to limit buffer size
+				decoder.bufferAvailable(allocBuffer(MAX_BUFFER_SIZE));
+				ch.runPendingTasks();
+			}
+
+			Assert.assertEquals(refCount - 1, slice.refCnt());
+			Assert.assertTrue(ch.config().isAutoRead());
+		}
+
+		// envelopes dropped because of a destroyed buffer pool are not expected to show up
+		Envelope[] expected = randomBufferAvailabilityRegistrationAnswer.removeSkippedEnvelopes(envelopes);
+
+		decodeAndVerify(ch, expected);
+
+		Assert.assertEquals(1, buf.refCnt());
+
+		buf.release();
+	}
+
+	// ========================================================================
+	// helpers
+	// ========================================================================
+
+	/** Fixed seed for all random sources of this test. */
+	private static final long RANDOM_SEED = 520346508276087L;
+
+	/** Shared random source for generating envelopes and slice sizes. */
+	private static final Random random = new Random(RANDOM_SEED);
+
+	/** Buffer sizes (in bytes) to choose from when an envelope with a buffer is generated. */
+	private static final int[] BUFFER_SIZES = new int[]{8192, 16384, 32768};
+
+	/** The largest entry of {@link #BUFFER_SIZES}. */
+	private static final int MAX_BUFFER_SIZE = BUFFER_SIZES[2];
+
+	/** Upper bound for the number of events attached to a randomly generated envelope. */
+	private static final int MAX_NUM_EVENTS = 5;
+
+	/** Upper bound (in bytes) for the size of a random slice. */
+	private static final int MAX_SLICE_SIZE = MAX_BUFFER_SIZE / 3;
+
+	/** Lower bound (in bytes) for the size of a random slice. */
+	private static final int MIN_SLICE_SIZE = 1;
+
+	/** Mocked recycler shared by all buffers created in this test. */
+	private static final BufferRecycler RECYCLER = mock(BufferRecycler.class);
+
+	// ------------------------------------------------------------------------
+	// envelopes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a spied buffer of {@code MAX_BUFFER_SIZE} bytes.
+	 */
+	private static Buffer allocBuffer() {
+		return allocBuffer(MAX_BUFFER_SIZE);
+	}
+
+	/**
+	 * Creates a Mockito spy of a buffer that is backed by a fresh, zero-filled byte array of the
+	 * given size and uses the shared mock recycler.
+	 */
+	private static Buffer allocBuffer(int bufferSize) {
+		MemorySegment segment = new MemorySegment(new byte[bufferSize]);
+		Buffer buffer = new Buffer(segment, bufferSize, RECYCLER);
+
+		return spy(buffer);
+	}
+
+	/**
+	 * Creates an envelope with neither a buffer nor events.
+	 */
+	private Envelope nextEnvelope() {
+		return nextEnvelope(false, false);
+	}
+
+	/**
+	 * Creates an envelope without events, optionally carrying a buffer of random size.
+	 */
+	private Envelope nextEnvelope(boolean withBuffer) {
+		return nextEnvelope(withBuffer, false);
+	}
+
+	/**
+	 * Creates an envelope with a random sequence number and fresh job and channel IDs.
+	 *
+	 * @param bufferSize size of the attached buffer, which is filled with random bytes;
+	 *                   no buffer is attached if the size is not positive
+	 * @param events     events to serialize into the envelope; may be {@code null} or empty
+	 */
+	private Envelope nextEnvelope(int bufferSize, AbstractEvent... events) {
+		int sequenceNumber = random.nextInt();
+		Envelope envelope = new Envelope(sequenceNumber, new JobID(), new ChannelID());
+
+		if (bufferSize > 0) {
+			byte[] payload = new byte[bufferSize];
+			random.nextBytes(payload);
+
+			Buffer buffer = new Buffer(new MemorySegment(payload), bufferSize, RECYCLER);
+			envelope.setBuffer(spy(buffer));
+		}
+
+		boolean hasEvents = events != null && events.length > 0;
+		if (hasEvents) {
+			envelope.serializeEventList(Arrays.asList(events));
+		}
+
+		return envelope;
+	}
+
+	private Envelope nextEnvelope(boolean withBuffer, boolean withEvents) {
+		int bufferSize = 0;
+		AbstractEvent[] events = null;
+
+		if (withBuffer) {
+			bufferSize = BUFFER_SIZES[random.nextInt(BUFFER_SIZES.length)];
+		}
+
+		if (withEvents) {
+			events = new AbstractEvent[random.nextInt(MAX_NUM_EVENTS) + 1];
+
+			for (int i = 0; i < events.length; i++) {
+				events[i] = (random.nextBoolean()
+						? new TestEvent1(random.nextLong())
+						: new TestEvent2(random.nextLong()));
+			}
+		}
+
+		return nextEnvelope(bufferSize, events);
+	}
+
+	private Envelope[] nextEnvelopes(int numEnvelopes, boolean withBuffer) {
+		Envelope[] envelopes = new Envelope[numEnvelopes];
+		for (int i = 0; i < numEnvelopes; i++) {
+			envelopes[i] = nextEnvelope(withBuffer, false);
+		}
+		return envelopes;
+	}
+
+	private Envelope[] nextRandomEnvelopes(int numEnvelopes) {
+		Envelope[] envelopes = new Envelope[numEnvelopes];
+		for (int i = 0; i < numEnvelopes; i++) {
+			envelopes[i] = nextEnvelope(random.nextBoolean(), random.nextBoolean());
+		}
+		return envelopes;
+	}
+
+	// ------------------------------------------------------------------------
+	// channel encode/decode
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Writes the envelopes to the outbound side of the channel and collects everything the
+	 * channel produced into a single composite buffer.
+	 * <p/>
+	 * For every envelope with a buffer, verifies that the buffer has been recycled exactly once
+	 * after the write.
+	 *
+	 * @return composite buffer whose writer index is set to its capacity
+	 */
+	private static ByteBuf encode(EmbeddedChannel ch, Envelope... envelopes) {
+		for (Envelope envelope : envelopes) {
+			ch.writeOutbound(envelope);
+
+			if (envelope.getBuffer() == null) {
+				continue;
+			}
+
+			verify(envelope.getBuffer(), times(1)).recycleBuffer();
+		}
+
+		CompositeByteBuf composite = new CompositeByteBuf(ByteBufAllocator.DEFAULT, false, envelopes.length);
+
+		// drain the outbound queue of the channel
+		for (ByteBuf part = (ByteBuf) ch.readOutbound(); part != null; part = (ByteBuf) ch.readOutbound()) {
+			composite.addComponent(part);
+		}
+
+		// make all added components readable
+		return composite.writerIndex(composite.capacity());
+	}
+
+	/**
+	 * Writes the buffer to the inbound side of the channel and then verifies the decoded
+	 * envelopes against the expected ones.
+	 */
+	private static void decodeAndVerify(EmbeddedChannel ch, ByteBuf buf, Envelope... expectedEnvelopes) {
+		ch.writeInbound(buf);
+		decodeAndVerify(ch, expectedEnvelopes);
+	}
+
+	/**
+	 * Reads decoded envelopes from the inbound queue of the channel and compares them, in
+	 * order, with the expected ones.
+	 * <p/>
+	 * If no envelopes are expected, asserts that the inbound queue is empty. A varargs call
+	 * without arguments passes an empty array rather than {@code null}, so both are treated as
+	 * "nothing expected".
+	 */
+	private static void decodeAndVerify(EmbeddedChannel ch, Envelope... expectedEnvelopes) {
+		if (expectedEnvelopes == null || expectedEnvelopes.length == 0) {
+			Assert.assertNull(ch.readInbound());
+			return;
+		}
+
+		for (Envelope expected : expectedEnvelopes) {
+			Envelope actual = (Envelope) ch.readInbound();
+
+			if (actual == null) {
+				Assert.fail("No inbound envelope available, but expected one");
+			}
+
+			assertEqualEnvelopes(expected, actual);
+		}
+	}
+
+	/**
+	 * Asserts that two envelopes agree in sequence number, job ID, source, buffer content, and
+	 * attached events.
+	 * <p/>
+	 * Note that this clears the serialized-events byte buffer of the expected envelope.
+	 */
+	private static void assertEqualEnvelopes(Envelope expected, Envelope actual) {
+		// header fields are checked one by one, so that a failure names the mismatching field
+		Assert.assertEquals("sequence number", expected.getSequenceNumber(), actual.getSequenceNumber());
+		Assert.assertEquals("job ID", expected.getJobID(), actual.getJobID());
+		Assert.assertEquals("source", expected.getSource(), actual.getSource());
+
+		if (expected.getBuffer() == null) {
+			Assert.assertNull(actual.getBuffer());
+		}
+		else {
+			Assert.assertNotNull(actual.getBuffer());
+
+			// compare only the used part of the memory segments
+			ByteBuffer expectedByteBuffer = expected.getBuffer().getMemorySegment().wrap(0, expected.getBuffer().size());
+			ByteBuffer actualByteBuffer = actual.getBuffer().getMemorySegment().wrap(0, actual.getBuffer().size());
+
+			Assert.assertEquals(0, expectedByteBuffer.compareTo(actualByteBuffer));
+		}
+
+		if (expected.getEventsSerialized() == null) {
+			Assert.assertNull(actual.getEventsSerialized());
+		}
+		else {
+			Assert.assertNotNull(actual.getEventsSerialized());
+
+			// this is needed, because the encoding of the byte buffer
+			// alters the state of the buffer
+			expected.getEventsSerialized().clear();
+
+			List<? extends AbstractEvent> expectedEvents = expected.deserializeEvents();
+			List<? extends AbstractEvent> actualEvents = actual.deserializeEvents();
+
+			Assert.assertEquals(expectedEvents.size(), actualEvents.size());
+
+			for (int i = 0; i < expectedEvents.size(); i++) {
+				AbstractEvent expectedEvent = expectedEvents.get(i);
+				AbstractEvent actualEvent = actualEvents.get(i);
+
+				Assert.assertEquals(expectedEvent.getClass(), actualEvent.getClass());
+				Assert.assertEquals(expectedEvent, actualEvent);
+			}
+		}
+	}
+
+	/**
+	 * Cuts the readable bytes of the buffer into slices of random size (between
+	 * {@code MIN_SLICE_SIZE} and {@code MAX_SLICE_SIZE} bytes, except where fewer bytes are left).
+	 *
+	 * @throws IllegalStateException if the buffer has fewer than {@code MIN_SLICE_SIZE} readable bytes
+	 */
+	private static ByteBuf[] randomSlices(ByteBuf buf) {
+		if (buf.readableBytes() < MIN_SLICE_SIZE) {
+			throw new IllegalStateException("Buffer to slice is smaller than required minimum slice size");
+		}
+
+		List<Integer> sliceSizes = new LinkedList<Integer>();
+
+		// MIN_SLICE_SIZE bytes are held back here; slice() returns one more slice than sizes
+		// given, covering whatever is left of the buffer
+		int available = buf.readableBytes() - MIN_SLICE_SIZE;
+
+		while (available > 0) {
+			int size = Math.min(available, Math.max(MIN_SLICE_SIZE, random.nextInt(MAX_SLICE_SIZE) + 1));
+			available -= size;
+			sliceSizes.add(size);
+		}
+
+		// copy by iterating: indexed get() on a linked list walks the list on every call
+		int[] slices = new int[sliceSizes.size()];
+		int next = 0;
+		for (int size : sliceSizes) {
+			slices[next++] = size;
+		}
+
+		return slice(buf, slices);
+	}
+
+	/**
+	 * Returns retained slices with the specified sizes of the given buffer.
+	 * <p/>
+	 * When given n sizes, n+1 slices will be returned. With {@code end[i]} being the sum of the
+	 * first i+1 sizes, the slices cover the following index ranges of the buffer:
+	 * <ul>
+	 * <li>0 - end[0]</li>
+	 * <li>end[0] - end[1]</li>
+	 * <li>...</li>
+	 * <li>end[n-1] - buf.capacity()</li>
+	 * </ul>
+	 * The given array of sizes is not modified.
+	 *
+	 * @return slices with the specified sizes of the given buffer, plus one slice for the remainder
+	 * @throws IllegalStateException if no size is given or the sizes do not fit the buffer
+	 */
+	private static ByteBuf[] slice(ByteBuf buf, int... sliceSizes) {
+		if (sliceSizes.length == 0) {
+			throw new IllegalStateException("Need to provide at least one slice size");
+		}
+
+		int numSlices = sliceSizes.length;
+
+		// transform slice sizes to buffer indexes (on a copy, the caller's array stays untouched)
+		int[] sliceIndexes = sliceSizes.clone();
+		for (int i = 1; i < numSlices; i++) {
+			sliceIndexes[i] += sliceIndexes[i - 1];
+		}
+
+		for (int i = 0; i < numSlices - 1; i++) {
+			if (sliceIndexes[i] >= sliceIndexes[i + 1] || sliceIndexes[i] <= 0 || sliceIndexes[i] >= buf.capacity()) {
+				throw new IllegalStateException(
+						String.format("Slice size %s are off for %s", Arrays.toString(sliceIndexes), buf));
+			}
+		}
+
+		ByteBuf[] slices = new ByteBuf[numSlices + 1];
+
+		// slice at slice indexes; every slice is retained once
+		slices[0] = buf.slice(0, sliceIndexes[0]).retain();
+		for (int i = 1; i < numSlices; i++) {
+			slices[i] = buf.slice(sliceIndexes[i - 1], sliceIndexes[i] - sliceIndexes[i - 1]).retain();
+		}
+		slices[numSlices] = buf.slice(sliceIndexes[numSlices - 1], buf.capacity() - sliceIndexes[numSlices - 1]).retain();
+
+		return slices;
+	}
+
+	// ------------------------------------------------------------------------
+	// mocking
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Typed wrapper around Mockito's {@code anyObject()} matcher for job IDs.
+	 */
+	private static JobID anyJobId() {
+		return Matchers.<JobID>anyObject();
+	}
+
+	/**
+	 * Typed wrapper around Mockito's {@code anyObject()} matcher for channel IDs.
+	 */
+	private static ChannelID anyChannelId() {
+		return Matchers.<ChannelID>anyObject();
+	}
+
+	// these following two Answer classes are quite ugly, but they allow to implement a randomized
+	// test of encoding and decoding envelopes
+	private static class RandomBufferRequestAnswer implements Answer<Buffer> {
+
+		private final Random random;
+
+		private boolean forced;
+
+		private RandomBufferRequestAnswer(Random random) {
+			this.random = random;
+		}
+
+		@Override
+		public Buffer answer(InvocationOnMock invocation) throws Throwable {
+			if (this.forced) {
+				Buffer toReturn = allocBuffer((Integer) invocation.getArguments()[0]);
+				this.forced = false;
+
+				return toReturn;
+			}
+
+			return this.random.nextBoolean() ? allocBuffer((Integer) invocation.getArguments()[0]) : null;
+		}
+
+		public void forceBufferAvailable() {
+			this.forced = true;
+		}
+	}
+
+	/**
+	 * Answer for listener registrations: randomly returns one of the three registration results
+	 * and keeps track of the state the test needs to react to them.
+	 */
+	private static class RandomBufferAvailabilityRegistrationAnswer implements Answer<BufferAvailabilityRegistration> {
+
+		private final Random random;
+
+		private final RandomBufferRequestAnswer bufferRequestAnswer;
+
+		/** Set when the last answer was REGISTERED, reset by {@link #unregister()}. */
+		private boolean registered = false;
+
+		/** Number of NOT_REGISTERED_BUFFER_POOL_DESTROYED answers given so far. */
+		private int numSkipped;
+
+		private RandomBufferAvailabilityRegistrationAnswer(Random random, RandomBufferRequestAnswer bufferRequestAnswer) {
+			this.random = random;
+			this.bufferRequestAnswer = bufferRequestAnswer;
+		}
+
+		@Override
+		public BufferAvailabilityRegistration answer(InvocationOnMock invocation) throws Throwable {
+			// first draw: register the listener?
+			if (this.random.nextBoolean()) {
+				this.registered = true;
+				return BufferAvailabilityRegistration.REGISTERED;
+			}
+
+			// second draw: buffer available in the meantime => the next request must succeed
+			if (this.random.nextBoolean()) {
+				this.bufferRequestAnswer.forceBufferAvailable();
+				return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_AVAILABLE;
+			}
+
+			this.numSkipped++;
+			return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED;
+		}
+
+		/**
+		 * Returns the given envelopes without those considered skipped.
+		 * <p/>
+		 * Resets the random source to {@code RANDOM_SEED} and re-draws up to three booleans for
+		 * every envelope with a buffer; the envelope is left out if all drawn values are
+		 * {@code false}. NOTE(review): this presumably mirrors the draws made by the two answers
+		 * while decoding -- confirm against the decoder before changing the order of draws.
+		 */
+		public Envelope[] removeSkippedEnvelopes(Envelope[] envelopes) {
+			this.random.setSeed(RANDOM_SEED);
+
+			final Envelope[] remaining = new Envelope[envelopes.length - this.numSkipped];
+			int count = 0;
+
+			for (Envelope envelope : envelopes) {
+				// skip envelope if returned NOT_REGISTERED_BUFFER_POOL_DESTROYED
+				// (evaluated left to right with short-circuit, i.e. later values are only drawn
+				// if all earlier ones were false)
+				boolean skipped = envelope.getBuffer() != null
+						&& !this.random.nextBoolean()
+						&& !this.random.nextBoolean()
+						&& !this.random.nextBoolean();
+
+				if (!skipped) {
+					remaining[count++] = envelope;
+				}
+			}
+
+			return remaining;
+		}
+
+		public boolean isRegistered() {
+			return this.registered;
+		}
+
+		public void unregister() {
+			this.registered = false;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Test event that carries a single {@code long} ID. Two instances are equal if their IDs
+	 * are equal.
+	 */
+	public static final class TestEvent1 extends AbstractEvent {
+
+		private long id;
+
+		/** Creates an event with ID 0; the ID is overwritten by {@link #read(DataInput)}. */
+		public TestEvent1() {
+		}
+
+		public TestEvent1(long id) {
+			this.id = id;
+		}
+
+		@Override
+		public void write(DataOutput out) throws IOException {
+			out.writeLong(id);
+		}
+
+		@Override
+		public void read(DataInput in) throws IOException {
+			id = in.readLong();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			// instanceof is false for null; the class is final, so this is an exact type check
+			return obj instanceof TestEvent1 && ((TestEvent1) obj).id == this.id;
+		}
+
+		@Override
+		public int hashCode() {
+			return ((int) id) ^ ((int) (id >>> 32));
+		}
+
+		@Override
+		public String toString() {
+			return "TestEvent1 (" + id + ")";
+		}
+	}
+
+	/**
+	 * Second test event type that carries a single {@code long} ID. Two instances are equal if
+	 * their IDs are equal; it never equals a {@code TestEvent1}.
+	 */
+	public static final class TestEvent2 extends AbstractEvent {
+
+		private long id;
+
+		/** Creates an event with ID 0; the ID is overwritten by {@link #read(DataInput)}. */
+		public TestEvent2() {
+		}
+
+		public TestEvent2(long id) {
+			this.id = id;
+		}
+
+		@Override
+		public void write(DataOutput out) throws IOException {
+			out.writeLong(id);
+		}
+
+		@Override
+		public void read(DataInput in) throws IOException {
+			id = in.readLong();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			// instanceof is false for null; the class is final, so this is an exact type check
+			return obj instanceof TestEvent2 && ((TestEvent2) obj).id == this.id;
+		}
+
+		@Override
+		public int hashCode() {
+			return ((int) id) ^ ((int) (id >>> 32));
+		}
+
+		@Override
+		public String toString() {
+			return "TestEvent2 (" + id + ")";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
new file mode 100644
index 0000000..c424a1f
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -0,0 +1,196 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.ChannelManager;
+import eu.stratosphere.runtime.io.network.Envelope;
+import eu.stratosphere.runtime.io.network.RemoteReceiver;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Starts many sender threads that enqueue envelopes on one {@link NettyConnectionManager}, addressed to the port
+ * given to a second manager, and verifies per source channel that sequence numbers arrive in order without gaps.
+ */
+@RunWith(Parameterized.class)
+public class NettyConnectionManagerTest {
+
+	private final static long RANDOM_SEED = 520346508276087L;
+	private final static Random random = new Random(RANDOM_SEED);
+	private final static int BIND_PORT = 20000;
+	private final static int HIGH_WATERMARK = 32 * 1024;
+
+	private final int numSubtasks; // number of sender threads (and initial latch count)
+	private final int numToSendPerSubtask; // envelopes sent by each sender, sequence numbers 0..n-1
+	private final int numInThreads; // handed to both connection managers
+	private final int numOutThreads; // handed to both connection managers
+	private final int numChannels; // number of RemoteReceiver targets the senders are spread over
+
+	public NettyConnectionManagerTest(int numSubtasks, int numToSendPerSubtask, int numChannels, int numInThreads, int numOutThreads) {
+		this.numSubtasks = numSubtasks;
+		this.numToSendPerSubtask = numToSendPerSubtask;
+		this.numChannels = numChannels;
+		this.numInThreads = numInThreads;
+		this.numOutThreads = numOutThreads;
+	}
+
+	/** Parameter rows, in constructor order: numSubtasks, numToSendPerSubtask, numChannels, numInThreads, numOutThreads. */
+	@Parameterized.Parameters
+	public static Collection<Object[]> configure() {
+		return Arrays.asList(
+				new Object[][]{
+						{64, 4096, 1, 1, 1},
+						{128, 2048, 1, 1, 1},
+						{256, 1024, 1, 1, 1},
+						{512, 512, 1, 1, 1},
+						{64, 4096, 4, 1, 1},
+						{128, 2048, 4, 1, 1},
+						{256, 1024, 4, 1, 1},
+						{512, 512, 4, 1, 1},
+						{64, 4096, 4, 2, 2},
+						{128, 2048, 4, 2, 2},
+						{256, 1024, 4, 2, 2},
+						{512, 512, 4, 2, 2}
+				}
+		);
+	}
+
+	@Test
+	public void testEnqueueRaceAndDeadlockFreeMultipleChannels() throws Exception {
+		final InetAddress localhost = InetAddress.getLocalHost();
+		final CountDownLatch latch = new CountDownLatch(this.numSubtasks);
+
+		// --------------------------------------------------------------------
+		// setup
+		// --------------------------------------------------------------------
+		ChannelManager channelManager = mock(ChannelManager.class);
+		doAnswer(new VerifyEnvelopes(latch)).when(channelManager).dispatchFromNetwork(Matchers.<Envelope>anyObject());
+
+		NettyConnectionManager connManagerToTest = new NettyConnectionManager(channelManager, localhost,
+				BIND_PORT, HIGH_WATERMARK, this.numInThreads, this.numOutThreads, -1, -1);
+
+		NettyConnectionManager connManagerReceiver = new NettyConnectionManager(channelManager, localhost,
+				BIND_PORT + 1, HIGH_WATERMARK, this.numInThreads, this.numOutThreads, -1, -1);
+
+		try {
+			// --------------------------------------------------------------------
+			// start sender threads
+			// --------------------------------------------------------------------
+			RemoteReceiver[] receivers = new RemoteReceiver[this.numChannels];
+			for (int i = 0; i < this.numChannels; i++) {
+				receivers[i] = new RemoteReceiver(new InetSocketAddress(localhost, BIND_PORT + 1), i);
+			}
+
+			for (int i = 0; i < this.numSubtasks; i++) {
+				RemoteReceiver receiver = receivers[random.nextInt(this.numChannels)];
+				new Thread(new SubtaskSenderThread(connManagerToTest, receiver)).start();
+			}
+
+			latch.await();
+		} finally {
+			// shut both managers down even when waiting on the latch fails, e.g. on interruption
+			connManagerToTest.shutdown();
+			connManagerReceiver.shutdown();
+		}
+	}
+
+	/**
+	 * Mockito answer for {@code dispatchFromNetwork}: remembers the last sequence number seen per source channel,
+	 * asserts that each envelope continues that sequence, and counts the latch down on a source's last envelope.
+	 */
+	private class VerifyEnvelopes implements Answer<Object> {
+
+		private final ConcurrentMap<ChannelID, Integer> received = new ConcurrentHashMap<ChannelID, Integer>();
+		private final CountDownLatch latch;
+
+		private VerifyEnvelopes(CountDownLatch latch) {
+			this.latch = latch;
+		}
+
+		@Override
+		public Object answer(InvocationOnMock invocation) throws Throwable {
+			Envelope env = (Envelope) invocation.getArguments()[0];
+
+			ChannelID channelId = env.getSource();
+			int seqNum = env.getSequenceNumber();
+
+			if (seqNum == 0) {
+				Assert.assertNull(
+						String.format("Received envelope from %s before, but current seq num is 0", channelId),
+						this.received.putIfAbsent(channelId, seqNum));
+			}
+			else {
+				Assert.assertTrue(
+						String.format("Received seq num %d from %s, but previous was not %d", seqNum, channelId, seqNum - 1),
+						this.received.replace(channelId, seqNum - 1, seqNum));
+			}
+
+			// count down the latch if all envelopes received for this source
+			if (seqNum == numToSendPerSubtask - 1) {
+				this.latch.countDown();
+			}
+
+			return null;
+		}
+	}
+
+	/** Sender with its own fresh job id and channel id; enqueues {@code numToSendPerSubtask} envelopes to one receiver. */
+	private class SubtaskSenderThread implements Runnable {
+
+		private final NettyConnectionManager connectionManager;
+		private final RemoteReceiver receiver;
+		private final JobID jobId = new JobID();
+		private final ChannelID channelId = new ChannelID();
+		private int seqNum = 0;
+
+		private SubtaskSenderThread(NettyConnectionManager connectionManager, RemoteReceiver receiver) {
+			this.connectionManager = connectionManager;
+			this.receiver = receiver;
+		}
+
+		@Override
+		public void run() {
+			// enqueue envelopes with ascending seq nums
+			while (this.seqNum < numToSendPerSubtask) {
+				try {
+					Envelope env = new Envelope(this.seqNum++, this.jobId, this.channelId);
+					this.connectionManager.enqueue(env, receiver);
+				} catch (IOException e) {
+					// keep the cause; NOTE: this only ends the sender thread, the test keeps waiting on its latch
+					throw new RuntimeException("Unexpected exception while enqueing envelope", e);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoderTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoderTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoderTest.java
new file mode 100644
index 0000000..ba4edc7
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoderTest.java
@@ -0,0 +1,97 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.Envelope;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Tests {@link OutboundEnvelopeEncoder} in an {@link EmbeddedChannel}: encoded message size and buffer recycling. */
+public class OutboundEnvelopeEncoderTest {
+
+	private static final long RANDOM_SEED = 520346508276087L;
+	private static final int NUM_RANDOM_ENVELOPES = 512;
+	private static final int MAX_EVENTS_SIZE = 1024;
+	private static final int MAX_BUFFER_SIZE = 32768;
+
+	private final Random random = new Random(RANDOM_SEED);
+
+	/** Each envelope randomly gets events and/or a buffer; expected size is header + events size + buffer size. */
+	@Test
+	public void testEncodedSizeAndBufferRecycling() {
+		final ByteBuffer events = ByteBuffer.allocate(MAX_EVENTS_SIZE);
+		final MemorySegment segment = new MemorySegment(new byte[MAX_BUFFER_SIZE]);
+
+		final Buffer buffer = mock(Buffer.class);
+		when(buffer.getMemorySegment()).thenReturn(segment);
+
+		final EmbeddedChannel channel = new EmbeddedChannel(new OutboundEnvelopeEncoder());
+
+		int numBuffers = 0;
+		for (int i = 0; i < NUM_RANDOM_ENVELOPES; i++) {
+			Envelope env = new Envelope(i, new JobID(), new ChannelID());
+			int expectedEncodedMsgSize = OutboundEnvelopeEncoder.HEADER_SIZE;
+
+			if (random.nextBoolean()) {
+				int eventsSize = random.nextInt(MAX_EVENTS_SIZE + 1);
+				expectedEncodedMsgSize += eventsSize;
+
+				// reuse the shared scratch buffer: reset it, then expose exactly eventsSize bytes
+				events.clear();
+				events.limit(eventsSize);
+
+				env.setEventsSerialized(events);
+			}
+
+			if (random.nextBoolean()) {
+				numBuffers++;
+
+				int bufferSize = random.nextInt(MAX_BUFFER_SIZE + 1);
+				when(buffer.size()).thenReturn(bufferSize);
+				env.setBuffer(buffer);
+
+				expectedEncodedMsgSize += bufferSize;
+			}
+
+			Assert.assertTrue(channel.writeOutbound(env));
+
+			// --------------------------------------------------------------------
+			// verify encoded ByteBuf size
+			// --------------------------------------------------------------------
+			ByteBuf encodedMsg = (ByteBuf) channel.readOutbound();
+			Assert.assertEquals(expectedEncodedMsgSize, encodedMsg.readableBytes());
+
+			encodedMsg.release();
+		}
+
+		// --------------------------------------------------------------------
+		// verify buffers are recycled
+		// --------------------------------------------------------------------
+		verify(buffer, times(numBuffers)).recycleBuffer();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
index c9323a6..8cda32f 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -14,6 +14,7 @@
  **********************************************************************************************************************/
 package eu.stratosphere.test.broadcastvars;
 
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
 import org.apache.log4j.Level;
 
 import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
@@ -22,8 +23,6 @@ import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
 import eu.stratosphere.api.java.record.io.CsvInputFormat;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
 import eu.stratosphere.nephele.jobgraph.JobInputVertex;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java
index 802fd0b..984ecc2 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java
@@ -23,9 +23,8 @@ import eu.stratosphere.client.LocalExecutor;
 import eu.stratosphere.test.recordJobs.wordcount.WordCount;
 import eu.stratosphere.test.testdata.WordCountData;
 
-
 public class LocalExecutorITCase {
-	
+
 	@Test
 	public void testLocalExecutorWithWordCount() {
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java
index 2eaa8a9..4a60836 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java
@@ -20,7 +20,7 @@ import eu.stratosphere.test.util.JavaProgramTestBase;
 
 
 public class WordCountITCase extends JavaProgramTestBase {
-	
+
 	protected String textPath;
 	protected String resultPath;
 
@@ -28,7 +28,6 @@ public class WordCountITCase extends JavaProgramTestBase {
 		setNumTaskManager(2);
 	}
 
-	
 	@Override
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);
@@ -39,7 +38,7 @@ public class WordCountITCase extends JavaProgramTestBase {
 	protected void postSubmit() throws Exception {
 		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
 	}
-	
+
 	@Override
 	protected void testProgram() throws Exception {
 		WordCount.main(new String[] { textPath, resultPath });

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 0a63af1..40068b7 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -16,6 +16,8 @@ package eu.stratosphere.test.iterative.nephele;
 import java.io.BufferedReader;
 import java.util.Collection;
 
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.test.util.RecordAPITestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -31,8 +33,6 @@ import eu.stratosphere.api.java.record.io.CsvInputFormat;
 import eu.stratosphere.api.java.record.io.CsvOutputFormat;
 import eu.stratosphere.api.java.record.io.FileOutputFormat;
 import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
 import eu.stratosphere.nephele.jobgraph.JobInputVertex;
@@ -425,11 +425,11 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		JobGraphUtils.connect(head, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
 		intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
 
-		JobGraphUtils.connect(intermediate, tail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(intermediate, tail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 
-		JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(tail, fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(tail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 
@@ -567,16 +567,16 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		JobGraphUtils.connect(intermediate, ssJoinIntermediate, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 		ssJoinIntermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 
-		JobGraphUtils.connect(ssJoinIntermediate, ssTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(ssJoinIntermediate, ssTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 		ssTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 
-		JobGraphUtils.connect(ssJoinIntermediate, wsTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(ssJoinIntermediate, wsTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 		wsTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 
-		JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
-		JobGraphUtils.connect(ssTail, ssFakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(wsTail, wsFakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(ssTail, ssFakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(wsTail, wsFakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 
@@ -695,12 +695,12 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 			DistributionPattern.POINTWISE);
 		wsUpdateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 
-		JobGraphUtils.connect(wsUpdateIntermediate, ssTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(wsUpdateIntermediate, ssTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 		ssTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 
-		JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
-		JobGraphUtils.connect(ssTail, fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(ssTail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 
@@ -815,12 +815,12 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		JobGraphUtils.connect(intermediate, ssJoinIntermediate, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 		ssJoinIntermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 
-		JobGraphUtils.connect(ssJoinIntermediate, wsTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(ssJoinIntermediate, wsTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 		wsTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 
-		JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
-		JobGraphUtils.connect(wsTail, fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(wsTail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
index ef83aef..ef7c9d2 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
@@ -15,6 +15,8 @@ package eu.stratosphere.test.iterative.nephele;
 import java.util.Collection;
 import java.util.Iterator;
 
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -25,8 +27,6 @@ import eu.stratosphere.api.java.record.functions.MapFunction;
 import eu.stratosphere.api.java.record.functions.ReduceFunction;
 import eu.stratosphere.api.java.record.io.FileOutputFormat;
 import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
 import eu.stratosphere.nephele.jobgraph.JobInputVertex;
@@ -232,16 +232,16 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 		// --------------------------------------------------------------------------------------------------------------
 		// 2. EDGES
 		// --------------------------------------------------------------------------------------------------------------
-		JobGraphUtils.connect(input, head, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(input, head, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
-		JobGraphUtils.connect(head, tail, ChannelType.INMEMORY, DistributionPattern.BIPARTITE);
+		JobGraphUtils.connect(head, tail, ChannelType.IN_MEMORY, DistributionPattern.BIPARTITE);
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
 
-		JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 
-		JobGraphUtils.connect(tail, fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(tail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
 		// --------------------------------------------------------------------------------------------------------------
 		// 3. INSTANCE SHARING

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java
index 624fd9b..30ce102 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java
@@ -22,8 +22,6 @@ import eu.stratosphere.api.java.record.io.CsvOutputFormat;
 import eu.stratosphere.api.java.record.io.TextInputFormat;
 import eu.stratosphere.api.java.record.operators.MapOperator;
 import eu.stratosphere.api.java.record.operators.ReduceOperator;
-import eu.stratosphere.nephele.io.MutableUnionRecordReader;
-import eu.stratosphere.nephele.io.UnionRecordReader;
 import eu.stratosphere.test.recordJobs.wordcount.WordCount.CountWords;
 import eu.stratosphere.test.recordJobs.wordcount.WordCount.TokenizeLine;
 import eu.stratosphere.test.testdata.WordCountData;
@@ -39,8 +37,6 @@ import eu.stratosphere.types.StringValue;
  * 
  * @see {@link https://github.com/stratosphere/stratosphere/issues/192}
  * @see {@link https://github.com/stratosphere/stratosphere/issues/124}
- * @see {@link UnionRecordReader}
- * @see {@link MutableUnionRecordReader}
  */
 public class WordCountUnionReduceITCase extends RecordAPITestBase {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
new file mode 100644
index 0000000..f5beda4
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
@@ -0,0 +1,286 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.runtime;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
+import eu.stratosphere.nephele.jobgraph.JobInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
+import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
+import eu.stratosphere.nephele.template.AbstractGenericInputTask;
+import eu.stratosphere.nephele.template.AbstractOutputTask;
+import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.test.util.RecordAPITestBase;
+import eu.stratosphere.util.LogUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Parameterized integration test that runs a "Speed Test" job over {@link ChannelType#NETWORK} channels
+ * with two task managers. The job consists of a producer, an optional forwarder and a consumer; each
+ * parameter set varies the data volume, the use of the forwarder, artificial slowness of the sender or
+ * receiver, and the degree of parallelism. After each run the achieved throughput is logged.
+ */
+@RunWith(Parameterized.class)
+public class NetworkStackNepheleITCase extends RecordAPITestBase {
+
+	private static final Log LOG = LogFactory.getLog(NetworkStackNepheleITCase.class);
+
+	/** Config key: data volume in GB; the producer divides it among its subtasks. */
+	private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
+
+	/** Config key: whether a forwarder vertex is placed between producer and consumer. */
+	private static final String USE_FORWARDER_CONFIG_KEY = "use.forwarder";
+
+	/** Config key: number of subtasks set on every vertex of the job. */
+	private static final String NUM_SUBTASKS_CONFIG_KEY = "num.subtasks";
+
+	/** Config key: number of subtasks per instance set on every vertex of the job. */
+	private static final String NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY = "num.subtasks.instance";
+
+	/** Config key: whether the producer periodically sleeps while emitting records. */
+	private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
+
+	/** Config key: whether the consumer periodically sleeps while reading records. */
+	private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
+
+	/** Duration in milliseconds of each sleep of a slow sender or receiver. */
+	private static final int IS_SLOW_SLEEP_MS = 10;
+
+	/**
+	 * Number of records between two sleeps of a slow sender or receiver, i.e. the number of records
+	 * that make up 64 KiB of record data.
+	 * NOTE(review): 2 * 32 KiB presumably corresponds to two network buffers -- confirm.
+	 */
+	private static final int IS_SLOW_EVERY_NUM_RECORDS = (2 * 32 * 1024) / SpeedTestRecord.RECORD_SIZE;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the test case for a single parameter set, configures the number of task managers
+	 * to two and initializes the default console logger.
+	 *
+	 * @param config the configuration describing this run
+	 */
+	public NetworkStackNepheleITCase(Configuration config) {
+		super(config);
+
+		final int numTaskManagers = 2;
+		setNumTaskManager(numTaskManagers);
+
+		LogUtils.initializeDefaultConsoleLogger();
+	}
+
+	/**
+	 * Builds the parameter sets of this test, one {@link Configuration} per run.
+	 *
+	 * @return the configurations, wrapped as expected by the parameterized runner
+	 */
+	@Parameters
+	public static Collection<Object[]> getConfigurations() {
+		final List<Configuration> runs = new ArrayList<Configuration>();
+
+		// Arguments: data volume [GB], use forwarder, slow sender, slow receiver, subtasks, subtasks per instance
+		runs.add(newRunConfiguration(1, false, false, false, 4, 2));
+		runs.add(newRunConfiguration(1, true, false, false, 4, 2));
+		runs.add(newRunConfiguration(1, true, true, false, 4, 2));
+		runs.add(newRunConfiguration(1, true, false, true, 4, 2));
+		runs.add(newRunConfiguration(2, true, false, false, 4, 2));
+		runs.add(newRunConfiguration(4, true, false, false, 4, 2));
+		runs.add(newRunConfiguration(4, true, false, false, 8, 4));
+		runs.add(newRunConfiguration(4, true, false, false, 16, 8));
+
+		return toParameterList(runs);
+	}
+
+	/** Creates the configuration of a single test run from its individual settings. */
+	private static Configuration newRunConfiguration(int dataVolumeGb, boolean useForwarder, boolean isSlowSender,
+			boolean isSlowReceiver, int numSubtasks, int numSubtasksPerInstance) {
+
+		final Configuration run = new Configuration();
+		run.setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
+		run.setBoolean(USE_FORWARDER_CONFIG_KEY, useForwarder);
+		run.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
+		run.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
+		run.setInteger(NUM_SUBTASKS_CONFIG_KEY, numSubtasks);
+		run.setInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, numSubtasksPerInstance);
+
+		return run;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Reads the settings of the current run from the test configuration and builds the job graph from them.
+	 *
+	 * @return the job graph to execute for this run
+	 */
+	@Override
+	protected JobGraph getJobGraph() throws Exception {
+		// Arguments are evaluated left to right, i.e. in the order of the createJobGraph parameters.
+		return createJobGraph(
+				this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1),
+				this.config.getBoolean(USE_FORWARDER_CONFIG_KEY, true),
+				this.config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false),
+				this.config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false),
+				this.config.getInteger(NUM_SUBTASKS_CONFIG_KEY, 1),
+				this.config.getInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, 1));
+	}
+
+	/**
+	 * Logs the throughput of the finished run, computed from the configured data volume and the net
+	 * runtime of the job. Does nothing if no job execution result is available.
+	 */
+	@After
+	public void calculateThroughput() {
+		if (getJobExecutionResult() == null) {
+			return;
+		}
+
+		int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+
+		// 1 GB = 1024 MB = 8192 MBit
+		double dataVolumeMbit = dataVolumeGb * 8192.0;
+		double runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000.0;
+
+		// A non-positive runtime would make the division yield Infinity (or a negative value), which
+		// Math.round and the int cast turn into a bogus throughput such as -1 MBit/s.
+		if (runtimeSecs <= 0.0) {
+			LOG.info(String.format("Test finished, but the throughput cannot be computed (" +
+					"runtime [secs]: %.2f, data volume [mbits]: %.2f)", runtimeSecs, dataVolumeMbit));
+			return;
+		}
+
+		int mbitPerSecond = (int) Math.round(dataVolumeMbit / runtimeSecs);
+
+		LOG.info(String.format("Test finished with throughput of %d MBit/s (" +
+				"runtime [secs]: %.2f, data volume [mbits]: %.2f)", mbitPerSecond, runtimeSecs, dataVolumeMbit));
+	}
+
+	/**
+	 * Assembles the "Speed Test" job graph: a producer, an optional forwarder and a consumer, all with the
+	 * same parallelism, connected via bipartite network channels and sharing instances.
+	 *
+	 * @param dataVolumeGb data volume handed to the producer via its configuration
+	 * @param useForwarder whether a forwarder is placed between producer and consumer
+	 * @param isSlowSender whether the producer is configured as slow sender
+	 * @param isSlowReceiver whether the consumer is configured as slow receiver
+	 * @param numSubtasks number of subtasks of every vertex
+	 * @param numSubtasksPerInstance number of subtasks per instance of every vertex
+	 * @return the assembled job graph
+	 * @throws JobGraphDefinitionException if the vertices cannot be connected
+	 */
+	private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender, boolean isSlowReceiver,
+									int numSubtasks, int numSubtasksPerInstance) throws JobGraphDefinitionException {
+
+		final JobGraph graph = new JobGraph("Speed Test");
+
+		// Producer
+		final JobInputVertex source = new JobGenericInputVertex("Speed Test Producer", graph);
+		source.setInputClass(SpeedTestProducer.class);
+		source.setNumberOfSubtasks(numSubtasks);
+		source.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+
+		final Configuration sourceConfig = source.getConfiguration();
+		sourceConfig.setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
+		sourceConfig.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
+
+		// Forwarder (optional); created before the consumer to keep the vertex creation order
+		final JobTaskVertex relay = useForwarder ? new JobTaskVertex("Speed Test Forwarder", graph) : null;
+		if (relay != null) {
+			relay.setTaskClass(SpeedTestForwarder.class);
+			relay.setNumberOfSubtasks(numSubtasks);
+			relay.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+		}
+
+		// Consumer
+		final JobOutputVertex sink = new JobOutputVertex("Speed Test Consumer", graph);
+		sink.setOutputClass(SpeedTestConsumer.class);
+		sink.setNumberOfSubtasks(numSubtasks);
+		sink.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+		sink.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
+
+		// Without a forwarder the producer feeds the consumer directly
+		if (relay == null) {
+			source.connectTo(sink, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+			source.setVertexToShareInstancesWith(sink);
+
+			return graph;
+		}
+
+		source.connectTo(relay, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+		relay.connectTo(sink, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+
+		source.setVertexToShareInstancesWith(relay);
+		relay.setVertexToShareInstancesWith(sink);
+
+		return graph;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Input task that emits its share of the configured data volume as {@link SpeedTestRecord}s,
+	 * optionally sleeping at regular record intervals to act as a slow sender.
+	 */
+	public static class SpeedTestProducer extends AbstractGenericInputTask {
+
+		private RecordWriter<SpeedTestRecord> writer;
+
+		@Override
+		public void registerInputOutput() {
+			this.writer = new RecordWriter<SpeedTestRecord>(this);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			this.writer.initializeSerializers();
+
+			// Split the configured data volume evenly among all subtasks
+			final int volumeGb = getTaskConfiguration().getInteger(NetworkStackNepheleITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);
+			final int parallelism = getCurrentNumberOfSubtasks();
+
+			final long mbForThisSubtask = (volumeGb * 1024) / parallelism;
+			final long recordsToEmit = (mbForThisSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;
+
+			LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
+					getIndexInSubtaskGroup() + 1, parallelism, recordsToEmit,
+					SpeedTestRecord.RECORD_SIZE, mbForThisSubtask/1024.0));
+
+			final boolean throttled = getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
+
+			// The same record instance is emitted over and over again
+			final SpeedTestRecord payload = new SpeedTestRecord();
+
+			long emitted = 0;
+			while (emitted < recordsToEmit) {
+				// A slow sender pauses before the first record and then after every full interval
+				if (throttled && (emitted % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
+					Thread.sleep(IS_SLOW_SLEEP_MS);
+				}
+
+				this.writer.emit(payload);
+				emitted++;
+			}
+
+			this.writer.flush();
+		}
+	}
+
+	/**
+	 * Task that passes every record it reads on to its writer unchanged.
+	 */
+	public static class SpeedTestForwarder extends AbstractTask {
+
+		private RecordReader<SpeedTestRecord> reader;
+
+		private RecordWriter<SpeedTestRecord> writer;
+
+		@Override
+		public void registerInputOutput() {
+			this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
+			this.writer = new RecordWriter<SpeedTestRecord>(this);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			this.writer.initializeSerializers();
+
+			// Forward until the reader signals the end of its input by returning null
+			for (SpeedTestRecord incoming = this.reader.next(); incoming != null; incoming = this.reader.next()) {
+				this.writer.emit(incoming);
+			}
+
+			this.writer.flush();
+		}
+	}
+
+	/**
+	 * Output task that reads and discards all records, optionally sleeping at regular record
+	 * intervals to act as a slow receiver.
+	 */
+	public static class SpeedTestConsumer extends AbstractOutputTask {
+
+		private RecordReader<SpeedTestRecord> reader;
+
+		@Override
+		public void registerInputOutput() {
+			this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			final boolean throttled = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
+
+			int consumed = 0;
+			while (this.reader.next() != null) {
+				// Records are only counted when the receiver is configured to be slow
+				if (!throttled) {
+					continue;
+				}
+
+				final boolean intervalReached = (consumed++ % IS_SLOW_EVERY_NUM_RECORDS) == 0;
+				if (intervalReached) {
+					Thread.sleep(IS_SLOW_SLEEP_MS);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Fixed-size record whose payload is a byte array of {@code RECORD_SIZE} bytes, initialized with
+	 * the values {@code index % 128}.
+	 */
+	public static class SpeedTestRecord implements IOReadableWritable {
+
+		private static final int RECORD_SIZE = 128;
+
+		private final byte[] buf = new byte[RECORD_SIZE];
+
+		public SpeedTestRecord() {
+			int pos = 0;
+			while (pos < RECORD_SIZE) {
+				this.buf[pos] = (byte) (pos % 128);
+				pos++;
+			}
+		}
+
+		/** Writes the complete payload to the given output. */
+		@Override
+		public void write(DataOutput out) throws IOException {
+			out.write(this.buf);
+		}
+
+		/** Overwrites the complete payload with bytes read from the given input. */
+		@Override
+		public void read(DataInput in) throws IOException {
+			in.readFully(this.buf);
+		}
+	}
+}