You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/10 13:42:47 UTC

[5/7] flink git commit: [FLINK-4545] [network] Allow LocalBufferPool to limited the number of used buffers

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 4d462d0..7c51bc2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
@@ -32,6 +33,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
@@ -152,10 +154,17 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		StreamGraph streamGraph = env.getStreamGraph();
 		streamGraph.setJobName("test job");
 		JobGraph jobGraph = streamGraph.getJobGraph();
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
 		
 		assertEquals(2, jobGraph.getNumberOfVertices());
-		assertEquals(1, jobGraph.getVerticesAsArray()[0].getParallelism());
-		assertEquals(1, jobGraph.getVerticesAsArray()[1].getParallelism());
+		assertEquals(1, verticesSorted.get(0).getParallelism());
+		assertEquals(1, verticesSorted.get(1).getParallelism());
+
+		JobVertex sourceVertex = verticesSorted.get(0);
+		JobVertex mapSinkVertex = verticesSorted.get(1);
+
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType());
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, mapSinkVertex.getInputs().get(0).getSource().getResultType());
 	}
 
 	/**
@@ -191,8 +200,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 			.print();
 		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
 
-		JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
-		JobVertex mapPrintVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+		JobVertex sourceVertex = verticesSorted.get(0);
+		JobVertex mapPrintVertex = verticesSorted.get(1);
+
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType());
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, mapPrintVertex.getInputs().get(0).getSource().getResultType());
 
 		StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
 		StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration());

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index ec55f19..4344a46 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -51,8 +51,8 @@ public class BarrierBufferMassiveRandomTest {
 		try {
 			ioMan = new IOManagerAsync();
 			
-			BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100);
-			BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100);
+			BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
+			BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
 
 			RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
 					new BufferPool[] { pool1, pool2 },