You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 10:03:49 UTC
[3/3] flink git commit: [FLINK-1706] IOTest added for BarrierBuffers
[FLINK-1706] IOTest added for BarrierBuffers
Closes #493
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ceea93d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ceea93d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ceea93d4
Branch: refs/heads/master
Commit: ceea93d41b6e8ae66cd96d03f2f27e0027f8ac4c
Parents: f625b8d
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Mar 18 23:41:08 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 09:21:11 2015 +0100
----------------------------------------------------------------------
.../api/streamvertex/InputHandler.java | 2 +-
.../api/streamvertex/StreamVertex.java | 5 +-
.../flink/streaming/io/BarrierBufferIOTest.java | 159 +++++++++++++++++++
3 files changed, 162 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ceea93d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index 726df53..48a00d8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -91,7 +91,7 @@ public class InputHandler<IN> {
public void clearReaders() throws IOException {
if (inputs != null) {
inputs.clearBuffers();
+ inputs.cleanup();
}
- inputs.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ceea93d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 30b69a7..dd8a463 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -114,9 +114,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
if (configuration.getStateMonitoring() && !states.isEmpty()) {
getEnvironment().getJobManager().tell(
new StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
- .getJobVertexId(), context.getIndexOfThisSubtask(), barrierID,
- new LocalStateHandle(states)),
- ActorRef.noSender());
+ .getJobVertexId(), context.getIndexOfThisSubtask(), barrierID,
+ new LocalStateHandle(states)), ActorRef.noSender());
} else {
getEnvironment().getJobManager().tell(
new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
http://git-wip-us.apache.org/repos/asf/flink/blob/ceea93d4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
new file mode 100644
index 0000000..24106c1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.junit.Test;
+
+public class BarrierBufferIOTest {
+
+ @Test
+ public void IOTest() throws IOException, InterruptedException {
+
+ BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+ BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+
+ MockInputGate myIG = new MockInputGate(new BufferPool[] { pool1, pool2 },
+ new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
+ // new BarrierSimulator[] { new CountBarrier(1000), new
+ // CountBarrier(1000) });
+
+ BarrierBuffer barrierBuffer = new BarrierBuffer(myIG,
+ new BarrierBufferTest.MockReader(myIG));
+
+ try {
+ // long time = System.currentTimeMillis();
+ for (int i = 0; i < 2000000; i++) {
+ BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
+ if (boe.isBuffer()) {
+ boe.getBuffer().recycle();
+ } else {
+ barrierBuffer.processSuperstep(boe);
+ }
+ }
+ // System.out.println("Ran for " + (System.currentTimeMillis() -
+ // time));
+ } catch (Exception e) {
+ fail();
+ } finally {
+ barrierBuffer.cleanup();
+ }
+ }
+
+ private static class RandomBarrier implements BarrierGenerator {
+ private static Random rnd = new Random();
+
+ double threshold;
+
+ public RandomBarrier(double expectedEvery) {
+ threshold = 1 / expectedEvery;
+ }
+
+ @Override
+ public boolean isNextBarrier() {
+ return rnd.nextDouble() < threshold;
+ }
+ }
+
+ private static class CountBarrier implements BarrierGenerator {
+
+ long every;
+ long c = 0;
+
+ public CountBarrier(long every) {
+ this.every = every;
+ }
+
+ @Override
+ public boolean isNextBarrier() {
+ return c++ % every == 0;
+ }
+ }
+
+ protected static class MockInputGate implements InputGate {
+
+ private int numChannels;
+ private BufferPool[] bufferPools;
+ private int[] currentSupersteps;
+ BarrierGenerator[] barrierGens;
+ int currentChannel = 0;
+ long c = 0;
+
+ public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
+ this.numChannels = bufferPools.length;
+ this.currentSupersteps = new int[numChannels];
+ this.bufferPools = bufferPools;
+ this.barrierGens = barrierGens;
+ }
+
+ @Override
+ public int getNumberOfInputChannels() {
+ return numChannels;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return false;
+ }
+
+ @Override
+ public void requestPartitions() throws IOException, InterruptedException {
+ }
+
+ @Override
+ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+ currentChannel = (currentChannel + 1) % numChannels;
+
+ if (barrierGens[currentChannel].isNextBarrier()) {
+ return BarrierBufferTest.createSuperstep(++currentSupersteps[currentChannel],
+ currentChannel);
+ } else {
+ Buffer buffer = bufferPools[currentChannel].requestBuffer();
+ buffer.getMemorySegment().putLong(0, c++);
+
+ return new BufferOrEvent(buffer, currentChannel);
+ }
+
+ }
+
+ @Override
+ public void sendTaskEvent(TaskEvent event) throws IOException {
+ }
+
+ @Override
+ public void registerListener(EventListener<InputGate> listener) {
+ }
+
+ }
+
+ protected interface BarrierGenerator {
+ public boolean isNextBarrier();
+ }
+
+}