You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:45 UTC
[21/63] [abbrv] Unify all job vertices to one type (rather than
dedicated input/output types)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
index e7d590d..d1f4ae0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
@@ -33,10 +33,11 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
import org.apache.flink.runtime.jobgraph.JobTaskVertex;
+import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
+import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
import org.apache.flink.runtime.operators.CoGroupDriver;
import org.apache.flink.runtime.operators.CollectorMapDriver;
@@ -118,7 +119,7 @@ public class CompensatableDanglingPageRank {
// --------------- the inputs ---------------------
// page rank input
- JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(),
+ InputFormatInputVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(),
pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
@@ -127,7 +128,7 @@ public class CompensatableDanglingPageRank {
pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
// edges as adjacency list
- JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(),
+ InputFormatInputVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(),
adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
@@ -247,7 +248,7 @@ public class CompensatableDanglingPageRank {
// --------------- the output ---------------------
- JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
+ OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
outputConfig.addInputToGroup(0);
outputConfig.setInputSerializer(recSerializer, 0);
@@ -256,10 +257,10 @@ public class CompensatableDanglingPageRank {
// --------------- the auxiliaries ---------------------
- JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
+ SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
degreeOfParallelism);
- JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
+ SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
syncConfig.setNumberOfIterations(numIterations);
syncConfig.addIterationAggregator(CompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
index 7424de6..c365378 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
@@ -32,9 +32,9 @@ import org.apache.flink.runtime.io.network.channels.ChannelType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
import org.apache.flink.runtime.jobgraph.JobTaskVertex;
+import org.apache.flink.runtime.jobgraph.SimpleInputVertex;
+import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.test.util.RecordAPITestBase;
import org.junit.After;
@@ -99,7 +99,7 @@ public class NetworkStackThroughput {
JobGraph jobGraph = new JobGraph("Speed Test");
- JobInputVertex producer = new JobInputVertex("Speed Test Producer", jobGraph);
+ SimpleInputVertex producer = new SimpleInputVertex("Speed Test Producer", jobGraph);
producer.setInvokableClass(SpeedTestProducer.class);
producer.setNumberOfSubtasks(numSubtasks);
producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
@@ -112,7 +112,7 @@ public class NetworkStackThroughput {
forwarder.setNumberOfSubtasks(numSubtasks);
}
- JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
+ SimpleOutputVertex consumer = new SimpleOutputVertex("Speed Test Consumer", jobGraph);
consumer.setInvokableClass(SpeedTestConsumer.class);
consumer.setNumberOfSubtasks(numSubtasks);
consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);