You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:45 UTC
[04/19] flink git commit: [streaming] Major internal renaming and
restructure
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
deleted file mode 100644
index 24106c1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.junit.Test;
-
-public class BarrierBufferIOTest {
-
- @Test
- public void IOTest() throws IOException, InterruptedException {
-
- BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
- BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
-
- MockInputGate myIG = new MockInputGate(new BufferPool[] { pool1, pool2 },
- new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
- // new BarrierSimulator[] { new CountBarrier(1000), new
- // CountBarrier(1000) });
-
- BarrierBuffer barrierBuffer = new BarrierBuffer(myIG,
- new BarrierBufferTest.MockReader(myIG));
-
- try {
- // long time = System.currentTimeMillis();
- for (int i = 0; i < 2000000; i++) {
- BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
- if (boe.isBuffer()) {
- boe.getBuffer().recycle();
- } else {
- barrierBuffer.processSuperstep(boe);
- }
- }
- // System.out.println("Ran for " + (System.currentTimeMillis() -
- // time));
- } catch (Exception e) {
- fail();
- } finally {
- barrierBuffer.cleanup();
- }
- }
-
- private static class RandomBarrier implements BarrierGenerator {
- private static Random rnd = new Random();
-
- double threshold;
-
- public RandomBarrier(double expectedEvery) {
- threshold = 1 / expectedEvery;
- }
-
- @Override
- public boolean isNextBarrier() {
- return rnd.nextDouble() < threshold;
- }
- }
-
- private static class CountBarrier implements BarrierGenerator {
-
- long every;
- long c = 0;
-
- public CountBarrier(long every) {
- this.every = every;
- }
-
- @Override
- public boolean isNextBarrier() {
- return c++ % every == 0;
- }
- }
-
- protected static class MockInputGate implements InputGate {
-
- private int numChannels;
- private BufferPool[] bufferPools;
- private int[] currentSupersteps;
- BarrierGenerator[] barrierGens;
- int currentChannel = 0;
- long c = 0;
-
- public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
- this.numChannels = bufferPools.length;
- this.currentSupersteps = new int[numChannels];
- this.bufferPools = bufferPools;
- this.barrierGens = barrierGens;
- }
-
- @Override
- public int getNumberOfInputChannels() {
- return numChannels;
- }
-
- @Override
- public boolean isFinished() {
- return false;
- }
-
- @Override
- public void requestPartitions() throws IOException, InterruptedException {
- }
-
- @Override
- public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
- currentChannel = (currentChannel + 1) % numChannels;
-
- if (barrierGens[currentChannel].isNextBarrier()) {
- return BarrierBufferTest.createSuperstep(++currentSupersteps[currentChannel],
- currentChannel);
- } else {
- Buffer buffer = bufferPools[currentChannel].requestBuffer();
- buffer.getMemorySegment().putLong(0, c++);
-
- return new BufferOrEvent(buffer, currentChannel);
- }
-
- }
-
- @Override
- public void sendTaskEvent(TaskEvent event) throws IOException {
- }
-
- @Override
- public void registerListener(EventListener<InputGate> listener) {
- }
-
- }
-
- protected interface BarrierGenerator {
- public boolean isNextBarrier();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
deleted file mode 100644
index 62feb35..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
-import org.junit.Test;
-
-public class BarrierBufferTest {
-
- @Test
- public void testWithoutBarriers() throws IOException, InterruptedException {
-
- List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
- input.add(createBuffer(0));
- input.add(createBuffer(0));
- input.add(createBuffer(0));
-
- InputGate mockIG = new MockInputGate(1, input);
- AbstractReader mockAR = new MockReader(mockIG);
-
- BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
-
- assertEquals(input.get(0), bb.getNextNonBlocked());
- assertEquals(input.get(1), bb.getNextNonBlocked());
- assertEquals(input.get(2), bb.getNextNonBlocked());
-
- bb.cleanup();
- }
-
- @Test
- public void testOneChannelBarrier() throws IOException, InterruptedException {
-
- List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
- input.add(createBuffer(0));
- input.add(createBuffer(0));
- input.add(createSuperstep(1, 0));
- input.add(createBuffer(0));
- input.add(createBuffer(0));
- input.add(createSuperstep(2, 0));
- input.add(createBuffer(0));
-
- InputGate mockIG = new MockInputGate(1, input);
- AbstractReader mockAR = new MockReader(mockIG);
-
- BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
- BufferOrEvent nextBoe;
-
- assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
- assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
- assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
-
- bb.cleanup();
- }
-
- @Test
- public void testMultiChannelBarrier() throws IOException, InterruptedException {
-
- List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
- input.add(createBuffer(0));
- input.add(createBuffer(1));
- input.add(createSuperstep(1, 0));
- input.add(createSuperstep(2, 0));
- input.add(createBuffer(0));
- input.add(createSuperstep(3, 0));
- input.add(createBuffer(0));
- input.add(createBuffer(1));
- input.add(createSuperstep(1, 1));
- input.add(createBuffer(0));
- input.add(createBuffer(1));
- input.add(createSuperstep(2, 1));
- input.add(createSuperstep(3, 1));
- input.add(createSuperstep(4, 0));
- input.add(createBuffer(0));
- input.add(new BufferOrEvent(new EndOfPartitionEvent(), 1));
-
-
- InputGate mockIG1 = new MockInputGate(2, input);
- AbstractReader mockAR1 = new MockReader(mockIG1);
-
- BarrierBuffer bb = new BarrierBuffer(mockIG1, mockAR1);
- BufferOrEvent nextBoe;
-
- check(input.get(0), nextBoe = bb.getNextNonBlocked());
- check(input.get(1), nextBoe = bb.getNextNonBlocked());
- check(input.get(2), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
- check(input.get(7), nextBoe = bb.getNextNonBlocked());
- check(input.get(8), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
- check(input.get(3), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
- check(input.get(10), nextBoe = bb.getNextNonBlocked());
- check(input.get(11), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
- check(input.get(4), nextBoe = bb.getNextNonBlocked());
- check(input.get(5), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
- check(input.get(12), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
- check(input.get(6), nextBoe = bb.getNextNonBlocked());
- check(input.get(9), nextBoe = bb.getNextNonBlocked());
- check(input.get(13), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
- check(input.get(14), nextBoe = bb.getNextNonBlocked());
- check(input.get(15), nextBoe = bb.getNextNonBlocked());
-
- bb.cleanup();
- }
-
- private static void check(BufferOrEvent expected, BufferOrEvent actual) {
- assertEquals(expected.isBuffer(), actual.isBuffer());
- assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
- if (expected.isEvent()) {
- assertEquals(expected.getEvent(), actual.getEvent());
- }
- }
-
- protected static class MockInputGate implements InputGate {
-
- private int numChannels;
- private Queue<BufferOrEvent> boes;
-
- public MockInputGate(int numChannels, List<BufferOrEvent> boes) {
- this.numChannels = numChannels;
- this.boes = new LinkedList<BufferOrEvent>(boes);
- }
-
- @Override
- public int getNumberOfInputChannels() {
- return numChannels;
- }
-
- @Override
- public boolean isFinished() {
- return boes.isEmpty();
- }
-
- @Override
- public void requestPartitions() throws IOException, InterruptedException {
- }
-
- @Override
- public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
- return boes.remove();
- }
-
- @Override
- public void sendTaskEvent(TaskEvent event) throws IOException {
- }
-
- @Override
- public void registerListener(EventListener<InputGate> listener) {
- }
-
- }
-
- protected static class MockReader extends AbstractReader {
-
- protected MockReader(InputGate inputGate) {
- super(inputGate);
- }
-
- }
-
- protected static BufferOrEvent createSuperstep(long id, int channel) {
- return new BufferOrEvent(new StreamingSuperstep(id), channel);
- }
-
- protected static BufferOrEvent createBuffer(int channel) {
- return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }),
- new BufferRecycler() {
-
- @Override
- public void recycle(MemorySegment memorySegment) {
- }
- }), channel);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
deleted file mode 100644
index 1e57d14..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.io.BarrierBufferTest.MockInputGate;
-import org.junit.Test;
-
-public class CoRecordReaderTest {
-
- @Test
- public void test() throws InterruptedException, IOException {
-
- List<BufferOrEvent> input1 = new LinkedList<BufferOrEvent>();
- input1.add(BarrierBufferTest.createBuffer(0));
- input1.add(BarrierBufferTest.createSuperstep(1, 0));
- input1.add(BarrierBufferTest.createBuffer(0));
-
- InputGate ig1 = new MockInputGate(1, input1);
-
- List<BufferOrEvent> input2 = new LinkedList<BufferOrEvent>();
- input2.add(BarrierBufferTest.createBuffer(0));
- input2.add(BarrierBufferTest.createBuffer(0));
- input2.add(BarrierBufferTest.createSuperstep(1, 0));
- input2.add(BarrierBufferTest.createBuffer(0));
-
- InputGate ig2 = new MockInputGate(1, input2);
-
- CoRecordReader<?, ?> coReader = new CoRecordReader<IOReadableWritable, IOReadableWritable>(
- ig1, ig2);
- BarrierBuffer b1 = coReader.barrierBuffer1;
- BarrierBuffer b2 = coReader.barrierBuffer2;
-
- coReader.addToAvailable(ig1);
- coReader.addToAvailable(ig2);
- coReader.addToAvailable(ig2);
- coReader.addToAvailable(ig1);
-
- assertEquals(1, coReader.getNextReaderIndexBlocking());
- b1.getNextNonBlocked();
-
- assertEquals(2, coReader.getNextReaderIndexBlocking());
- b2.getNextNonBlocked();
-
- assertEquals(2, coReader.getNextReaderIndexBlocking());
- b2.getNextNonBlocked();
-
- assertEquals(1, coReader.getNextReaderIndexBlocking());
- b1.getNextNonBlocked();
- b1.processSuperstep(input1.get(1));
-
- coReader.addToAvailable(ig1);
- coReader.addToAvailable(ig2);
- coReader.addToAvailable(ig2);
-
- assertEquals(2, coReader.getNextReaderIndexBlocking());
- b2.getNextNonBlocked();
- b2.processSuperstep(input2.get(2));
-
- assertEquals(1, coReader.getNextReaderIndexBlocking());
- b1.getNextNonBlocked();
-
- assertEquals(2, coReader.getNextReaderIndexBlocking());
- b2.getNextNonBlocked();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java
deleted file mode 100644
index 30fc820..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.junit.Test;
-
-public class SpillingBufferOrEventTest {
-
- @Test
- public void testSpilling() throws IOException, InterruptedException {
- BufferSpiller bsp = new BufferSpiller();
- SpillReader spr = new SpillReader();
-
- BufferPool pool1 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
- BufferPool pool2 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
-
- Buffer b1 = pool1.requestBuffer();
- b1.getMemorySegment().putInt(0, 10000);
- BufferOrEvent boe1 = new BufferOrEvent(b1, 2);
- SpillingBufferOrEvent sboe1 = new SpillingBufferOrEvent(boe1, bsp, spr);
-
- assertTrue(sboe1.isSpilled());
-
- Buffer b2 = pool2.requestBuffer();
- b2.getMemorySegment().putInt(0, 10000);
- BufferOrEvent boe2 = new BufferOrEvent(b2, 4);
- SpillingBufferOrEvent sboe2 = new SpillingBufferOrEvent(boe2, bsp, spr);
-
- assertTrue(sboe2.isSpilled());
-
- Buffer b3 = pool1.requestBuffer();
- b3.getMemorySegment().putInt(0, 50000);
- BufferOrEvent boe3 = new BufferOrEvent(b3, 0);
- SpillingBufferOrEvent sboe3 = new SpillingBufferOrEvent(boe3, bsp, spr);
-
- assertTrue(sboe3.isSpilled());
-
- Buffer b4 = pool2.requestBuffer();
- b4.getMemorySegment().putInt(0, 60000);
- BufferOrEvent boe4 = new BufferOrEvent(b4, 0);
- SpillingBufferOrEvent sboe4 = new SpillingBufferOrEvent(boe4, bsp, spr);
-
- assertTrue(sboe4.isSpilled());
-
- bsp.close();
-
- spr.setSpillFile(bsp.getSpillFile());
-
- Buffer b1ret = sboe1.getBufferOrEvent().getBuffer();
- assertEquals(10000, b1ret.getMemorySegment().getInt(0));
- assertEquals(2, sboe1.getBufferOrEvent().getChannelIndex());
- b1ret.recycle();
-
- Buffer b2ret = sboe2.getBufferOrEvent().getBuffer();
- assertEquals(10000, b2ret.getMemorySegment().getInt(0));
- assertEquals(4, sboe2.getBufferOrEvent().getChannelIndex());
- b2ret.recycle();
-
- Buffer b3ret = sboe3.getBufferOrEvent().getBuffer();
- assertEquals(50000, b3ret.getMemorySegment().getInt(0));
- assertEquals(0, sboe3.getBufferOrEvent().getChannelIndex());
- b3ret.recycle();
-
- Buffer b4ret = sboe4.getBufferOrEvent().getBuffer();
- assertEquals(60000, b4ret.getMemorySegment().getInt(0));
- b4ret.recycle();
-
- spr.close();
- bsp.getSpillFile().delete();
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
deleted file mode 100644
index de6c200..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class BroadcastPartitionerTest {
-
- private BroadcastPartitioner<Tuple> broadcastPartitioner1;
- private BroadcastPartitioner<Tuple> broadcastPartitioner2;
- private BroadcastPartitioner<Tuple> broadcastPartitioner3;
-
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
- private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(null);
-
- @Before
- public void setPartitioner() {
- broadcastPartitioner1 = new BroadcastPartitioner<Tuple>();
- broadcastPartitioner2 = new BroadcastPartitioner<Tuple>();
- broadcastPartitioner3 = new BroadcastPartitioner<Tuple>();
-
- }
-
- @Test
- public void testSelectChannels() {
- int[] first = new int[] { 0 };
- int[] second = new int[] { 0, 1 };
- int[] sixth = new int[] { 0, 1, 2, 3, 4, 5 };
- sd.setInstance(streamRecord);
- assertArrayEquals(first, broadcastPartitioner1.selectChannels(sd, 1));
- assertArrayEquals(second, broadcastPartitioner2.selectChannels(sd, 2));
- assertArrayEquals(sixth, broadcastPartitioner3.selectChannels(sd, 6));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
deleted file mode 100644
index 0675242..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class DistributePartitionerTest {
-
- private DistributePartitioner<Tuple> distributePartitioner;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
- private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
- null);
-
- @Before
- public void setPartitioner() {
- distributePartitioner = new DistributePartitioner<Tuple>(false);
- }
-
- @Test
- public void testSelectChannelsLength() {
- sd.setInstance(streamRecord);
- assertEquals(1, distributePartitioner.selectChannels(sd, 1).length);
- assertEquals(1, distributePartitioner.selectChannels(sd, 2).length);
- assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length);
- }
-
- @Test
- public void testSelectChannelsInterval() {
- sd.setInstance(streamRecord);
- assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
- assertEquals(1, distributePartitioner.selectChannels(sd, 3)[0]);
- assertEquals(2, distributePartitioner.selectChannels(sd, 3)[0]);
- assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
deleted file mode 100644
index b56649b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldsPartitionerTest {
-
- private FieldsPartitioner<Tuple> fieldsPartitioner;
- private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>()
- .setObject(new Tuple2<String, Integer>("test", 0));
- private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>()
- .setObject(new Tuple2<String, Integer>("test", 42));
- private SerializationDelegate<StreamRecord<Tuple>> sd1 = new SerializationDelegate<StreamRecord<Tuple>>(
- null);
- private SerializationDelegate<StreamRecord<Tuple>> sd2 = new SerializationDelegate<StreamRecord<Tuple>>(
- null);
-
- @Before
- public void setPartitioner() {
- fieldsPartitioner = new FieldsPartitioner<Tuple>(new KeySelector<Tuple, String>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String getKey(Tuple value) throws Exception {
- return value.getField(0);
- }
- });
- }
-
- @Test
- public void testSelectChannelsLength() {
- sd1.setInstance(streamRecord1);
- assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1).length);
- assertEquals(1, fieldsPartitioner.selectChannels(sd1, 2).length);
- assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1024).length);
- }
-
- @Test
- public void testSelectChannelsGrouping() {
- sd1.setInstance(streamRecord1);
- sd2.setInstance(streamRecord2);
-
- assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1),
- fieldsPartitioner.selectChannels(sd2, 1));
- assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 2),
- fieldsPartitioner.selectChannels(sd2, 2));
- assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1024),
- fieldsPartitioner.selectChannels(sd2, 1024));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
deleted file mode 100644
index b381d85..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ForwardPartitionerTest {
-
- private DistributePartitioner<Tuple> forwardPartitioner;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
- private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
- null);
-
- @Before
- public void setPartitioner() {
- forwardPartitioner = new DistributePartitioner<Tuple>(true);
- }
-
- @Test
- public void testSelectChannelsLength() {
- sd.setInstance(streamRecord);
- assertEquals(1, forwardPartitioner.selectChannels(sd, 1).length);
- assertEquals(1, forwardPartitioner.selectChannels(sd, 2).length);
- assertEquals(1, forwardPartitioner.selectChannels(sd, 1024).length);
- }
-
- @Test
- public void testSelectChannelsInterval() {
- sd.setInstance(streamRecord);
- assertEquals(0, forwardPartitioner.selectChannels(sd, 1)[0]);
- assertEquals(1, forwardPartitioner.selectChannels(sd, 2)[0]);
- assertEquals(2, forwardPartitioner.selectChannels(sd, 1024)[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
deleted file mode 100644
index eebda64..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class GlobalPartitionerTest {
-
- private GlobalPartitioner<Tuple> globalPartitioner;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
- private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
- null);
-
- @Before
- public void setPartitioner() {
- globalPartitioner = new GlobalPartitioner<Tuple>();
- }
-
- @Test
- public void testSelectChannels() {
- int[] result = new int[] { 0 };
-
- sd.setInstance(streamRecord);
-
- assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1));
- assertArrayEquals(result, globalPartitioner.selectChannels(sd, 2));
- assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1024));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
deleted file mode 100644
index 3c03d07..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ShufflePartitionerTest {
-
- private ShufflePartitioner<Tuple> shufflePartitioner;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
- private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
- null);
-
- @Before
- public void setPartitioner() {
- shufflePartitioner = new ShufflePartitioner<Tuple>();
- }
-
- @Test
- public void testSelectChannelsLength() {
- sd.setInstance(streamRecord);
- assertEquals(1, shufflePartitioner.selectChannels(sd, 1).length);
- assertEquals(1, shufflePartitioner.selectChannels(sd, 2).length);
- assertEquals(1, shufflePartitioner.selectChannels(sd, 1024).length);
- }
-
- @Test
- public void testSelectChannelsInterval() {
- sd.setInstance(streamRecord);
- assertEquals(0, shufflePartitioner.selectChannels(sd, 1)[0]);
-
- assertTrue(0 <= shufflePartitioner.selectChannels(sd, 2)[0]);
- assertTrue(2 > shufflePartitioner.selectChannels(sd, 2)[0]);
-
- assertTrue(0 <= shufflePartitioner.selectChannels(sd, 1024)[0]);
- assertTrue(1024 > shufflePartitioner.selectChannels(sd, 1024)[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
new file mode 100644
index 0000000..3f8401d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.io.BarrierBuffer;
+import org.junit.Test;
+
+public class BarrierBufferIOTest {
+
+ @Test
+ public void IOTest() throws IOException, InterruptedException {
+
+ BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+ BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+
+ MockInputGate myIG = new MockInputGate(new BufferPool[] { pool1, pool2 },
+ new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
+ // new BarrierSimulator[] { new CountBarrier(1000), new
+ // CountBarrier(1000) });
+
+ BarrierBuffer barrierBuffer = new BarrierBuffer(myIG,
+ new BarrierBufferTest.MockReader(myIG));
+
+ try {
+ // long time = System.currentTimeMillis();
+ for (int i = 0; i < 2000000; i++) {
+ BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
+ if (boe.isBuffer()) {
+ boe.getBuffer().recycle();
+ } else {
+ barrierBuffer.processSuperstep(boe);
+ }
+ }
+ // System.out.println("Ran for " + (System.currentTimeMillis() -
+ // time));
+ } catch (Exception e) {
+ fail();
+ } finally {
+ barrierBuffer.cleanup();
+ }
+ }
+
+ private static class RandomBarrier implements BarrierGenerator {
+ private static Random rnd = new Random();
+
+ double threshold;
+
+ public RandomBarrier(double expectedEvery) {
+ threshold = 1 / expectedEvery;
+ }
+
+ @Override
+ public boolean isNextBarrier() {
+ return rnd.nextDouble() < threshold;
+ }
+ }
+
+ private static class CountBarrier implements BarrierGenerator {
+
+ long every;
+ long c = 0;
+
+ public CountBarrier(long every) {
+ this.every = every;
+ }
+
+ @Override
+ public boolean isNextBarrier() {
+ return c++ % every == 0;
+ }
+ }
+
+ protected static class MockInputGate implements InputGate {
+
+ private int numChannels;
+ private BufferPool[] bufferPools;
+ private int[] currentSupersteps;
+ BarrierGenerator[] barrierGens;
+ int currentChannel = 0;
+ long c = 0;
+
+ public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
+ this.numChannels = bufferPools.length;
+ this.currentSupersteps = new int[numChannels];
+ this.bufferPools = bufferPools;
+ this.barrierGens = barrierGens;
+ }
+
+ @Override
+ public int getNumberOfInputChannels() {
+ return numChannels;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return false;
+ }
+
+ @Override
+ public void requestPartitions() throws IOException, InterruptedException {
+ }
+
+ @Override
+ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+ currentChannel = (currentChannel + 1) % numChannels;
+
+ if (barrierGens[currentChannel].isNextBarrier()) {
+ return BarrierBufferTest.createSuperstep(++currentSupersteps[currentChannel],
+ currentChannel);
+ } else {
+ Buffer buffer = bufferPools[currentChannel].requestBuffer();
+ buffer.getMemorySegment().putLong(0, c++);
+
+ return new BufferOrEvent(buffer, currentChannel);
+ }
+
+ }
+
+ @Override
+ public void sendTaskEvent(TaskEvent event) throws IOException {
+ }
+
+ @Override
+ public void registerListener(EventListener<InputGate> listener) {
+ }
+
+ }
+
+ protected interface BarrierGenerator {
+ public boolean isNextBarrier();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
new file mode 100644
index 0000000..b8af4ed
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.io.BarrierBuffer;
+import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+import org.junit.Test;
+
+public class BarrierBufferTest {
+
+ @Test
+ public void testWithoutBarriers() throws IOException, InterruptedException {
+
+ List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+ input.add(createBuffer(0));
+ input.add(createBuffer(0));
+ input.add(createBuffer(0));
+
+ InputGate mockIG = new MockInputGate(1, input);
+ AbstractReader mockAR = new MockReader(mockIG);
+
+ BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
+
+ assertEquals(input.get(0), bb.getNextNonBlocked());
+ assertEquals(input.get(1), bb.getNextNonBlocked());
+ assertEquals(input.get(2), bb.getNextNonBlocked());
+
+ bb.cleanup();
+ }
+
+ @Test
+ public void testOneChannelBarrier() throws IOException, InterruptedException {
+
+ List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+ input.add(createBuffer(0));
+ input.add(createBuffer(0));
+ input.add(createSuperstep(1, 0));
+ input.add(createBuffer(0));
+ input.add(createBuffer(0));
+ input.add(createSuperstep(2, 0));
+ input.add(createBuffer(0));
+
+ InputGate mockIG = new MockInputGate(1, input);
+ AbstractReader mockAR = new MockReader(mockIG);
+
+ BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
+ BufferOrEvent nextBoe;
+
+ assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
+
+ bb.cleanup();
+ }
+
+ @Test
+ public void testMultiChannelBarrier() throws IOException, InterruptedException {
+
+ List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+ input.add(createBuffer(0));
+ input.add(createBuffer(1));
+ input.add(createSuperstep(1, 0));
+ input.add(createSuperstep(2, 0));
+ input.add(createBuffer(0));
+ input.add(createSuperstep(3, 0));
+ input.add(createBuffer(0));
+ input.add(createBuffer(1));
+ input.add(createSuperstep(1, 1));
+ input.add(createBuffer(0));
+ input.add(createBuffer(1));
+ input.add(createSuperstep(2, 1));
+ input.add(createSuperstep(3, 1));
+ input.add(createSuperstep(4, 0));
+ input.add(createBuffer(0));
+ input.add(new BufferOrEvent(new EndOfPartitionEvent(), 1));
+
+
+ InputGate mockIG1 = new MockInputGate(2, input);
+ AbstractReader mockAR1 = new MockReader(mockIG1);
+
+ BarrierBuffer bb = new BarrierBuffer(mockIG1, mockAR1);
+ BufferOrEvent nextBoe;
+
+ check(input.get(0), nextBoe = bb.getNextNonBlocked());
+ check(input.get(1), nextBoe = bb.getNextNonBlocked());
+ check(input.get(2), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ check(input.get(7), nextBoe = bb.getNextNonBlocked());
+ check(input.get(8), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ check(input.get(3), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ check(input.get(10), nextBoe = bb.getNextNonBlocked());
+ check(input.get(11), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ check(input.get(4), nextBoe = bb.getNextNonBlocked());
+ check(input.get(5), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ check(input.get(12), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ check(input.get(6), nextBoe = bb.getNextNonBlocked());
+ check(input.get(9), nextBoe = bb.getNextNonBlocked());
+ check(input.get(13), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ check(input.get(14), nextBoe = bb.getNextNonBlocked());
+ check(input.get(15), nextBoe = bb.getNextNonBlocked());
+
+ bb.cleanup();
+ }
+
+ private static void check(BufferOrEvent expected, BufferOrEvent actual) {
+ assertEquals(expected.isBuffer(), actual.isBuffer());
+ assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
+ if (expected.isEvent()) {
+ assertEquals(expected.getEvent(), actual.getEvent());
+ }
+ }
+
+ protected static class MockInputGate implements InputGate {
+
+ private int numChannels;
+ private Queue<BufferOrEvent> boes;
+
+ public MockInputGate(int numChannels, List<BufferOrEvent> boes) {
+ this.numChannels = numChannels;
+ this.boes = new LinkedList<BufferOrEvent>(boes);
+ }
+
+ @Override
+ public int getNumberOfInputChannels() {
+ return numChannels;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return boes.isEmpty();
+ }
+
+ @Override
+ public void requestPartitions() throws IOException, InterruptedException {
+ }
+
+ @Override
+ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+ return boes.remove();
+ }
+
+ @Override
+ public void sendTaskEvent(TaskEvent event) throws IOException {
+ }
+
+ @Override
+ public void registerListener(EventListener<InputGate> listener) {
+ }
+
+ }
+
+ protected static class MockReader extends AbstractReader {
+
+ protected MockReader(InputGate inputGate) {
+ super(inputGate);
+ }
+
+ }
+
+ protected static BufferOrEvent createSuperstep(long id, int channel) {
+ return new BufferOrEvent(new StreamingSuperstep(id), channel);
+ }
+
+ protected static BufferOrEvent createBuffer(int channel) {
+ return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }),
+ new BufferRecycler() {
+
+ @Override
+ public void recycle(MemorySegment memorySegment) {
+ }
+ }), channel);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
new file mode 100644
index 0000000..528829d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.runtime.io.BarrierBuffer;
+import org.apache.flink.streaming.runtime.io.CoRecordReader;
+import org.apache.flink.streaming.runtime.io.BarrierBufferTest.MockInputGate;
+import org.junit.Test;
+
+public class CoRecordReaderTest {
+
+ @Test
+ public void test() throws InterruptedException, IOException {
+
+ List<BufferOrEvent> input1 = new LinkedList<BufferOrEvent>();
+ input1.add(BarrierBufferTest.createBuffer(0));
+ input1.add(BarrierBufferTest.createSuperstep(1, 0));
+ input1.add(BarrierBufferTest.createBuffer(0));
+
+ InputGate ig1 = new MockInputGate(1, input1);
+
+ List<BufferOrEvent> input2 = new LinkedList<BufferOrEvent>();
+ input2.add(BarrierBufferTest.createBuffer(0));
+ input2.add(BarrierBufferTest.createBuffer(0));
+ input2.add(BarrierBufferTest.createSuperstep(1, 0));
+ input2.add(BarrierBufferTest.createBuffer(0));
+
+ InputGate ig2 = new MockInputGate(1, input2);
+
+ CoRecordReader<?, ?> coReader = new CoRecordReader<IOReadableWritable, IOReadableWritable>(
+ ig1, ig2);
+ BarrierBuffer b1 = coReader.barrierBuffer1;
+ BarrierBuffer b2 = coReader.barrierBuffer2;
+
+ coReader.addToAvailable(ig1);
+ coReader.addToAvailable(ig2);
+ coReader.addToAvailable(ig2);
+ coReader.addToAvailable(ig1);
+
+ assertEquals(1, coReader.getNextReaderIndexBlocking());
+ b1.getNextNonBlocked();
+
+ assertEquals(2, coReader.getNextReaderIndexBlocking());
+ b2.getNextNonBlocked();
+
+ assertEquals(2, coReader.getNextReaderIndexBlocking());
+ b2.getNextNonBlocked();
+
+ assertEquals(1, coReader.getNextReaderIndexBlocking());
+ b1.getNextNonBlocked();
+ b1.processSuperstep(input1.get(1));
+
+ coReader.addToAvailable(ig1);
+ coReader.addToAvailable(ig2);
+ coReader.addToAvailable(ig2);
+
+ assertEquals(2, coReader.getNextReaderIndexBlocking());
+ b2.getNextNonBlocked();
+ b2.processSuperstep(input2.get(2));
+
+ assertEquals(1, coReader.getNextReaderIndexBlocking());
+ b1.getNextNonBlocked();
+
+ assertEquals(2, coReader.getNextReaderIndexBlocking());
+ b2.getNextNonBlocked();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
new file mode 100644
index 0000000..e0fab17
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.streaming.runtime.io.BufferSpiller;
+import org.apache.flink.streaming.runtime.io.SpillReader;
+import org.apache.flink.streaming.runtime.io.SpillingBufferOrEvent;
+import org.junit.Test;
+
+public class SpillingBufferOrEventTest {
+
+ @Test
+ public void testSpilling() throws IOException, InterruptedException {
+ BufferSpiller bsp = new BufferSpiller();
+ SpillReader spr = new SpillReader();
+
+ BufferPool pool1 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
+ BufferPool pool2 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
+
+ Buffer b1 = pool1.requestBuffer();
+ b1.getMemorySegment().putInt(0, 10000);
+ BufferOrEvent boe1 = new BufferOrEvent(b1, 2);
+ SpillingBufferOrEvent sboe1 = new SpillingBufferOrEvent(boe1, bsp, spr);
+
+ assertTrue(sboe1.isSpilled());
+
+ Buffer b2 = pool2.requestBuffer();
+ b2.getMemorySegment().putInt(0, 10000);
+ BufferOrEvent boe2 = new BufferOrEvent(b2, 4);
+ SpillingBufferOrEvent sboe2 = new SpillingBufferOrEvent(boe2, bsp, spr);
+
+ assertTrue(sboe2.isSpilled());
+
+ Buffer b3 = pool1.requestBuffer();
+ b3.getMemorySegment().putInt(0, 50000);
+ BufferOrEvent boe3 = new BufferOrEvent(b3, 0);
+ SpillingBufferOrEvent sboe3 = new SpillingBufferOrEvent(boe3, bsp, spr);
+
+ assertTrue(sboe3.isSpilled());
+
+ Buffer b4 = pool2.requestBuffer();
+ b4.getMemorySegment().putInt(0, 60000);
+ BufferOrEvent boe4 = new BufferOrEvent(b4, 0);
+ SpillingBufferOrEvent sboe4 = new SpillingBufferOrEvent(boe4, bsp, spr);
+
+ assertTrue(sboe4.isSpilled());
+
+ bsp.close();
+
+ spr.setSpillFile(bsp.getSpillFile());
+
+ Buffer b1ret = sboe1.getBufferOrEvent().getBuffer();
+ assertEquals(10000, b1ret.getMemorySegment().getInt(0));
+ assertEquals(2, sboe1.getBufferOrEvent().getChannelIndex());
+ b1ret.recycle();
+
+ Buffer b2ret = sboe2.getBufferOrEvent().getBuffer();
+ assertEquals(10000, b2ret.getMemorySegment().getInt(0));
+ assertEquals(4, sboe2.getBufferOrEvent().getChannelIndex());
+ b2ret.recycle();
+
+ Buffer b3ret = sboe3.getBufferOrEvent().getBuffer();
+ assertEquals(50000, b3ret.getMemorySegment().getInt(0));
+ assertEquals(0, sboe3.getBufferOrEvent().getChannelIndex());
+ b3ret.recycle();
+
+ Buffer b4ret = sboe4.getBufferOrEvent().getBuffer();
+ assertEquals(60000, b4ret.getMemorySegment().getInt(0));
+ b4ret.recycle();
+
+ spr.close();
+ bsp.getSpillFile().delete();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
new file mode 100644
index 0000000..aa4d24a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BroadcastPartitionerTest {
+
+ private BroadcastPartitioner<Tuple> broadcastPartitioner1;
+ private BroadcastPartitioner<Tuple> broadcastPartitioner2;
+ private BroadcastPartitioner<Tuple> broadcastPartitioner3;
+
+ private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+ private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(null);
+
+ @Before
+ public void setPartitioner() {
+ broadcastPartitioner1 = new BroadcastPartitioner<Tuple>();
+ broadcastPartitioner2 = new BroadcastPartitioner<Tuple>();
+ broadcastPartitioner3 = new BroadcastPartitioner<Tuple>();
+
+ }
+
+ @Test
+ public void testSelectChannels() {
+ int[] first = new int[] { 0 };
+ int[] second = new int[] { 0, 1 };
+ int[] sixth = new int[] { 0, 1, 2, 3, 4, 5 };
+ sd.setInstance(streamRecord);
+ assertArrayEquals(first, broadcastPartitioner1.selectChannels(sd, 1));
+ assertArrayEquals(second, broadcastPartitioner2.selectChannels(sd, 2));
+ assertArrayEquals(sixth, broadcastPartitioner3.selectChannels(sd, 6));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
new file mode 100644
index 0000000..37638cf
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DistributePartitionerTest {
+
+ private DistributePartitioner<Tuple> distributePartitioner;
+ private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+ private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
+ null);
+
+ @Before
+ public void setPartitioner() {
+ distributePartitioner = new DistributePartitioner<Tuple>(false);
+ }
+
+ @Test
+ public void testSelectChannelsLength() {
+ sd.setInstance(streamRecord);
+ assertEquals(1, distributePartitioner.selectChannels(sd, 1).length);
+ assertEquals(1, distributePartitioner.selectChannels(sd, 2).length);
+ assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length);
+ }
+
+ @Test
+ public void testSelectChannelsInterval() {
+ sd.setInstance(streamRecord);
+ assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
+ assertEquals(1, distributePartitioner.selectChannels(sd, 3)[0]);
+ assertEquals(2, distributePartitioner.selectChannels(sd, 3)[0]);
+ assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
new file mode 100644
index 0000000..94d29ac
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FieldsPartitionerTest {
+
+ private FieldsPartitioner<Tuple> fieldsPartitioner;
+ private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>()
+ .setObject(new Tuple2<String, Integer>("test", 0));
+ private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>()
+ .setObject(new Tuple2<String, Integer>("test", 42));
+ private SerializationDelegate<StreamRecord<Tuple>> sd1 = new SerializationDelegate<StreamRecord<Tuple>>(
+ null);
+ private SerializationDelegate<StreamRecord<Tuple>> sd2 = new SerializationDelegate<StreamRecord<Tuple>>(
+ null);
+
+ @Before
+ public void setPartitioner() {
+ fieldsPartitioner = new FieldsPartitioner<Tuple>(new KeySelector<Tuple, String>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Tuple value) throws Exception {
+ return value.getField(0);
+ }
+ });
+ }
+
+ @Test
+ public void testSelectChannelsLength() {
+ sd1.setInstance(streamRecord1);
+ assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1).length);
+ assertEquals(1, fieldsPartitioner.selectChannels(sd1, 2).length);
+ assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1024).length);
+ }
+
+ @Test
+ public void testSelectChannelsGrouping() {
+ sd1.setInstance(streamRecord1);
+ sd2.setInstance(streamRecord2);
+
+ assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1),
+ fieldsPartitioner.selectChannels(sd2, 1));
+ assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 2),
+ fieldsPartitioner.selectChannels(sd2, 2));
+ assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1024),
+ fieldsPartitioner.selectChannels(sd2, 1024));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
new file mode 100644
index 0000000..c0d39da
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ForwardPartitionerTest {
+
+ private DistributePartitioner<Tuple> forwardPartitioner;
+ private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+ private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
+ null);
+
+ @Before
+ public void setPartitioner() {
+ forwardPartitioner = new DistributePartitioner<Tuple>(true);
+ }
+
+ @Test
+ public void testSelectChannelsLength() {
+ sd.setInstance(streamRecord);
+ assertEquals(1, forwardPartitioner.selectChannels(sd, 1).length);
+ assertEquals(1, forwardPartitioner.selectChannels(sd, 2).length);
+ assertEquals(1, forwardPartitioner.selectChannels(sd, 1024).length);
+ }
+
+ @Test
+ public void testSelectChannelsInterval() {
+ sd.setInstance(streamRecord);
+ assertEquals(0, forwardPartitioner.selectChannels(sd, 1)[0]);
+ assertEquals(1, forwardPartitioner.selectChannels(sd, 2)[0]);
+ assertEquals(2, forwardPartitioner.selectChannels(sd, 1024)[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
new file mode 100644
index 0000000..69c00cd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GlobalPartitionerTest {
+
+ private GlobalPartitioner<Tuple> globalPartitioner;
+ private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+ private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
+ null);
+
+ @Before
+ public void setPartitioner() {
+ globalPartitioner = new GlobalPartitioner<Tuple>();
+ }
+
+ @Test
+ public void testSelectChannels() {
+ int[] result = new int[] { 0 };
+
+ sd.setInstance(streamRecord);
+
+ assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1));
+ assertArrayEquals(result, globalPartitioner.selectChannels(sd, 2));
+ assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1024));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
new file mode 100644
index 0000000..d99a21e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ShufflePartitionerTest {
+
+ private ShufflePartitioner<Tuple> shufflePartitioner;
+ private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+ private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
+ null);
+
+ @Before
+ public void setPartitioner() {
+ shufflePartitioner = new ShufflePartitioner<Tuple>();
+ }
+
+ @Test
+ public void testSelectChannelsLength() {
+ sd.setInstance(streamRecord);
+ assertEquals(1, shufflePartitioner.selectChannels(sd, 1).length);
+ assertEquals(1, shufflePartitioner.selectChannels(sd, 2).length);
+ assertEquals(1, shufflePartitioner.selectChannels(sd, 1024).length);
+ }
+
+ @Test
+ public void testSelectChannelsInterval() {
+ sd.setInstance(streamRecord);
+ assertEquals(0, shufflePartitioner.selectChannels(sd, 1)[0]);
+
+ assertTrue(0 <= shufflePartitioner.selectChannels(sd, 2)[0]);
+ assertTrue(2 > shufflePartitioner.selectChannels(sd, 2)[0]);
+
+ assertTrue(0 <= shufflePartitioner.selectChannels(sd, 1024)[0]);
+ assertTrue(1024 > shufflePartitioner.selectChannels(sd, 1024)[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
deleted file mode 100644
index 5b98e8c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.runtime.state.StateCheckpoint;
-import org.junit.Test;
-
-public class OperatorStateTest {
-
- @Test
- public void testOperatorState() {
- OperatorState<Integer> os = new OperatorState<Integer>(5);
-
- StateCheckpoint<Integer> scp = os.checkpoint();
-
- assertTrue(os.stateEquals(scp.restore()));
-
- assertEquals((Integer) 5, os.getState());
-
- os.update(10);
-
- assertEquals((Integer) 10, os.getState());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
index 98a6a8d..697c796 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -27,13 +27,13 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.co.CoStreamOperator;
+import org.apache.flink.streaming.runtime.io.CoReaderIterator;
+import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
@@ -153,17 +153,17 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> {
return mockIterator;
}
- public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
+ public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoStreamOperator<IN1, IN2, OUT> operator,
List<IN1> input1, List<IN2> input2) {
MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2);
- invokable.setup(mockContext);
+ operator.setup(mockContext);
try {
- invokable.open(null);
- invokable.invoke();
- invokable.close();
+ operator.open(null);
+ operator.run();
+ operator.close();
} catch (Exception e) {
- throw new RuntimeException("Cannot invoke invokable.", e);
+ throw new RuntimeException("Cannot invoke operator.", e);
}
return mockContext.getOutputs();
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index af836e2..709a59a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -26,13 +26,13 @@ import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.CoReaderIterator;
+import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
@@ -104,16 +104,16 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
return iterator;
}
- public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable,
+ public static <IN, OUT> List<OUT> createAndExecute(StreamOperator<IN, OUT> operator,
List<IN> inputs) {
MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
- invokable.setup(mockContext);
+ operator.setup(mockContext);
try {
- invokable.open(null);
- invokable.invoke();
- invokable.close();
+ operator.open(null);
+ operator.run();
+ operator.close();
} catch (Exception e) {
- throw new RuntimeException("Cannot invoke invokable.", e);
+ throw new RuntimeException("Cannot invoke operator.", e);
}
return mockContext.getOutputs();