You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:45 UTC

[21/63] [abbrv] Unify all job vertices to one type (rather than dedicated input/output types)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
index e7d590d..d1f4ae0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
@@ -33,10 +33,11 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
 import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
 import org.apache.flink.runtime.jobgraph.JobTaskVertex;
+import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
+import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
 import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
 import org.apache.flink.runtime.operators.CoGroupDriver;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
@@ -118,7 +119,7 @@ public class CompensatableDanglingPageRank {
 		// --------------- the inputs ---------------------
 
 		// page rank input
-		JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(),
+		InputFormatInputVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(),
 			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
 		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
 		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
@@ -127,7 +128,7 @@ public class CompensatableDanglingPageRank {
 		pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
 
 		// edges as adjacency list
-		JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(),
+		InputFormatInputVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(),
 			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
 		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
 		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
@@ -247,7 +248,7 @@ public class CompensatableDanglingPageRank {
 		
 		// --------------- the output ---------------------
 
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
+		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		outputConfig.addInputToGroup(0);
 		outputConfig.setInputSerializer(recSerializer, 0);
@@ -256,10 +257,10 @@ public class CompensatableDanglingPageRank {
 		
 		// --------------- the auxiliaries ---------------------
 		
-		JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
+		SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
 			degreeOfParallelism);
 
-		JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
+		SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(numIterations);
 		syncConfig.addIterationAggregator(CompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
index 7424de6..c365378 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
@@ -32,9 +32,9 @@ import org.apache.flink.runtime.io.network.channels.ChannelType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
 import org.apache.flink.runtime.jobgraph.JobTaskVertex;
+import org.apache.flink.runtime.jobgraph.SimpleInputVertex;
+import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.test.util.RecordAPITestBase;
 import org.junit.After;
@@ -99,7 +99,7 @@ public class NetworkStackThroughput {
 
 			JobGraph jobGraph = new JobGraph("Speed Test");
 
-			JobInputVertex producer = new JobInputVertex("Speed Test Producer", jobGraph);
+			SimpleInputVertex producer = new SimpleInputVertex("Speed Test Producer", jobGraph);
 			producer.setInvokableClass(SpeedTestProducer.class);
 			producer.setNumberOfSubtasks(numSubtasks);
 			producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
@@ -112,7 +112,7 @@ public class NetworkStackThroughput {
 				forwarder.setNumberOfSubtasks(numSubtasks);
 			}
 
-			JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
+			SimpleOutputVertex consumer = new SimpleOutputVertex("Speed Test Consumer", jobGraph);
 			consumer.setInvokableClass(SpeedTestConsumer.class);
 			consumer.setNumberOfSubtasks(numSubtasks);
 			consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);