You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:39 UTC
[04/30] Replace custom Java NIO TCP/IP code with Netty 4 library
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
new file mode 100644
index 0000000..40aefd3
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
@@ -0,0 +1,857 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.BufferRecycler;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider.BufferAvailabilityRegistration;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
+import eu.stratosphere.runtime.io.network.Envelope;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import junit.framework.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class InboundEnvelopeDecoderTest {
+
+ @Mock
+ private BufferProvider bufferProvider;
+
+ @Mock
+ private BufferProviderBroker bufferProviderBroker;
+
+ @Before
+ public void initMocks() throws IOException {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testBufferStaging() throws Exception {
+ final InboundEnvelopeDecoder decoder = new InboundEnvelopeDecoder(this.bufferProviderBroker);
+ final EmbeddedChannel ch = new EmbeddedChannel(
+ new OutboundEnvelopeEncoder(),
+ decoder);
+
+ when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
+ .thenReturn(this.bufferProvider);
+
+ // --------------------------------------------------------------------
+
+ Envelope[] envelopes = nextEnvelopes(3, true);
+
+ ByteBuf buf = encode(ch, envelopes);
+
+ when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
+ .thenReturn(BufferAvailabilityRegistration.REGISTERED);
+
+ Buffer buffer = allocBuffer(envelopes[2].getBuffer().size());
+
+ when(this.bufferProvider.requestBuffer(anyInt()))
+ .thenReturn(null, null, buffer, null);
+
+ // --------------------------------------------------------------------
+
+ // slices: [0] => full envelope, [1] => half envelope, [2] => remaining half + full envelope
+ ByteBuf[] slices = slice(buf,
+ OutboundEnvelopeEncoder.HEADER_SIZE + envelopes[0].getBuffer().size(),
+ OutboundEnvelopeEncoder.HEADER_SIZE + envelopes[1].getBuffer().size() / 2);
+
+ // 1. no buffer available, incoming slice contains all data
+ int refCount = slices[0].refCnt();
+
+ decodeAndVerify(ch, slices[0]);
+
+ Assert.assertEquals(refCount + 1, slices[0].refCnt());
+ Assert.assertFalse(ch.config().isAutoRead());
+
+ // notify of available buffer (=> bufferAvailable() callback does return a buffer
+ // of the current network buffer size; the decoder needs to adjust its size to the
+ // requested size
+ decoder.bufferAvailable(allocBuffer(envelopes[0].getBuffer().size() * 2));
+ ch.runPendingTasks();
+
+ Assert.assertEquals(refCount - 1, slices[0].refCnt());
+ Assert.assertTrue(ch.config().isAutoRead());
+
+ decodeAndVerify(ch, envelopes[0]);
+
+ // 2. no buffer available, incoming slice does NOT contain all data
+ refCount = slices[1].refCnt();
+
+ decodeAndVerify(ch, slices[1]);
+
+ Assert.assertEquals(refCount + 1, slices[1].refCnt());
+ Assert.assertFalse(ch.config().isAutoRead());
+
+ decoder.bufferAvailable(allocBuffer());
+ ch.runPendingTasks();
+
+ Assert.assertEquals(refCount - 1, slices[1].refCnt());
+ Assert.assertTrue(ch.config().isAutoRead());
+
+ decodeAndVerify(ch);
+
+ // 3. buffer available
+ refCount = slices[2].refCnt();
+
+ decodeAndVerify(ch, slices[2], envelopes[1], envelopes[2]);
+
+ Assert.assertEquals(refCount - 1, slices[2].refCnt());
+ Assert.assertTrue(ch.config().isAutoRead());
+
+ Assert.assertEquals(1, buf.refCnt());
+ buf.release();
+ }
+
+ @Test
+ public void testBufferStagingStagedBufferException() throws Exception {
+ final EmbeddedChannel ch = new EmbeddedChannel(
+ new OutboundEnvelopeEncoder(),
+ new InboundEnvelopeDecoder(this.bufferProviderBroker));
+
+ when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
+ .thenReturn(this.bufferProvider);
+
+ // --------------------------------------------------------------------
+
+ ByteBuf buf = encode(ch, nextEnvelope(true));
+
+ when(this.bufferProvider.requestBuffer(anyInt()))
+ .thenReturn(null);
+
+ when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
+ .thenReturn(BufferAvailabilityRegistration.REGISTERED);
+
+ // --------------------------------------------------------------------
+
+ int refCount = buf.refCnt();
+
+ decodeAndVerify(ch, buf);
+
+ Assert.assertFalse(ch.config().isAutoRead());
+ Assert.assertEquals(refCount + 1, buf.refCnt());
+
+ try {
+ decodeAndVerify(ch, buf);
+ Assert.fail("Expected IllegalStateException not thrown");
+ } catch (IllegalStateException e) {
+ // expected exception
+ }
+
+ buf.release();
+ }
+
+ @Test
+ public void testBufferAvailabilityRegistrationBufferAvailable() throws Exception {
+ final EmbeddedChannel ch = new EmbeddedChannel(
+ new OutboundEnvelopeEncoder(),
+ new InboundEnvelopeDecoder(this.bufferProviderBroker));
+
+ when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
+ .thenReturn(this.bufferProvider);
+
+ // --------------------------------------------------------------------
+
+ Envelope[] envelopes = new Envelope[]{nextEnvelope(true), nextEnvelope()};
+
+ when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
+ .thenReturn(BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_AVAILABLE);
+
+ when(this.bufferProvider.requestBuffer(anyInt()))
+ .thenReturn(null)
+ .thenReturn(allocBuffer(envelopes[0].getBuffer().size()));
+
+ // --------------------------------------------------------------------
+
+ ByteBuf buf = encode(ch, envelopes);
+
+ decodeAndVerify(ch, buf, envelopes);
+ Assert.assertEquals(0, buf.refCnt());
+ }
+
+ @Test
+ public void testBufferAvailabilityRegistrationBufferPoolDestroyedSkipBytes() throws Exception {
+ final EmbeddedChannel ch = new EmbeddedChannel(
+ new OutboundEnvelopeEncoder(),
+ new InboundEnvelopeDecoder(this.bufferProviderBroker));
+
+ when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
+ .thenReturn(this.bufferProvider);
+
+ when(this.bufferProvider.requestBuffer(anyInt()))
+ .thenReturn(null);
+
+ when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
+ .thenReturn(BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED);
+
+ // --------------------------------------------------------------------
+
+ Envelope[] envelopes = new Envelope[]{nextEnvelope(true), nextEnvelope(), nextEnvelope()};
+ Envelope[] expectedEnvelopes = new Envelope[]{envelopes[1], envelopes[2]};
+
+ ByteBuf buf = encode(ch, envelopes);
+
+ int bufferSize = envelopes[0].getBuffer().size();
+
+ // --------------------------------------------------------------------
+ // 1) skip in current buffer only
+ // --------------------------------------------------------------------
+ {
+ // skip last bytes in current buffer
+ ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize);
+
+ int refCount = slices[0].refCnt();
+ decodeAndVerify(ch, slices[0]);
+ Assert.assertEquals(refCount - 1, slices[0].refCnt());
+
+ refCount = slices[1].refCnt();
+ decodeAndVerify(ch, slices[1], expectedEnvelopes);
+ Assert.assertEquals(refCount - 1, slices[1].refCnt());
+ }
+
+ {
+ // skip bytes in current buffer, leave last 16 bytes from next envelope
+ ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize + 16);
+
+ int refCount = slices[0].refCnt();
+ decodeAndVerify(ch, slices[0]);
+ Assert.assertEquals(refCount - 1, slices[0].refCnt());
+
+ refCount = slices[1].refCnt();
+ decodeAndVerify(ch, slices[1], expectedEnvelopes);
+ Assert.assertEquals(refCount - 1, slices[1].refCnt());
+ }
+
+ {
+ // skip bytes in current buffer, then continue with full envelope from same buffer
+ ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize + OutboundEnvelopeEncoder.HEADER_SIZE);
+
+ int refCount = slices[0].refCnt();
+ decodeAndVerify(ch, slices[0], expectedEnvelopes[0]);
+ Assert.assertEquals(refCount - 1, slices[0].refCnt());
+
+ refCount = slices[1].refCnt();
+ decodeAndVerify(ch, slices[1], expectedEnvelopes[1]);
+ Assert.assertEquals(refCount - 1, slices[1].refCnt());
+ }
+
+ // --------------------------------------------------------------------
+ // 2) skip in current and next buffer
+ // --------------------------------------------------------------------
+
+ {
+ // skip bytes in current buffer, then continue to skip last 32 bytes in next buffer
+ ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize - 32);
+
+ int refCount = slices[0].refCnt();
+ decodeAndVerify(ch, slices[0]);
+ Assert.assertEquals(refCount - 1, slices[0].refCnt());
+
+ refCount = slices[1].refCnt();
+ decodeAndVerify(ch, slices[1], expectedEnvelopes);
+ Assert.assertEquals(refCount - 1, slices[1].refCnt());
+ }
+
+ {
+ // skip bytes in current buffer, then continue to skip in next two buffers
+ ByteBuf[] slices = slice(buf, OutboundEnvelopeEncoder.HEADER_SIZE + bufferSize - 32, 16);
+
+ int refCount = slices[0].refCnt();
+ decodeAndVerify(ch, slices[0]);
+ Assert.assertEquals(refCount - 1, slices[0].refCnt());
+
+ refCount = slices[1].refCnt();
+ decodeAndVerify(ch, slices[1]);
+ Assert.assertEquals(refCount - 1, slices[1].refCnt());
+
+ refCount = slices[2].refCnt();
+ decodeAndVerify(ch, slices[2], expectedEnvelopes);
+ Assert.assertEquals(refCount - 1, slices[2].refCnt());
+ }
+
+ // ref count should be 1, because slices shared the ref count
+ Assert.assertEquals(1, buf.refCnt());
+ }
+
+ @Test
+ public void testEncodeDecode() throws Exception {
+ final EmbeddedChannel ch = new EmbeddedChannel(
+ new OutboundEnvelopeEncoder(), new InboundEnvelopeDecoder(this.bufferProviderBroker));
+
+ when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
+ .thenReturn(this.bufferProvider);
+
+ when(this.bufferProvider.requestBuffer(anyInt())).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ // fulfill the buffer request
+ return allocBuffer((Integer) invocation.getArguments()[0]);
+ }
+ });
+
+ // --------------------------------------------------------------------
+
+ Envelope[] envelopes = new Envelope[]{
+ nextEnvelope(0),
+ nextEnvelope(2),
+ nextEnvelope(32768),
+ nextEnvelope(3782, new TestEvent1(34872527)),
+ nextEnvelope(88, new TestEvent1(8749653), new TestEvent1(365345)),
+ nextEnvelope(0, new TestEvent2(34563456), new TestEvent1(598432), new TestEvent2(976293845)),
+ nextEnvelope(23)
+ };
+
+ ByteBuf buf = encode(ch, envelopes);
+
+ // 1. complete ByteBuf as input
+ int refCount = buf.retain().refCnt();
+
+ decodeAndVerify(ch, buf, envelopes);
+ Assert.assertEquals(refCount - 1, buf.refCnt());
+
+ // 2. random slices
+ buf.readerIndex(0);
+ ByteBuf[] slices = randomSlices(buf);
+
+ ch.writeInbound(slices);
+
+ for (ByteBuf slice : slices) {
+ Assert.assertEquals(1, slice.refCnt());
+ }
+
+ decodeAndVerify(ch, envelopes);
+
+ buf.release();
+ }
+
+ @Test
+ public void testEncodeDecodeRandomEnvelopes() throws Exception {
+ final InboundEnvelopeDecoder decoder = new InboundEnvelopeDecoder(this.bufferProviderBroker);
+ final EmbeddedChannel ch = new EmbeddedChannel(
+ new OutboundEnvelopeEncoder(), decoder);
+
+ when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
+ .thenReturn(this.bufferProvider);
+
+ when(this.bufferProvider.requestBuffer(anyInt())).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ // fulfill the buffer request with the requested size
+ return allocBuffer((Integer) invocation.getArguments()[0]);
+ }
+ });
+
+ Random randomAnswerSource = new Random(RANDOM_SEED);
+
+ RandomBufferRequestAnswer randomBufferRequestAnswer = new RandomBufferRequestAnswer(randomAnswerSource);
+
+ RandomBufferAvailabilityRegistrationAnswer randomBufferAvailabilityRegistrationAnswer =
+ new RandomBufferAvailabilityRegistrationAnswer(randomAnswerSource, randomBufferRequestAnswer);
+
+ when(this.bufferProvider.requestBuffer(anyInt())).thenAnswer(randomBufferRequestAnswer);
+
+ when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
+ .thenAnswer(randomBufferAvailabilityRegistrationAnswer);
+
+ // --------------------------------------------------------------------
+
+ Envelope[] envelopes = nextRandomEnvelopes(1024);
+
+ ByteBuf buf = encode(ch, envelopes);
+
+ ByteBuf[] slices = randomSlices(buf);
+
+ for (ByteBuf slice : slices) {
+ int refCount = slice.refCnt();
+ ch.writeInbound(slice);
+
+ // registered BufferAvailabilityListener => call bufferAvailable(buffer)
+ while (randomBufferAvailabilityRegistrationAnswer.isRegistered()) {
+ randomBufferAvailabilityRegistrationAnswer.unregister();
+
+ Assert.assertFalse(ch.config().isAutoRead());
+ Assert.assertEquals(refCount + 1, slice.refCnt());
+
+ // return a buffer of max size => decoder needs to limit buffer size
+ decoder.bufferAvailable(allocBuffer(MAX_BUFFER_SIZE));
+ ch.runPendingTasks();
+ }
+
+ Assert.assertEquals(refCount - 1, slice.refCnt());
+ Assert.assertTrue(ch.config().isAutoRead());
+ }
+
+ Envelope[] expected = randomBufferAvailabilityRegistrationAnswer.removeSkippedEnvelopes(envelopes);
+
+ decodeAndVerify(ch, expected);
+
+ Assert.assertEquals(1, buf.refCnt());
+
+ buf.release();
+ }
+
+ // ========================================================================
+ // helpers
+ // ========================================================================
+
+ private final static long RANDOM_SEED = 520346508276087l;
+
+ private final static Random random = new Random(RANDOM_SEED);
+
+ private final static int[] BUFFER_SIZES = new int[]{8192, 16384, 32768};
+
+ private final static int MAX_BUFFER_SIZE = BUFFER_SIZES[2];
+
+ private final static int MAX_NUM_EVENTS = 5;
+
+ private final static int MAX_SLICE_SIZE = MAX_BUFFER_SIZE / 3;
+
+ private final static int MIN_SLICE_SIZE = 1;
+
+ private final static BufferRecycler RECYCLER = mock(BufferRecycler.class);
+
+ // ------------------------------------------------------------------------
+ // envelopes
+ // ------------------------------------------------------------------------
+
+ private static Buffer allocBuffer() {
+ return allocBuffer(MAX_BUFFER_SIZE);
+ }
+
+ private static Buffer allocBuffer(int bufferSize) {
+ return spy(new Buffer(new MemorySegment(new byte[bufferSize]), bufferSize, RECYCLER));
+ }
+
+ private Envelope nextEnvelope() {
+ return nextEnvelope(false, false);
+ }
+
+ private Envelope nextEnvelope(boolean withBuffer) {
+ return nextEnvelope(withBuffer, false);
+ }
+
+ private Envelope nextEnvelope(int bufferSize, AbstractEvent... events) {
+ Envelope env = new Envelope(random.nextInt(), new JobID(), new ChannelID());
+ if (bufferSize > 0) {
+ byte[] data = new byte[bufferSize];
+ random.nextBytes(data);
+
+ env.setBuffer(spy(new Buffer(new MemorySegment(data), bufferSize, RECYCLER)));
+ }
+
+ if (events != null && events.length > 0) {
+ env.serializeEventList(Arrays.asList(events));
+ }
+
+ return env;
+ }
+
+ private Envelope nextEnvelope(boolean withBuffer, boolean withEvents) {
+ int bufferSize = 0;
+ AbstractEvent[] events = null;
+
+ if (withBuffer) {
+ bufferSize = BUFFER_SIZES[random.nextInt(BUFFER_SIZES.length)];
+ }
+
+ if (withEvents) {
+ events = new AbstractEvent[random.nextInt(MAX_NUM_EVENTS) + 1];
+
+ for (int i = 0; i < events.length; i++) {
+ events[i] = (random.nextBoolean()
+ ? new TestEvent1(random.nextLong())
+ : new TestEvent2(random.nextLong()));
+ }
+ }
+
+ return nextEnvelope(bufferSize, events);
+ }
+
+ private Envelope[] nextEnvelopes(int numEnvelopes, boolean withBuffer) {
+ Envelope[] envelopes = new Envelope[numEnvelopes];
+ for (int i = 0; i < numEnvelopes; i++) {
+ envelopes[i] = nextEnvelope(withBuffer, false);
+ }
+ return envelopes;
+ }
+
+ private Envelope[] nextRandomEnvelopes(int numEnvelopes) {
+ Envelope[] envelopes = new Envelope[numEnvelopes];
+ for (int i = 0; i < numEnvelopes; i++) {
+ envelopes[i] = nextEnvelope(random.nextBoolean(), random.nextBoolean());
+ }
+ return envelopes;
+ }
+
+ // ------------------------------------------------------------------------
+ // channel encode/decode
+ // ------------------------------------------------------------------------
+
+ private static ByteBuf encode(EmbeddedChannel ch, Envelope... envelopes) {
+ for (Envelope env : envelopes) {
+ ch.writeOutbound(env);
+
+ if (env.getBuffer() != null) {
+ verify(env.getBuffer(), times(1)).recycleBuffer();
+ }
+ }
+
+ CompositeByteBuf encodedEnvelopes = new CompositeByteBuf(ByteBufAllocator.DEFAULT, false, envelopes.length);
+
+ ByteBuf buf;
+ while ((buf = (ByteBuf) ch.readOutbound()) != null) {
+ encodedEnvelopes.addComponent(buf);
+ }
+
+ return encodedEnvelopes.writerIndex(encodedEnvelopes.capacity());
+ }
+
+ private static void decodeAndVerify(EmbeddedChannel ch, ByteBuf buf, Envelope... expectedEnvelopes) {
+ ch.writeInbound(buf);
+
+ decodeAndVerify(ch, expectedEnvelopes);
+ }
+
+ private static void decodeAndVerify(EmbeddedChannel ch, Envelope... expectedEnvelopes) {
+ if (expectedEnvelopes == null) {
+ Assert.assertNull(ch.readInbound());
+ }
+ else {
+ for (Envelope expected : expectedEnvelopes) {
+ Envelope actual = (Envelope) ch.readInbound();
+
+ if (actual == null) {
+ Assert.fail("No inbound envelope available, but expected one");
+ }
+
+ assertEqualEnvelopes(expected, actual);
+ }
+ }
+ }
+
+ private static void assertEqualEnvelopes(Envelope expected, Envelope actual) {
+ Assert.assertTrue(expected.getSequenceNumber() == actual.getSequenceNumber() &&
+ expected.getJobID().equals(actual.getJobID()) &&
+ expected.getSource().equals(actual.getSource()));
+
+ if (expected.getBuffer() == null) {
+ Assert.assertNull(actual.getBuffer());
+ }
+ else {
+ Assert.assertNotNull(actual.getBuffer());
+
+ ByteBuffer expectedByteBuffer = expected.getBuffer().getMemorySegment().wrap(0, expected.getBuffer().size());
+ ByteBuffer actualByteBuffer = actual.getBuffer().getMemorySegment().wrap(0, actual.getBuffer().size());
+
+ Assert.assertEquals(0, expectedByteBuffer.compareTo(actualByteBuffer));
+ }
+
+ if (expected.getEventsSerialized() == null) {
+ Assert.assertNull(actual.getEventsSerialized());
+ }
+ else {
+ Assert.assertNotNull(actual.getEventsSerialized());
+
+ // this is needed, because the encoding of the byte buffer
+ // alters the state of the buffer
+ expected.getEventsSerialized().clear();
+
+ List<? extends AbstractEvent> expectedEvents = expected.deserializeEvents();
+ List<? extends AbstractEvent> actualEvents = actual.deserializeEvents();
+
+ Assert.assertEquals(expectedEvents.size(), actualEvents.size());
+
+ for (int i = 0; i < expectedEvents.size(); i++) {
+ AbstractEvent expectedEvent = expectedEvents.get(i);
+ AbstractEvent actualEvent = actualEvents.get(i);
+
+ Assert.assertEquals(expectedEvent.getClass(), actualEvent.getClass());
+ Assert.assertEquals(expectedEvent, actualEvent);
+ }
+ }
+ }
+
+ private static ByteBuf[] randomSlices(ByteBuf buf) {
+ List<Integer> sliceSizes = new LinkedList<Integer>();
+
+ if (buf.readableBytes() < MIN_SLICE_SIZE) {
+ throw new IllegalStateException("Buffer to slice is smaller than required minimum slice size");
+ }
+
+ int available = buf.readableBytes() - MIN_SLICE_SIZE;
+
+ while (available > 0) {
+ int size = Math.min(available, Math.max(MIN_SLICE_SIZE, random.nextInt(MAX_SLICE_SIZE) + 1));
+ available -= size;
+ sliceSizes.add(size);
+ }
+
+ int[] slices = new int[sliceSizes.size()];
+ for (int i = 0; i < sliceSizes.size(); i++) {
+ slices[i] = sliceSizes.get(i);
+ }
+
+ return slice(buf, slices);
+ }
+
+ /**
+ * Returns slices with the specified sizes of the given buffer.
+ * <p/>
+ * When given n indexes, n+1 slices will be returned:
+ * <ul>
+ * <li>0 - sliceSizes[0]</li>
+ * <li>sliceSizes[0] - sliceSizes[1]</li>
+ * <li>...</li>
+ * <li>sliceSizes[n-1] - buf.capacity()</li>
+ * </ul>
+ *
+ * @return slices with the specified sizes of the given buffer
+ */
+ private static ByteBuf[] slice(ByteBuf buf, int... sliceSizes) {
+ if (sliceSizes.length == 0) {
+ throw new IllegalStateException("Need to provide at least one slice size");
+ }
+
+ int numSlices = sliceSizes.length;
+ // transform slice sizes to buffer indexes
+ for (int i = 1; i < numSlices; i++) {
+ sliceSizes[i] += sliceSizes[i - 1];
+ }
+
+ for (int i = 0; i < sliceSizes.length - 1; i++) {
+ if (sliceSizes[i] >= sliceSizes[i + 1] || sliceSizes[i] <= 0 || sliceSizes[i] >= buf.capacity()) {
+ throw new IllegalStateException(
+ String.format("Slice size %s are off for %s", Arrays.toString(sliceSizes), buf));
+ }
+ }
+
+ ByteBuf[] slices = new ByteBuf[numSlices + 1];
+
+ // slice at slice indexes
+ slices[0] = buf.slice(0, sliceSizes[0]).retain();
+ for (int i = 1; i < numSlices; i++) {
+ slices[i] = buf.slice(sliceSizes[i - 1], sliceSizes[i] - sliceSizes[i - 1]).retain();
+ }
+ slices[numSlices] = buf.slice(sliceSizes[numSlices - 1], buf.capacity() - sliceSizes[numSlices - 1]).retain();
+
+ return slices;
+ }
+
+ // ------------------------------------------------------------------------
+ // mocking
+ // ------------------------------------------------------------------------
+
+ private static JobID anyJobId() {
+ return Matchers.anyObject();
+ }
+
+ private static ChannelID anyChannelId() {
+ return Matchers.anyObject();
+ }
+
+ // these following two Answer classes are quite ugly, but they allow to implement a randomized
+ // test of encoding and decoding envelopes
+ private static class RandomBufferRequestAnswer implements Answer<Buffer> {
+
+ private final Random random;
+
+ private boolean forced;
+
+ private RandomBufferRequestAnswer(Random random) {
+ this.random = random;
+ }
+
+ @Override
+ public Buffer answer(InvocationOnMock invocation) throws Throwable {
+ if (this.forced) {
+ Buffer toReturn = allocBuffer((Integer) invocation.getArguments()[0]);
+ this.forced = false;
+
+ return toReturn;
+ }
+
+ return this.random.nextBoolean() ? allocBuffer((Integer) invocation.getArguments()[0]) : null;
+ }
+
+ public void forceBufferAvailable() {
+ this.forced = true;
+ }
+ }
+
+ private static class RandomBufferAvailabilityRegistrationAnswer implements Answer<BufferAvailabilityRegistration> {
+
+ private final Random random;
+
+ private final RandomBufferRequestAnswer bufferRequestAnswer;
+
+ private boolean isRegistered = false;
+
+ private int numSkipped;
+
+ private RandomBufferAvailabilityRegistrationAnswer(Random random, RandomBufferRequestAnswer bufferRequestAnswer) {
+ this.random = random;
+ this.bufferRequestAnswer = bufferRequestAnswer;
+ }
+
+ @Override
+ public BufferAvailabilityRegistration answer(InvocationOnMock invocation) throws Throwable {
+ if (this.random.nextBoolean()) {
+ this.isRegistered = true;
+ return BufferAvailabilityRegistration.REGISTERED;
+ }
+ else if (this.random.nextBoolean()) {
+ this.bufferRequestAnswer.forceBufferAvailable();
+ return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_AVAILABLE;
+ }
+ else {
+ this.numSkipped++;
+ return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED;
+ }
+ }
+
+ public Envelope[] removeSkippedEnvelopes(Envelope[] envelopes) {
+ this.random.setSeed(RANDOM_SEED);
+ Envelope[] envelopesWithoutSkipped = new Envelope[envelopes.length - this.numSkipped];
+ int numEnvelopes = 0;
+
+ for (Envelope env : envelopes) {
+ if (env.getBuffer() != null) {
+ // skip envelope if returned NOT_REGISTERED_BUFFER_POOL_DESTROYED
+ if (!this.random.nextBoolean() && !this.random.nextBoolean() && !this.random.nextBoolean()) {
+ continue;
+ }
+ }
+
+ envelopesWithoutSkipped[numEnvelopes++] = env;
+ }
+
+ return envelopesWithoutSkipped;
+ }
+
+ public boolean isRegistered() {
+ return this.isRegistered;
+ }
+
+ public void unregister() {
+ this.isRegistered = false;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static final class TestEvent1 extends AbstractEvent {
+
+ private long id;
+
+ public TestEvent1() {
+ }
+
+ public TestEvent1(long id) {
+ this.id = id;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(id);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ id = in.readLong();
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj.getClass() == TestEvent1.class && ((TestEvent1) obj).id == this.id;
+ }
+
+ @Override
+ public int hashCode() {
+ return ((int) id) ^ ((int) (id >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return "TestEvent1 (" + id + ")";
+ }
+ }
+
+ public static final class TestEvent2 extends AbstractEvent {
+
+ private long id;
+
+ public TestEvent2() {
+ }
+
+ public TestEvent2(long id) {
+ this.id = id;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(id);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ id = in.readLong();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj.getClass() == TestEvent2.class && ((TestEvent2) obj).id == this.id;
+ }
+
+ @Override
+ public int hashCode() {
+ return ((int) id) ^ ((int) (id >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return "TestEvent2 (" + id + ")";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
new file mode 100644
index 0000000..c424a1f
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -0,0 +1,196 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.ChannelManager;
+import eu.stratosphere.runtime.io.network.Envelope;
+import eu.stratosphere.runtime.io.network.RemoteReceiver;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+@RunWith(Parameterized.class)
+public class NettyConnectionManagerTest {
+
+ private final static long RANDOM_SEED = 520346508276087l;
+
+ private final static Random random = new Random(RANDOM_SEED);
+
+ private final static int BIND_PORT = 20000;
+
+ private final static int HIGH_WATERMARK = 32 * 1024;
+
+ private int numSubtasks;
+
+ private int numToSendPerSubtask;
+
+ private int numInThreads;
+
+ private int numOutThreads;
+
+ private int numChannels;
+
+ public NettyConnectionManagerTest(int numSubtasks, int numToSendPerSubtask, int numChannels, int numInThreads, int numOutThreads) {
+ this.numSubtasks = numSubtasks;
+ this.numToSendPerSubtask = numToSendPerSubtask;
+ this.numChannels = numChannels;
+ this.numInThreads = numInThreads;
+ this.numOutThreads = numOutThreads;
+ }
+
+ @Parameterized.Parameters
+ public static Collection configure() {
+ return Arrays.asList(
+ new Object[][]{
+ {64, 4096, 1, 1, 1},
+ {128, 2048, 1, 1, 1},
+ {256, 1024, 1, 1, 1},
+ {512, 512, 1, 1, 1},
+ {64, 4096, 4, 1, 1},
+ {128, 2048, 4, 1, 1},
+ {256, 1024, 4, 1, 1},
+ {512, 512, 4, 1, 1},
+ {64, 4096, 4, 2, 2},
+ {128, 2048, 4, 2, 2},
+ {256, 1024, 4, 2, 2},
+ {512, 512, 4, 2, 2}
+ }
+ );
+ }
+
+ @Test
+ public void testEnqueueRaceAndDeadlockFreeMultipleChannels() throws Exception {
+ final InetAddress localhost = InetAddress.getLocalHost();
+ final CountDownLatch latch = new CountDownLatch(this.numSubtasks);
+
+ // --------------------------------------------------------------------
+ // setup
+ // --------------------------------------------------------------------
+ ChannelManager channelManager = mock(ChannelManager.class);
+ doAnswer(new VerifyEnvelopes(latch)).when(channelManager).dispatchFromNetwork(Matchers.<Envelope>anyObject());
+
+ NettyConnectionManager connManagerToTest = new NettyConnectionManager(channelManager, localhost,
+ BIND_PORT, HIGH_WATERMARK, this.numInThreads, this.numOutThreads, -1, -1);
+
+ NettyConnectionManager connManagerReceiver = new NettyConnectionManager(channelManager, localhost,
+ BIND_PORT + 1, HIGH_WATERMARK, this.numInThreads, this.numOutThreads, -1, -1);
+
+ // --------------------------------------------------------------------
+ // start sender threads
+ // --------------------------------------------------------------------
+ RemoteReceiver[] receivers = new RemoteReceiver[this.numChannels];
+
+ for (int i = 0; i < this.numChannels; i++) {
+ receivers[i] = new RemoteReceiver(new InetSocketAddress(localhost, BIND_PORT + 1), i);
+ }
+
+ for (int i = 0; i < this.numSubtasks; i++) {
+ RemoteReceiver receiver = receivers[random.nextInt(this.numChannels)];
+ new Thread(new SubtaskSenderThread(connManagerToTest, receiver)).start();
+ }
+
+ latch.await();
+
+ connManagerToTest.shutdown();
+ connManagerReceiver.shutdown();
+ }
+
+
+ private class VerifyEnvelopes implements Answer {
+
+ private final ConcurrentMap<ChannelID, Integer> received = new ConcurrentHashMap<ChannelID, Integer>();
+
+ private final CountDownLatch latch;
+
+ private VerifyEnvelopes(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Envelope env = (Envelope) invocation.getArguments()[0];
+
+ ChannelID channelId = env.getSource();
+ int seqNum = env.getSequenceNumber();
+
+ if (seqNum == 0) {
+ Assert.assertNull(
+ String.format("Received envelope from %s before, but current seq num is 0", channelId),
+ this.received.putIfAbsent(channelId, seqNum));
+ }
+ else {
+ Assert.assertTrue(
+ String.format("Received seq num %d from %s, but previous was not %d", seqNum, channelId, seqNum - 1),
+ this.received.replace(channelId, seqNum - 1, seqNum));
+ }
+
+ // count down the latch if all envelopes received for this source
+ if (seqNum == numToSendPerSubtask - 1) {
+ this.latch.countDown();
+ }
+
+ return null;
+ }
+ }
+
+ private class SubtaskSenderThread implements Runnable {
+
+ private final NettyConnectionManager connectionManager;
+
+ private final RemoteReceiver receiver;
+
+ private final JobID jobId = new JobID();
+
+ private final ChannelID channelId = new ChannelID();
+
+ private int seqNum = 0;
+
+ private SubtaskSenderThread(NettyConnectionManager connectionManager, RemoteReceiver receiver) {
+ this.connectionManager = connectionManager;
+ this.receiver = receiver;
+ }
+
+ @Override
+ public void run() {
+ // enqueue envelopes with ascending seq nums
+ while (this.seqNum < numToSendPerSubtask) {
+ try {
+ Envelope env = new Envelope(this.seqNum++, this.jobId, this.channelId);
+ this.connectionManager.enqueue(env, receiver);
+ } catch (IOException e) {
+ throw new RuntimeException("Unexpected exception while enqueing envelope");
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoderTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoderTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoderTest.java
new file mode 100644
index 0000000..ba4edc7
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoderTest.java
@@ -0,0 +1,97 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.Envelope;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class OutboundEnvelopeEncoderTest {
+
+ private final long RANDOM_SEED = 520346508276087l;
+
+ private final Random random = new Random(RANDOM_SEED);
+
+ private static final int NUM_RANDOM_ENVELOPES = 512;
+
+ private static final int MAX_EVENTS_SIZE = 1024;
+
+ private static final int MAX_BUFFER_SIZE = 32768;
+
+ @Test
+ public void testEncodedSizeAndBufferRecycling() {
+ final ByteBuffer events = ByteBuffer.allocate(MAX_EVENTS_SIZE);
+ final MemorySegment segment = new MemorySegment(new byte[MAX_BUFFER_SIZE]);
+
+ final Buffer buffer = mock(Buffer.class);
+ when(buffer.getMemorySegment()).thenReturn(segment);
+
+ final EmbeddedChannel channel = new EmbeddedChannel(new OutboundEnvelopeEncoder());
+
+ int numBuffers = 0;
+ for (int i = 0; i < NUM_RANDOM_ENVELOPES; i++) {
+ Envelope env = new Envelope(i, new JobID(), new ChannelID());
+ int expectedEncodedMsgSize = OutboundEnvelopeEncoder.HEADER_SIZE;
+
+ if (random.nextBoolean()) {
+ int eventsSize = random.nextInt(MAX_EVENTS_SIZE + 1);
+ expectedEncodedMsgSize += eventsSize;
+
+ events.clear();
+ events.limit(eventsSize);
+
+ env.setEventsSerialized(events);
+ }
+
+ if (random.nextBoolean()) {
+ numBuffers++;
+
+ int bufferSize = random.nextInt(MAX_BUFFER_SIZE + 1);
+ when(buffer.size()).thenReturn(bufferSize);
+ env.setBuffer(buffer);
+
+ expectedEncodedMsgSize += bufferSize;
+ }
+
+ Assert.assertTrue(channel.writeOutbound(env));
+
+ // --------------------------------------------------------------------
+ // verify encoded ByteBuf size
+ // --------------------------------------------------------------------
+ ByteBuf encodedMsg = (ByteBuf) channel.readOutbound();
+ Assert.assertEquals(expectedEncodedMsgSize, encodedMsg.readableBytes());
+
+ encodedMsg.release();
+ }
+
+ // --------------------------------------------------------------------
+ // verify buffers are recycled
+ // --------------------------------------------------------------------
+ verify(buffer, times(numBuffers)).recycleBuffer();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
index c9323a6..8cda32f 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -14,6 +14,7 @@
**********************************************************************************************************************/
package eu.stratosphere.test.broadcastvars;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
import org.apache.log4j.Level;
import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
@@ -22,8 +23,6 @@ import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java
index 802fd0b..984ecc2 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java
@@ -23,9 +23,8 @@ import eu.stratosphere.client.LocalExecutor;
import eu.stratosphere.test.recordJobs.wordcount.WordCount;
import eu.stratosphere.test.testdata.WordCountData;
-
public class LocalExecutorITCase {
-
+
@Test
public void testLocalExecutorWithWordCount() {
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java
index 2eaa8a9..4a60836 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java
@@ -20,7 +20,7 @@ import eu.stratosphere.test.util.JavaProgramTestBase;
public class WordCountITCase extends JavaProgramTestBase {
-
+
protected String textPath;
protected String resultPath;
@@ -28,7 +28,6 @@ public class WordCountITCase extends JavaProgramTestBase {
setNumTaskManager(2);
}
-
@Override
protected void preSubmit() throws Exception {
textPath = createTempFile("text.txt", WordCountData.TEXT);
@@ -39,7 +38,7 @@ public class WordCountITCase extends JavaProgramTestBase {
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
}
-
+
@Override
protected void testProgram() throws Exception {
WordCount.main(new String[] { textPath, resultPath });
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 0a63af1..40068b7 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -16,6 +16,8 @@ package eu.stratosphere.test.iterative.nephele;
import java.io.BufferedReader;
import java.util.Collection;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.test.util.RecordAPITestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -31,8 +33,6 @@ import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.io.FileOutputFormat;
import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
@@ -425,11 +425,11 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
JobGraphUtils.connect(head, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
- JobGraphUtils.connect(intermediate, tail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(intermediate, tail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(tail, fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(tail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
@@ -567,16 +567,16 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
JobGraphUtils.connect(intermediate, ssJoinIntermediate, ChannelType.NETWORK, DistributionPattern.POINTWISE);
ssJoinIntermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- JobGraphUtils.connect(ssJoinIntermediate, ssTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(ssJoinIntermediate, ssTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
ssTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- JobGraphUtils.connect(ssJoinIntermediate, wsTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(ssJoinIntermediate, wsTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
wsTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(ssTail, ssFakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(wsTail, wsFakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(ssTail, ssFakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(wsTail, wsFakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
@@ -695,12 +695,12 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
DistributionPattern.POINTWISE);
wsUpdateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- JobGraphUtils.connect(wsUpdateIntermediate, ssTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(wsUpdateIntermediate, ssTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
ssTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(ssTail, fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(ssTail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
@@ -815,12 +815,12 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
JobGraphUtils.connect(intermediate, ssJoinIntermediate, ChannelType.NETWORK, DistributionPattern.POINTWISE);
ssJoinIntermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- JobGraphUtils.connect(ssJoinIntermediate, wsTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(ssJoinIntermediate, wsTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
wsTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(wsTail, fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(wsTail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
index ef83aef..ef7c9d2 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
@@ -15,6 +15,8 @@ package eu.stratosphere.test.iterative.nephele;
import java.util.Collection;
import java.util.Iterator;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -25,8 +27,6 @@ import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.FileOutputFormat;
import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
@@ -232,16 +232,16 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
// --------------------------------------------------------------------------------------------------------------
// 2. EDGES
// --------------------------------------------------------------------------------------------------------------
- JobGraphUtils.connect(input, head, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(input, head, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(head, tail, ChannelType.INMEMORY, DistributionPattern.BIPARTITE);
+ JobGraphUtils.connect(head, tail, ChannelType.IN_MEMORY, DistributionPattern.BIPARTITE);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
- JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(tail, fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(tail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
// --------------------------------------------------------------------------------------------------------------
// 3. INSTANCE SHARING
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java
index 624fd9b..30ce102 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java
@@ -22,8 +22,6 @@ import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.io.TextInputFormat;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
-import eu.stratosphere.nephele.io.MutableUnionRecordReader;
-import eu.stratosphere.nephele.io.UnionRecordReader;
import eu.stratosphere.test.recordJobs.wordcount.WordCount.CountWords;
import eu.stratosphere.test.recordJobs.wordcount.WordCount.TokenizeLine;
import eu.stratosphere.test.testdata.WordCountData;
@@ -39,8 +37,6 @@ import eu.stratosphere.types.StringValue;
*
* @see {@link https://github.com/stratosphere/stratosphere/issues/192}
* @see {@link https://github.com/stratosphere/stratosphere/issues/124}
- * @see {@link UnionRecordReader}
- * @see {@link MutableUnionRecordReader}
*/
public class WordCountUnionReduceITCase extends RecordAPITestBase {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
new file mode 100644
index 0000000..f5beda4
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
@@ -0,0 +1,286 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.runtime;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
+import eu.stratosphere.nephele.jobgraph.JobInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
+import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
+import eu.stratosphere.nephele.template.AbstractGenericInputTask;
+import eu.stratosphere.nephele.template.AbstractOutputTask;
+import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.test.util.RecordAPITestBase;
+import eu.stratosphere.util.LogUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class NetworkStackNepheleITCase extends RecordAPITestBase {
+
+ private static final Log LOG = LogFactory.getLog(NetworkStackNepheleITCase.class);
+
+ private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
+
+ private static final String USE_FORWARDER_CONFIG_KEY = "use.forwarder";
+
+ private static final String NUM_SUBTASKS_CONFIG_KEY = "num.subtasks";
+
+ private static final String NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY = "num.subtasks.instance";
+
+ private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
+
+ private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
+
+ private static final int IS_SLOW_SLEEP_MS = 10;
+
+ private static final int IS_SLOW_EVERY_NUM_RECORDS = (2 * 32 * 1024) / SpeedTestRecord.RECORD_SIZE;
+
+ // ------------------------------------------------------------------------
+
+ public NetworkStackNepheleITCase(Configuration config) {
+ super(config);
+
+ setNumTaskManager(2);
+ LogUtils.initializeDefaultConsoleLogger();
+ }
+
+ @Parameters
+ public static Collection<Object[]> getConfigurations() {
+ Object[][] configParams = new Object[][]{
+ new Object[]{1, false, false, false, 4, 2},
+ new Object[]{1, true, false, false, 4, 2},
+ new Object[]{1, true, true, false, 4, 2},
+ new Object[]{1, true, false, true, 4, 2},
+ new Object[]{2, true, false, false, 4, 2},
+ new Object[]{4, true, false, false, 4, 2},
+ new Object[]{4, true, false, false, 8, 4},
+ new Object[]{4, true, false, false, 16, 8},
+ };
+
+ List<Configuration> configs = new ArrayList<Configuration>(configParams.length);
+ for (Object[] p : configParams) {
+ Configuration config = new Configuration();
+ config.setInteger(DATA_VOLUME_GB_CONFIG_KEY, (Integer) p[0]);
+ config.setBoolean(USE_FORWARDER_CONFIG_KEY, (Boolean) p[1]);
+ config.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, (Boolean) p[2]);
+ config.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, (Boolean) p[3]);
+ config.setInteger(NUM_SUBTASKS_CONFIG_KEY, (Integer) p[4]);
+ config.setInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, (Integer) p[5]);
+
+ configs.add(config);
+ }
+
+ return toParameterList(configs);
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected JobGraph getJobGraph() throws Exception {
+ int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+ boolean useForwarder = this.config.getBoolean(USE_FORWARDER_CONFIG_KEY, true);
+ boolean isSlowSender = this.config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
+ boolean isSlowReceiver = this.config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
+ int numSubtasks = this.config.getInteger(NUM_SUBTASKS_CONFIG_KEY, 1);
+ int numSubtasksPerInstance = this.config.getInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, 1);
+
+ return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, numSubtasks, numSubtasksPerInstance);
+ }
+
+ @After
+ public void calculateThroughput() {
+ if (getJobExecutionResult() != null) {
+ int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+
+ double dataVolumeMbit = dataVolumeGb * 8192.0;
+ double runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000.0;
+
+ int mbitPerSecond = (int) Math.round(dataVolumeMbit / runtimeSecs);
+
+ LOG.info(String.format("Test finished with throughput of %d MBit/s (" +
+ "runtime [secs]: %.2f, data volume [mbits]: %.2f)", mbitPerSecond, runtimeSecs, dataVolumeMbit));
+ }
+ }
+
+ private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender, boolean isSlowReceiver,
+ int numSubtasks, int numSubtasksPerInstance) throws JobGraphDefinitionException {
+
+ JobGraph jobGraph = new JobGraph("Speed Test");
+
+ JobInputVertex producer = new JobGenericInputVertex("Speed Test Producer", jobGraph);
+ producer.setInputClass(SpeedTestProducer.class);
+ producer.setNumberOfSubtasks(numSubtasks);
+ producer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+ producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
+ producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
+
+ JobTaskVertex forwarder = null;
+ if (useForwarder) {
+ forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
+ forwarder.setTaskClass(SpeedTestForwarder.class);
+ forwarder.setNumberOfSubtasks(numSubtasks);
+ forwarder.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+ }
+
+ JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
+ consumer.setOutputClass(SpeedTestConsumer.class);
+ consumer.setNumberOfSubtasks(numSubtasks);
+ consumer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
+ consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
+
+ if (useForwarder) {
+ producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+ forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+
+ producer.setVertexToShareInstancesWith(forwarder);
+ forwarder.setVertexToShareInstancesWith(consumer);
+ }
+ else {
+ producer.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+ producer.setVertexToShareInstancesWith(consumer);
+ }
+
+ return jobGraph;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static class SpeedTestProducer extends AbstractGenericInputTask {
+
+ private RecordWriter<SpeedTestRecord> writer;
+
+ @Override
+ public void registerInputOutput() {
+ this.writer = new RecordWriter<SpeedTestRecord>(this);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ this.writer.initializeSerializers();
+
+ // Determine the amount of data to send per subtask
+ int dataVolumeGb = getTaskConfiguration().getInteger(NetworkStackNepheleITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);
+
+ long dataMbPerSubtask = (dataVolumeGb * 1024) / getCurrentNumberOfSubtasks();
+ long numRecordsToEmit = (dataMbPerSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;
+
+ LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
+ getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks(), numRecordsToEmit,
+ SpeedTestRecord.RECORD_SIZE, dataMbPerSubtask/1024.0));
+
+ boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
+
+ int numRecords = 0;
+ SpeedTestRecord record = new SpeedTestRecord();
+ for (long i = 0; i < numRecordsToEmit; i++) {
+ if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
+ Thread.sleep(IS_SLOW_SLEEP_MS);
+ }
+
+ this.writer.emit(record);
+ }
+
+ this.writer.flush();
+ }
+ }
+
+ public static class SpeedTestForwarder extends AbstractTask {
+
+ private RecordReader<SpeedTestRecord> reader;
+
+ private RecordWriter<SpeedTestRecord> writer;
+
+ @Override
+ public void registerInputOutput() {
+ this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
+ this.writer = new RecordWriter<SpeedTestRecord>(this);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ this.writer.initializeSerializers();
+
+ SpeedTestRecord record;
+ while ((record = this.reader.next()) != null) {
+ this.writer.emit(record);
+ }
+
+ this.writer.flush();
+ }
+ }
+
+ public static class SpeedTestConsumer extends AbstractOutputTask {
+
+ private RecordReader<SpeedTestRecord> reader;
+
+ @Override
+ public void registerInputOutput() {
+ this.reader = new RecordReader<SpeedTestRecord>(this, SpeedTestRecord.class);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
+
+ int numRecords = 0;
+ while (this.reader.next() != null) {
+ if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
+ Thread.sleep(IS_SLOW_SLEEP_MS);
+ }
+ }
+ }
+ }
+
+ public static class SpeedTestRecord implements IOReadableWritable {
+
+ private static final int RECORD_SIZE = 128;
+
+ private final byte[] buf = new byte[RECORD_SIZE];
+
+ public SpeedTestRecord() {
+ for (int i = 0; i < RECORD_SIZE; ++i) {
+ this.buf[i] = (byte) (i % 128);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.write(this.buf);
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ in.readFully(this.buf);
+ }
+ }
+}