You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 10:03:49 UTC

[3/3] flink git commit: [FLINK-1706] IOTest added for BarrierBuffers

[FLINK-1706] IOTest added for BarrierBuffers

Closes #493


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ceea93d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ceea93d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ceea93d4

Branch: refs/heads/master
Commit: ceea93d41b6e8ae66cd96d03f2f27e0027f8ac4c
Parents: f625b8d
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Mar 18 23:41:08 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 09:21:11 2015 +0100

----------------------------------------------------------------------
 .../api/streamvertex/InputHandler.java          |   2 +-
 .../api/streamvertex/StreamVertex.java          |   5 +-
 .../flink/streaming/io/BarrierBufferIOTest.java | 159 +++++++++++++++++++
 3 files changed, 162 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ceea93d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index 726df53..48a00d8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -91,7 +91,7 @@ public class InputHandler<IN> {
 	public void clearReaders() throws IOException {
 		if (inputs != null) {
 			inputs.clearBuffers();
+			inputs.cleanup();
 		}
-		inputs.cleanup();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ceea93d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 30b69a7..dd8a463 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -114,9 +114,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 		if (configuration.getStateMonitoring() && !states.isEmpty()) {
 			getEnvironment().getJobManager().tell(
 					new StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
-							.getJobVertexId(), context.getIndexOfThisSubtask(), barrierID, 
-							new LocalStateHandle(states)),
-					ActorRef.noSender());
+							.getJobVertexId(), context.getIndexOfThisSubtask(), barrierID,
+							new LocalStateHandle(states)), ActorRef.noSender());
 		} else {
 			getEnvironment().getJobManager().tell(
 					new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),

http://git-wip-us.apache.org/repos/asf/flink/blob/ceea93d4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
new file mode 100644
index 0000000..24106c1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.junit.Test;
+
+public class BarrierBufferIOTest {
+
+	@Test
+	public void IOTest() throws IOException, InterruptedException {
+
+		BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+		BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+
+		MockInputGate myIG = new MockInputGate(new BufferPool[] { pool1, pool2 },
+				new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
+		// new BarrierSimulator[] { new CountBarrier(1000), new
+		// CountBarrier(1000) });
+
+		BarrierBuffer barrierBuffer = new BarrierBuffer(myIG,
+				new BarrierBufferTest.MockReader(myIG));
+
+		try {
+			// long time = System.currentTimeMillis();
+			for (int i = 0; i < 2000000; i++) {
+				BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
+				if (boe.isBuffer()) {
+					boe.getBuffer().recycle();
+				} else {
+					barrierBuffer.processSuperstep(boe);
+				}
+			}
+			// System.out.println("Ran for " + (System.currentTimeMillis() -
+			// time));
+		} catch (Exception e) {
+			fail();
+		} finally {
+			barrierBuffer.cleanup();
+		}
+	}
+
+	private static class RandomBarrier implements BarrierGenerator {
+		private static Random rnd = new Random();
+
+		double threshold;
+
+		public RandomBarrier(double expectedEvery) {
+			threshold = 1 / expectedEvery;
+		}
+
+		@Override
+		public boolean isNextBarrier() {
+			return rnd.nextDouble() < threshold;
+		}
+	}
+
+	private static class CountBarrier implements BarrierGenerator {
+
+		long every;
+		long c = 0;
+
+		public CountBarrier(long every) {
+			this.every = every;
+		}
+
+		@Override
+		public boolean isNextBarrier() {
+			return c++ % every == 0;
+		}
+	}
+
+	protected static class MockInputGate implements InputGate {
+
+		private int numChannels;
+		private BufferPool[] bufferPools;
+		private int[] currentSupersteps;
+		BarrierGenerator[] barrierGens;
+		int currentChannel = 0;
+		long c = 0;
+
+		public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
+			this.numChannels = bufferPools.length;
+			this.currentSupersteps = new int[numChannels];
+			this.bufferPools = bufferPools;
+			this.barrierGens = barrierGens;
+		}
+
+		@Override
+		public int getNumberOfInputChannels() {
+			return numChannels;
+		}
+
+		@Override
+		public boolean isFinished() {
+			return false;
+		}
+
+		@Override
+		public void requestPartitions() throws IOException, InterruptedException {
+		}
+
+		@Override
+		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+			currentChannel = (currentChannel + 1) % numChannels;
+
+			if (barrierGens[currentChannel].isNextBarrier()) {
+				return BarrierBufferTest.createSuperstep(++currentSupersteps[currentChannel],
+						currentChannel);
+			} else {
+				Buffer buffer = bufferPools[currentChannel].requestBuffer();
+				buffer.getMemorySegment().putLong(0, c++);
+
+				return new BufferOrEvent(buffer, currentChannel);
+			}
+
+		}
+
+		@Override
+		public void sendTaskEvent(TaskEvent event) throws IOException {
+		}
+
+		@Override
+		public void registerListener(EventListener<InputGate> listener) {
+		}
+
+	}
+
+	protected interface BarrierGenerator {
+		public boolean isNextBarrier();
+	}
+
+}