You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/10 13:42:47 UTC
[5/7] flink git commit: [FLINK-4545] [network] Allow LocalBufferPool
to limit the number of used buffers
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 4d462d0..7c51bc2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
@@ -32,6 +33,7 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -152,10 +154,17 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
StreamGraph streamGraph = env.getStreamGraph();
streamGraph.setJobName("test job");
JobGraph jobGraph = streamGraph.getJobGraph();
+ List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
assertEquals(2, jobGraph.getNumberOfVertices());
- assertEquals(1, jobGraph.getVerticesAsArray()[0].getParallelism());
- assertEquals(1, jobGraph.getVerticesAsArray()[1].getParallelism());
+ assertEquals(1, verticesSorted.get(0).getParallelism());
+ assertEquals(1, verticesSorted.get(1).getParallelism());
+
+ JobVertex sourceVertex = verticesSorted.get(0);
+ JobVertex mapSinkVertex = verticesSorted.get(1);
+
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED, mapSinkVertex.getInputs().get(0).getSource().getResultType());
}
/**
@@ -191,8 +200,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
.print();
JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
- JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
- JobVertex mapPrintVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
+ List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+ JobVertex sourceVertex = verticesSorted.get(0);
+ JobVertex mapPrintVertex = verticesSorted.get(1);
+
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED, mapPrintVertex.getInputs().get(0).getSource().getResultType());
StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration());
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index ec55f19..4344a46 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -51,8 +51,8 @@ public class BarrierBufferMassiveRandomTest {
try {
ioMan = new IOManagerAsync();
- BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100);
- BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100);
+ BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
+ BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
new BufferPool[] { pool1, pool2 },