You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:29 UTC
[05/63] [abbrv] Refactor job graph construction to be incremental,
attachment-based
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/JobEventTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/JobEventTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/JobEventTest.java
index 0698b56..c8ded86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/JobEventTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/JobEventTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.event.job;
import static org.junit.Assert.assertEquals;
@@ -25,18 +24,16 @@ import static org.junit.Assert.fail;
import java.io.IOException;
-import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.junit.Test;
/**
* This class contains tests concerning the serialization/deserialization of job events which have been derived from
* {@link org.apache.flink.runtime.event.job.AbstractEvent}.
- *
*/
public class JobEventTest {
+
/**
* This test checks the correct serialization/deserialization of a {@link JobEvent}.
*/
@@ -57,31 +54,4 @@ public class JobEventTest {
fail(ioe.getMessage());
}
}
-
- /**
- * This test checks the correct serialization/deserialization of a {@link VertexEvent}.
- */
- @Test
- public void testVertexEvent() {
-
- try {
-
- final VertexEvent orig = new VertexEvent(23423423L, new JobVertexID(), "Test Vertex", 2, 0,
- ExecutionState.READY, "Test Description");
- final VertexEvent copy = (VertexEvent) CommonTestUtils.createCopyWritable(orig);
-
- assertEquals(orig.getTimestamp(), copy.getTimestamp());
- assertEquals(orig.getJobVertexID(), copy.getJobVertexID());
- assertEquals(orig.getJobVertexName(), copy.getJobVertexName());
- assertEquals(orig.getTotalNumberOfSubtasks(), copy.getTotalNumberOfSubtasks());
- assertEquals(orig.getIndexOfSubtask(), copy.getIndexOfSubtask());
- assertEquals(orig.getCurrentExecutionState(), copy.getCurrentExecutionState());
- assertEquals(orig.getDescription(), copy.getDescription());
- assertEquals(orig.hashCode(), copy.hashCode());
- assertTrue(orig.equals(copy));
-
- } catch (IOException ioe) {
- fail(ioe.getMessage());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/ManagementEventTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/ManagementEventTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/ManagementEventTest.java
index 2a30ae4..99e750b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/ManagementEventTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/ManagementEventTest.java
@@ -16,13 +16,14 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.event.job;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
+import org.apache.flink.runtime.event.job.RecentJobEvent;
+import org.apache.flink.runtime.execution.ExecutionState2;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.managementgraph.ManagementVertexID;
@@ -31,9 +32,9 @@ import org.junit.Test;
/**
* This test checks the proper serialization and deserialization of job events.
- *
*/
public class ManagementEventTest {
+
/**
* The time stamp used during the tests.
*/
@@ -51,7 +52,7 @@ public class ManagementEventTest {
public void testExecutionStateChangeEvent() {
final ExecutionStateChangeEvent orig = new ExecutionStateChangeEvent(TIMESTAMP, new ManagementVertexID(),
- ExecutionState.READY);
+ ExecutionState2.DEPLOYING);
final ExecutionStateChangeEvent copy = (ExecutionStateChangeEvent) ManagementTestUtils.createCopy(orig);
@@ -68,8 +69,7 @@ public class ManagementEventTest {
@Test
public void testRecentJobEvent() {
- final RecentJobEvent orig = new RecentJobEvent(new JobID(), JOBNAME, JobStatus.SCHEDULED, true, TIMESTAMP,
- TIMESTAMP);
+ final RecentJobEvent orig = new RecentJobEvent(new JobID(), JOBNAME, JobStatus.RUNNING, true, TIMESTAMP, TIMESTAMP);
final RecentJobEvent copy = (RecentJobEvent) ManagementTestUtils.createCopy(orig);
@@ -82,20 +82,4 @@ public class ManagementEventTest {
assertEquals(orig.hashCode(), copy.hashCode());
assertTrue(orig.equals(copy));
}
-
- /**
- * Tests serialization/deserialization for {@link VertexAssignmentEvent}.
- */
- @Test
- public void testVertexAssignmentEvent() {
-
- final VertexAssignmentEvent orig = new VertexAssignmentEvent(TIMESTAMP, new ManagementVertexID(), "test");
- final VertexAssignmentEvent copy = (VertexAssignmentEvent) ManagementTestUtils.createCopy(orig);
-
- assertEquals(orig.getVertexID(), copy.getVertexID());
- assertEquals(orig.getTimestamp(), copy.getTimestamp());
- assertEquals(orig.getInstanceName(), copy.getInstanceName());
- assertEquals(orig.hashCode(), copy.hashCode());
- assertTrue(orig.equals(copy));
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
new file mode 100644
index 0000000..498f773
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import java.util.Arrays;
+
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class AllVerticesIteratorTest {
+
+ @Test
+ public void testAllVertices() {
+ try {
+
+ AbstractJobVertex v1 = new AbstractJobVertex("v1");
+ AbstractJobVertex v2 = new AbstractJobVertex("v2");
+ AbstractJobVertex v3 = new AbstractJobVertex("v3");
+ AbstractJobVertex v4 = new AbstractJobVertex("v4");
+
+ v1.setParallelism(1);
+ v2.setParallelism(7);
+ v3.setParallelism(3);
+ v4.setParallelism(2);
+
+ ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
+
+ ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1);
+ ExecutionJobVertex ejv2 = new ExecutionJobVertex(eg, v2, 1);
+ ExecutionJobVertex ejv3 = new ExecutionJobVertex(eg, v3, 1);
+ ExecutionJobVertex ejv4 = new ExecutionJobVertex(eg, v4, 1);
+
+ AllVerticesIterator iter = new AllVerticesIterator(Arrays.asList(ejv1, ejv2, ejv3, ejv4).iterator());
+
+ int numReturned = 0;
+ while (iter.hasNext()) {
+ iter.hasNext();
+ Assert.assertNotNull(iter.next());
+ numReturned++;
+ }
+
+ Assert.assertEquals(13, numReturned);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
new file mode 100644
index 0000000..b6f532e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -0,0 +1,570 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.mockito.Matchers;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.InputSplitSource;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.LogUtils;
+
+/**
+ * This class contains test concerning the correct conversion from {@link JobGraph} to {@link ExecutionGraph} objects.
+ */
+public class ExecutionGraphConstructionTest {
+
+ @BeforeClass
+ public static void setLogLevel() {
+ LogUtils.initializeDefaultTestConsoleLogger();
+ }
+
+
+ /**
+ * Creates a JobGraph of the following form:
+ *
+ * <pre>
+ * v1--->v2-->\
+ * \
+ * v4 --->\
+ * ----->/ \
+ * v3-->/ v5
+ * \ /
+ * ------------->/
+ * </pre>
+ */
+ @Test
+ public void testCreateSimpleGraphBipartite() {
+
+ final JobID jobId = new JobID();
+ final String jobName = "Test Job Sample Name";
+ final Configuration cfg = new Configuration();
+
+ AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+ AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+ AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+ AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
+ AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+
+ v1.setParallelism(5);
+ v2.setParallelism(7);
+ v3.setParallelism(2);
+ v4.setParallelism(11);
+ v5.setParallelism(4);
+
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+ v4.connectNewDataSetAsInput(v2, DistributionPattern.BIPARTITE);
+ v4.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
+ v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE);
+ v5.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
+
+ List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
+
+ ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+ try {
+ eg.attachJobGraph(ordered);
+ }
+ catch (JobException e) {
+ e.printStackTrace();
+ fail("Job failed with exception: " + e.getMessage());
+ }
+
+ verifyTestGraph(eg, jobId, v1, v2, v3, v4, v5);
+ }
+
+ @Test
+ public void testAttachViaDataSets() {
+ final JobID jobId = new JobID();
+ final String jobName = "Test Job Sample Name";
+ final Configuration cfg = new Configuration();
+
+ // construct part one of the execution graph
+ AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+ AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+ AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+
+ v1.setParallelism(5);
+ v2.setParallelism(7);
+ v3.setParallelism(2);
+
+ // this creates an intermediate result for v1
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+
+ // create results for v2 and v3
+ IntermediateDataSet v2result = v2.createAndAddResultDataSet();
+ IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet();
+ IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet();
+
+
+ List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
+
+ ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+ try {
+ eg.attachJobGraph(ordered);
+ }
+ catch (JobException e) {
+ e.printStackTrace();
+ fail("Job failed with exception: " + e.getMessage());
+ }
+
+ // attach the second part of the graph
+
+ AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
+ AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+ v4.setParallelism(11);
+ v5.setParallelism(4);
+
+ v4.connectDataSetAsInput(v2result, DistributionPattern.BIPARTITE);
+ v4.connectDataSetAsInput(v3result_1, DistributionPattern.BIPARTITE);
+ v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE);
+ v5.connectDataSetAsInput(v3result_2, DistributionPattern.BIPARTITE);
+
+ List<AbstractJobVertex> ordered2 = new ArrayList<AbstractJobVertex>(Arrays.asList(v4, v5));
+
+ try {
+ eg.attachJobGraph(ordered2);
+ }
+ catch (JobException e) {
+ e.printStackTrace();
+ fail("Job failed with exception: " + e.getMessage());
+ }
+
+ // verify
+ verifyTestGraph(eg, jobId, v1, v2, v3, v4, v5);
+ }
+
+ @Test
+ public void testAttachViaIds() {
+ final JobID jobId = new JobID();
+ final String jobName = "Test Job Sample Name";
+ final Configuration cfg = new Configuration();
+
+ // construct part one of the execution graph
+ AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+ AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+ AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+
+ v1.setParallelism(5);
+ v2.setParallelism(7);
+ v3.setParallelism(2);
+
+ // this creates an intermediate result for v1
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+
+ // create results for v2 and v3
+ IntermediateDataSet v2result = v2.createAndAddResultDataSet();
+ IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet();
+ IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet();
+
+
+ List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
+
+ ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+ try {
+ eg.attachJobGraph(ordered);
+ }
+ catch (JobException e) {
+ e.printStackTrace();
+ fail("Job failed with exception: " + e.getMessage());
+ }
+
+ // attach the second part of the graph
+
+ AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
+ AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+ v4.setParallelism(11);
+ v5.setParallelism(4);
+
+ v4.connectIdInput(v2result.getId(), DistributionPattern.BIPARTITE);
+ v4.connectIdInput(v3result_1.getId(), DistributionPattern.BIPARTITE);
+ v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE);
+ v5.connectIdInput(v3result_2.getId(), DistributionPattern.BIPARTITE);
+
+ List<AbstractJobVertex> ordered2 = new ArrayList<AbstractJobVertex>(Arrays.asList(v4, v5));
+
+ try {
+ eg.attachJobGraph(ordered2);
+ }
+ catch (JobException e) {
+ e.printStackTrace();
+ fail("Job failed with exception: " + e.getMessage());
+ }
+
+ // verify
+ verifyTestGraph(eg, jobId, v1, v2, v3, v4, v5);
+ }
+
+ private void verifyTestGraph(ExecutionGraph eg, JobID jobId,
+ AbstractJobVertex v1, AbstractJobVertex v2, AbstractJobVertex v3,
+ AbstractJobVertex v4, AbstractJobVertex v5)
+ {
+ Map<JobVertexID, ExecutionJobVertex> vertices = eg.getAllVertices();
+
+ // verify v1
+ {
+ ExecutionJobVertex e1 = vertices.get(v1.getID());
+ assertNotNull(e1);
+
+ // basic properties
+ assertEquals(v1.getParallelism(), e1.getParallelism());
+ assertEquals(v1.getID(), e1.getJobVertexId());
+ assertEquals(jobId, e1.getJobId());
+ assertEquals(v1, e1.getJobVertex());
+
+ // produced data sets
+ assertEquals(1, e1.getProducedDataSets().length);
+ assertEquals(v1.getProducedDataSets().get(0).getId(), e1.getProducedDataSets()[0].getId());
+ assertEquals(v1.getParallelism(), e1.getProducedDataSets()[0].getPartitions().length);
+
+ // task vertices
+ assertEquals(v1.getParallelism(), e1.getTaskVertices().length);
+
+ int num = 0;
+ for (ExecutionVertex2 ev : e1.getTaskVertices()) {
+ assertEquals(jobId, ev.getJobId());
+ assertEquals(v1.getID(), ev.getJobvertexId());
+
+ assertEquals(v1.getParallelism(), ev.getTotalNumberOfParallelSubtasks());
+ assertEquals(num++, ev.getParallelSubtaskIndex());
+
+ assertEquals(0, ev.getNumberOfInputs());
+ }
+ }
+
+ // verify v2
+ {
+ ExecutionJobVertex e2 = vertices.get(v2.getID());
+ assertNotNull(e2);
+
+ // produced data sets
+ assertEquals(1, e2.getProducedDataSets().length);
+ assertEquals(v2.getProducedDataSets().get(0).getId(), e2.getProducedDataSets()[0].getId());
+ assertEquals(v2.getParallelism(), e2.getProducedDataSets()[0].getPartitions().length);
+
+ // task vertices
+ assertEquals(v2.getParallelism(), e2.getTaskVertices().length);
+
+ int num = 0;
+ for (ExecutionVertex2 ev : e2.getTaskVertices()) {
+ assertEquals(jobId, ev.getJobId());
+ assertEquals(v2.getID(), ev.getJobvertexId());
+
+ assertEquals(v2.getParallelism(), ev.getTotalNumberOfParallelSubtasks());
+ assertEquals(num++, ev.getParallelSubtaskIndex());
+
+ assertEquals(1, ev.getNumberOfInputs());
+ ExecutionEdge2[] inputs = ev.getInputEdges(0);
+ assertEquals(v1.getParallelism(), inputs.length);
+
+ int sumOfPartitions = 0;
+ for (ExecutionEdge2 inEdge : inputs) {
+ assertEquals(0,inEdge.getInputNum());
+ sumOfPartitions += inEdge.getSource().getPartition();
+ }
+
+ assertEquals(10, sumOfPartitions);
+ }
+ }
+
+ // verify v3
+ {
+ ExecutionJobVertex e3 = vertices.get(v3.getID());
+ assertNotNull(e3);
+
+ // produced data sets
+ assertEquals(2, e3.getProducedDataSets().length);
+ assertEquals(v3.getProducedDataSets().get(0).getId(), e3.getProducedDataSets()[0].getId());
+ assertEquals(v3.getProducedDataSets().get(1).getId(), e3.getProducedDataSets()[1].getId());
+ assertEquals(v3.getParallelism(), e3.getProducedDataSets()[0].getPartitions().length);
+ assertEquals(v3.getParallelism(), e3.getProducedDataSets()[1].getPartitions().length);
+
+ // task vertices
+ assertEquals(v3.getParallelism(), e3.getTaskVertices().length);
+
+ int num = 0;
+ for (ExecutionVertex2 ev : e3.getTaskVertices()) {
+ assertEquals(jobId, ev.getJobId());
+ assertEquals(v3.getID(), ev.getJobvertexId());
+
+ assertEquals(v3.getParallelism(), ev.getTotalNumberOfParallelSubtasks());
+ assertEquals(num++, ev.getParallelSubtaskIndex());
+
+ assertEquals(0, ev.getNumberOfInputs());
+ }
+ }
+
+ // verify v4
+ {
+ ExecutionJobVertex e4 = vertices.get(v4.getID());
+ assertNotNull(e4);
+
+ // produced data sets
+ assertEquals(1, e4.getProducedDataSets().length);
+ assertEquals(v4.getProducedDataSets().get(0).getId(), e4.getProducedDataSets()[0].getId());
+
+ // task vertices
+ assertEquals(v4.getParallelism(), e4.getTaskVertices().length);
+
+ int num = 0;
+ for (ExecutionVertex2 ev : e4.getTaskVertices()) {
+ assertEquals(jobId, ev.getJobId());
+ assertEquals(v4.getID(), ev.getJobvertexId());
+
+ assertEquals(v4.getParallelism(), ev.getTotalNumberOfParallelSubtasks());
+ assertEquals(num++, ev.getParallelSubtaskIndex());
+
+ assertEquals(2, ev.getNumberOfInputs());
+ // first input
+ {
+ ExecutionEdge2[] inputs = ev.getInputEdges(0);
+ assertEquals(v2.getParallelism(), inputs.length);
+
+ int sumOfPartitions = 0;
+ for (ExecutionEdge2 inEdge : inputs) {
+ assertEquals(0, inEdge.getInputNum());
+ sumOfPartitions += inEdge.getSource().getPartition();
+ }
+
+ assertEquals(21, sumOfPartitions);
+ }
+ // second input
+ {
+ ExecutionEdge2[] inputs = ev.getInputEdges(1);
+ assertEquals(v3.getParallelism(), inputs.length);
+
+ int sumOfPartitions = 0;
+ for (ExecutionEdge2 inEdge : inputs) {
+ assertEquals(1, inEdge.getInputNum());
+ sumOfPartitions += inEdge.getSource().getPartition();
+ }
+
+ assertEquals(1, sumOfPartitions);
+ }
+ }
+ }
+
+ // verify v5
+ {
+ ExecutionJobVertex e5 = vertices.get(v5.getID());
+ assertNotNull(e5);
+
+ // produced data sets
+ assertEquals(0, e5.getProducedDataSets().length);
+
+ // task vertices
+ assertEquals(v5.getParallelism(), e5.getTaskVertices().length);
+
+ int num = 0;
+ for (ExecutionVertex2 ev : e5.getTaskVertices()) {
+ assertEquals(jobId, ev.getJobId());
+ assertEquals(v5.getID(), ev.getJobvertexId());
+
+ assertEquals(v5.getParallelism(), ev.getTotalNumberOfParallelSubtasks());
+ assertEquals(num++, ev.getParallelSubtaskIndex());
+
+ assertEquals(2, ev.getNumberOfInputs());
+ // first input
+ {
+ ExecutionEdge2[] inputs = ev.getInputEdges(0);
+ assertEquals(v4.getParallelism(), inputs.length);
+
+ int sumOfPartitions = 0;
+ for (ExecutionEdge2 inEdge : inputs) {
+ assertEquals(0, inEdge.getInputNum());
+ sumOfPartitions += inEdge.getSource().getPartition();
+ }
+
+ assertEquals(55, sumOfPartitions);
+ }
+ // second input
+ {
+ ExecutionEdge2[] inputs = ev.getInputEdges(1);
+ assertEquals(v3.getParallelism(), inputs.length);
+
+ int sumOfPartitions = 0;
+ for (ExecutionEdge2 inEdge : inputs) {
+ assertEquals(1, inEdge.getInputNum());
+ sumOfPartitions += inEdge.getSource().getPartition();
+ }
+
+ assertEquals(1, sumOfPartitions);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testCannotConnectMissingId() {
+ final JobID jobId = new JobID();
+ final String jobName = "Test Job Sample Name";
+ final Configuration cfg = new Configuration();
+
+ // construct part one of the execution graph
+ AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+ v1.setParallelism(7);
+
+ List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1));
+
+ ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+ try {
+ eg.attachJobGraph(ordered);
+ }
+ catch (JobException e) {
+ e.printStackTrace();
+ fail("Job failed with exception: " + e.getMessage());
+ }
+
+ // attach the second part of the graph
+ AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+ v2.connectIdInput(new IntermediateDataSetID(), DistributionPattern.BIPARTITE);
+
+ List<AbstractJobVertex> ordered2 = new ArrayList<AbstractJobVertex>(Arrays.asList(v2));
+
+ try {
+ eg.attachJobGraph(ordered2);
+ fail("Attached wrong jobgraph");
+ }
+ catch (JobException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCannotConnectWrongOrder() {
+ final JobID jobId = new JobID();
+ final String jobName = "Test Job Sample Name";
+ final Configuration cfg = new Configuration();
+
+ AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+ AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+ AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+ AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
+ AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+
+ v1.setParallelism(5);
+ v2.setParallelism(7);
+ v3.setParallelism(2);
+ v4.setParallelism(11);
+ v5.setParallelism(4);
+
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+ v4.connectNewDataSetAsInput(v2, DistributionPattern.BIPARTITE);
+ v4.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
+ v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE);
+ v5.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
+
+ List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
+
+ ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+ try {
+ eg.attachJobGraph(ordered);
+ fail("Attached wrong jobgraph");
+ }
+ catch (JobException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testSetupInputSplits() {
+ try {
+ final InputSplit[] emptySplits = new InputSplit[0];
+
+ InputSplitAssigner assigner1 = mock(InputSplitAssigner.class);
+ InputSplitAssigner assigner2 = mock(InputSplitAssigner.class);
+
+ @SuppressWarnings("unchecked")
+ InputSplitSource<InputSplit> source1 = mock(InputSplitSource.class);
+ @SuppressWarnings("unchecked")
+ InputSplitSource<InputSplit> source2 = mock(InputSplitSource.class);
+
+ when(source1.createInputSplits(Matchers.anyInt())).thenReturn(emptySplits);
+ when(source2.createInputSplits(Matchers.anyInt())).thenReturn(emptySplits);
+ when(source1.getInputSplitAssigner(emptySplits)).thenReturn(assigner1);
+ when(source2.getInputSplitAssigner(emptySplits)).thenReturn(assigner2);
+
+ final JobID jobId = new JobID();
+ final String jobName = "Test Job Sample Name";
+ final Configuration cfg = new Configuration();
+
+ AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+ AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+ AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+ AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
+ AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+
+ v1.setParallelism(5);
+ v2.setParallelism(7);
+ v3.setParallelism(2);
+ v4.setParallelism(11);
+ v5.setParallelism(4);
+
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+ v4.connectNewDataSetAsInput(v2, DistributionPattern.BIPARTITE);
+ v4.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
+ v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE);
+ v5.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
+
+ v3.setInputSplitSource(source1);
+ v5.setInputSplitSource(source2);
+
+ List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
+
+ ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+ try {
+ eg.attachJobGraph(ordered);
+ }
+ catch (JobException e) {
+ e.printStackTrace();
+ fail("Job failed with exception: " + e.getMessage());
+ }
+
+ assertEquals(assigner1, eg.getAllVertices().get(v3.getID()).getSplitAssigner());
+ assertEquals(assigner2, eg.getAllVertices().get(v5.getID()).getSplitAssigner());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
new file mode 100644
index 0000000..9705dcd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.doAnswer;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+import org.apache.flink.runtime.taskmanager.TaskOperationResult;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class ExecutionGraphDeploymentTest {
+
+ @Test
+ public void testBuildDeploymentDescriptor() {
+ try {
+ final JobID jobId = new JobID();
+
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+ final JobVertexID jid3 = new JobVertexID();
+ final JobVertexID jid4 = new JobVertexID();
+
+ AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
+ AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+ AbstractJobVertex v3 = new AbstractJobVertex("v3", jid3);
+ AbstractJobVertex v4 = new AbstractJobVertex("v4", jid4);
+
+ v1.setParallelism(10);
+ v2.setParallelism(10);
+ v3.setParallelism(10);
+ v4.setParallelism(10);
+
+ v1.setInvokableClass(RegularPactTask.class);
+ v2.setInvokableClass(RegularPactTask.class);
+ v3.setInvokableClass(RegularPactTask.class);
+ v4.setInvokableClass(RegularPactTask.class);
+
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+ v3.connectNewDataSetAsInput(v2, DistributionPattern.BIPARTITE);
+ v4.connectNewDataSetAsInput(v2, DistributionPattern.BIPARTITE);
+
+ ExecutionGraph eg = spy(new ExecutionGraph(jobId, "some job", new Configuration()));
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ final Runnable parameter = (Runnable) invocation.getArguments()[0];
+ parameter.run();
+ return null;
+ }
+
+ }).when(eg).execute(Matchers.any(Runnable.class));
+
+ List<AbstractJobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
+
+ eg.attachJobGraph(ordered);
+
+ ExecutionJobVertex ejv = eg.getAllVertices().get(jid2);
+ ExecutionVertex2 vertex = ejv.getTaskVertices()[3];
+
+ // just some reference (needs not be atomic)
+ final AtomicReference<TaskDeploymentDescriptor> reference = new AtomicReference<TaskDeploymentDescriptor>();
+
+ // mock taskmanager to simply accept the call
+ TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+ when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenAnswer(new Answer<TaskOperationResult>() {
+ @Override
+ public TaskOperationResult answer(InvocationOnMock invocation) {
+ final TaskDeploymentDescriptor parameter = (TaskDeploymentDescriptor) invocation.getArguments()[0];
+ reference.set(parameter);
+ return new TaskOperationResult(jid2, 0, true);
+ }
+ });
+
+ final Instance instance = getInstance(taskManager);
+ final AllocatedSlot slot = instance.allocateSlot(jobId);
+
+ assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+ vertex.deployToSlot(slot);
+ assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+
+ TaskDeploymentDescriptor descr = reference.get();
+ assertNotNull(descr);
+
+ assertEquals(jobId, descr.getJobID());
+ assertEquals(jid2, descr.getVertexID());
+ assertEquals(3, descr.getIndexInSubtaskGroup());
+ assertEquals(10, descr.getCurrentNumberOfSubtasks());
+ assertEquals(RegularPactTask.class.getName(), descr.getInvokableClassName());
+ assertEquals("v2", descr.getTaskName());
+
+ assertEquals(2, descr.getOutputGates().size());
+ assertEquals(1, descr.getInputGates().size());
+
+ assertEquals(10, descr.getOutputGates().get(0).getChannels().size());
+ assertEquals(10, descr.getOutputGates().get(1).getChannels().size());
+ assertEquals(10, descr.getInputGates().get(0).getChannels().size());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTest.java
deleted file mode 100644
index 36e3640..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTest.java
+++ /dev/null
@@ -1,955 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
-import org.apache.flink.runtime.jobgraph.JobTaskVertex;
-import org.apache.flink.runtime.operators.DataSinkTask;
-import org.apache.flink.runtime.operators.DataSourceTask;
-import org.apache.flink.runtime.testutils.ServerTestUtils;
-import org.junit.Test;
-import org.apache.flink.api.java.io.DiscardingOuputFormat;
-import org.apache.flink.api.java.io.TextInputFormat;
-
-/**
- * This class contains test concerning the correct conversion from {@link JobGraph} to {@link ExecutionGraph} objects.
- *
- */
-public class ExecutionGraphTest {
- /*
- * input1 -> task1 -> output1
- * output1 shares instance with input1
- * input1 shares instance with task1
- * no subtasks defined
- * input1 is default, task1 is m1.large, output1 is m1.xlarge
- * no channel types defined
- */
- @Test
- public void testConvertJobGraphToExecutionGraph1() {
-
- File inputFile = null;
- JobID jobID = null;
-
- try {
- inputFile = ServerTestUtils.createInputFile(0);
-
- // create job graph
- final JobGraph jg = new JobGraph("Job Graph 1");
- jobID = jg.getJobID();
-
- // input vertex
- final JobInputVertex i1 = new JobInputVertex("Input 1", jg);
- i1.setNumberOfSubtasks(1);
- i1.setInvokableClass(DataSourceTask.class);
- TextInputFormat inputFormat = new TextInputFormat(new Path(inputFile.toURI()));
- i1.setInputFormat(inputFormat);
-
- // task vertex
- final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
- t1.setInvokableClass(ForwardTask1Input1Output.class);
-
- // output vertex
- final JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
- o1.setNumberOfSubtasks(1);
- o1.setInvokableClass(DataSinkTask.class);
- o1.setOutputFormat(new DiscardingOuputFormat<Object>());
-
- o1.setVertexToShareInstancesWith(i1);
- i1.setVertexToShareInstancesWith(t1);
-
- // connect vertices
- i1.connectTo(t1);
- t1.connectTo(o1);
-
- LibraryCacheManager.register(jobID, new String[0]);
-
- final ExecutionGraph eg = new ExecutionGraph(jg, 1);
-
- // test all methods of ExecutionGraph
- final ExecutionStage executionStage = eg.getCurrentExecutionStage();
- assertEquals(1, executionStage.getMaxNumberSubtasks());
-
- assertEquals(jobID, eg.getJobID());
- assertEquals(0, eg.getIndexOfCurrentExecutionStage());
- assertEquals(1, eg.getNumberOfInputVertices());
- assertEquals(1, eg.getNumberOfOutputVertices());
- assertEquals(1, eg.getNumberOfStages());
- assertNotNull(eg.getInputVertex(0));
- assertNull(eg.getInputVertex(1));
- assertNotNull(eg.getOutputVertex(0));
- assertNull(eg.getOutputVertex(1));
- assertNotNull(eg.getStage(0));
- assertNull(eg.getStage(1));
-
- // test all methods of ExecutionStage stage0
- ExecutionStage es = eg.getStage(0);
-
- assertEquals(3, es.getNumberOfStageMembers());
- assertEquals(0, es.getStageNumber());
- assertNotNull(es.getStageMember(0));
- assertNotNull(es.getStageMember(1));
- assertNotNull(es.getStageMember(2));
- assertNull(es.getStageMember(3));
-
- // test all methods of ExecutionGroupVertex
- ExecutionGroupVertex egv0 = null; // input1
- ExecutionGroupVertex egv1 = null; // output1
- ExecutionGroupVertex egv2 = null; // task1
-
- if (es.getStageMember(0).getName().equals("Input 1")) {
- egv0 = es.getStageMember(0);
- } else if (es.getStageMember(0).getName().equals("Output 1")) {
- egv1 = es.getStageMember(0);
- } else {
- egv2 = es.getStageMember(0);
- }
-
- if (es.getStageMember(1).getName().equals("Input 1")) {
- egv0 = es.getStageMember(1);
- } else if (es.getStageMember(1).getName().equals("Output 1")) {
- egv1 = es.getStageMember(1);
- } else {
- egv2 = es.getStageMember(1);
- }
-
- if (es.getStageMember(2).getName().equals("Input 1")) {
- egv0 = es.getStageMember(2);
- } else if (es.getStageMember(2).getName().equals("Output 1")) {
- egv1 = es.getStageMember(2);
- } else {
- egv2 = es.getStageMember(2);
- }
-
- // egv0 (input1)
- assertNull(egv0.getBackwardEdge(0));
- assertNotNull(egv0.getConfiguration());
- assertEquals(1, egv0.getCurrentNumberOfGroupMembers());
- assertNotNull(egv0.getExecutionSignature());
- assertEquals(es, egv0.getExecutionStage());
- assertNotNull(egv0.getForwardEdge(0));
- assertNull(egv0.getForwardEdge(1));
- assertNotNull(egv0.getForwardEdges(egv2));
- assertNotNull(egv0.getGroupMember(0));
- assertNull(egv0.getGroupMember(1));
- assertEquals(1, egv0.getInputSplits().length);
- assertEquals("Input 1", egv0.getName());
- assertEquals(0, egv0.getNumberOfBackwardLinks());
- assertEquals(1, egv0.getNumberOfForwardLinks());
- assertEquals(0, egv0.getStageNumber());
- assertEquals(1, egv0.getUserDefinedNumberOfMembers());
- assertEquals("Task 1", egv0.getVertexToShareInstancesWith().getName());
-
- // egv1 (output1)
- assertNotNull(egv1.getBackwardEdge(0));
- assertNull(egv1.getBackwardEdge(1));
- assertNotNull(egv1.getBackwardEdges(egv2));
- assertNotNull(egv1.getConfiguration());
- assertEquals(1, egv1.getCurrentNumberOfGroupMembers());
- assertNotNull(egv1.getExecutionSignature());
- assertEquals(es, egv1.getExecutionStage());
- assertNull(egv1.getForwardEdge(0));
- assertNotNull(egv1.getGroupMember(0));
- assertNull(egv1.getGroupMember(1));
- assertEquals("Output 1", egv1.getName());
- assertEquals(1, egv1.getNumberOfBackwardLinks());
- assertEquals(0, egv1.getNumberOfForwardLinks());
- assertEquals(0, egv1.getStageNumber());
- assertEquals(1, egv1.getUserDefinedNumberOfMembers());
- assertEquals("Input 1", egv1.getVertexToShareInstancesWith().getName());
-
- // egv2 (task1)
- assertNotNull(egv2.getBackwardEdge(0));
- assertNull(egv2.getBackwardEdge(1));
- assertNotNull(egv2.getBackwardEdges(egv0));
- assertNotNull(egv2.getConfiguration());
- assertEquals(1, egv2.getCurrentNumberOfGroupMembers());
- assertNotNull(egv2.getExecutionSignature());
- assertEquals(es, egv2.getExecutionStage());
- assertNotNull(egv2.getForwardEdge(0));
- assertNull(egv2.getForwardEdge(1));
- assertNotNull(egv2.getForwardEdges(egv1));
- assertNotNull(egv2.getGroupMember(0));
- assertNull(egv2.getGroupMember(1));
- assertEquals("Task 1", egv2.getName());
- assertEquals(1, egv2.getNumberOfBackwardLinks());
- assertEquals(1, egv2.getNumberOfForwardLinks());
- assertEquals(0, egv2.getStageNumber());
- assertEquals(1, egv2.getUserDefinedNumberOfMembers());
- assertNull(egv2.getVertexToShareInstancesWith());
-
- // test all methods of ExecutionVertex
- ExecutionVertex ev0 = egv0.getGroupMember(0); // input1
- ExecutionVertex ev1 = egv1.getGroupMember(0); // output1
- ExecutionVertex ev2 = egv2.getGroupMember(0); // task1
-
- // ev0 (input1)
- assertEquals(egv0, ev0.getGroupVertex());
- assertNotNull(ev0.getID());
- assertEquals("Input 1", ev0.getName());
-
- // ev1 (output1)
- assertEquals(egv1, ev1.getGroupVertex());
- assertNotNull(ev1.getID());
- assertEquals("Output 1", ev1.getName());
-
- // ev2 (task1)
- assertEquals(egv2, ev2.getGroupVertex());
- assertNotNull(ev2.getID());
- assertEquals("Task 1", ev2.getName());
-
- assertEquals(ev0.getAllocatedResource(), ev1.getAllocatedResource());
- assertEquals(ev0.getAllocatedResource(), ev2.getAllocatedResource());
-
- // test channels
- assertEquals(ChannelType.NETWORK, eg.getChannelType(ev0, ev2));
- assertEquals(ChannelType.NETWORK, eg.getChannelType(ev2, ev1));
-
- } catch (GraphConversionException e) {
- fail(e.getMessage());
- } catch (JobGraphDefinitionException e) {
- fail(e.getMessage());
- } catch (IOException e) {
- fail(e.getMessage());
- } finally {
- if (inputFile != null) {
- inputFile.delete();
- }
- if (jobID != null) {
- try {
- LibraryCacheManager.unregister(jobID);
- } catch (IOException e) {
- }
- }
- }
- }
-
- /*
- * input1 -> task1 -> output1
- * no subtasks defined
- * input1 is default, task1 is m1.large, output1 is m1.xlarge
- * all channels are INMEMORY
- */
- @Test
- public void testConvertJobGraphToExecutionGraph2() {
-
- File inputFile = null;
- JobID jobID = null;
-
- try {
- inputFile = ServerTestUtils.createInputFile(0);
-
- // create job graph
- final JobGraph jg = new JobGraph("Job Graph 1");
- jobID = jg.getJobID();
-
- // input vertex
- final JobInputVertex i1 = new JobInputVertex("Input 1", jg);
- i1.setInvokableClass(DataSourceTask.class);
- i1.setInputFormat(new TextInputFormat(new Path(inputFile.toURI())));
- i1.setNumberOfSubtasks(1);
-
- // task vertex
- final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
- t1.setInvokableClass(ForwardTask1Input1Output.class);
-
- // output vertex
- final JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
- o1.setNumberOfSubtasks(1);
- o1.setInvokableClass(DataSinkTask.class);
- o1.setOutputFormat(new DiscardingOuputFormat<Object>());
-
- // connect vertices
- i1.connectTo(t1, ChannelType.IN_MEMORY);
- t1.connectTo(o1, ChannelType.IN_MEMORY);
-
- LibraryCacheManager.register(jobID, new String[0]);
-
- // now convert job graph to execution graph
- final ExecutionGraph eg = new ExecutionGraph(jg, 1);
-
- // test instance types in ExecutionGraph
- final ExecutionStage executionStage = eg.getCurrentExecutionStage();
- assertEquals(1, executionStage.getMaxNumberSubtasks());
-
- // stage0
- ExecutionStage es = eg.getStage(0);
- ExecutionGroupVertex egv0 = null; // input1
- ExecutionGroupVertex egv1 = null; // output1
- ExecutionGroupVertex egv2 = null; // task1
- if (es.getStageMember(0).getName().equals("Input 1")) {
- egv0 = es.getStageMember(0);
- } else if (es.getStageMember(0).getName().equals("Output 1")) {
- egv1 = es.getStageMember(0);
- } else {
- egv2 = es.getStageMember(0);
- }
- if (es.getStageMember(1).getName().equals("Input 1")) {
- egv0 = es.getStageMember(1);
- } else if (es.getStageMember(1).getName().equals("Output 1")) {
- egv1 = es.getStageMember(1);
- } else {
- egv2 = es.getStageMember(1);
- }
- if (es.getStageMember(2).getName().equals("Input 1")) {
- egv0 = es.getStageMember(2);
- } else if (es.getStageMember(2).getName().equals("Output 1")) {
- egv1 = es.getStageMember(2);
- } else {
- egv2 = es.getStageMember(2);
- }
- ExecutionVertex ev0 = egv0.getGroupMember(0); // input1
- ExecutionVertex ev1 = egv1.getGroupMember(0); // output1
- ExecutionVertex ev2 = egv2.getGroupMember(0); // task1
- assertEquals(ev0.getAllocatedResource(), ev1.getAllocatedResource());
- assertEquals(ev0.getAllocatedResource(), ev2.getAllocatedResource());
- } catch (GraphConversionException e) {
- fail(e.getMessage());
- } catch (IOException e) {
- fail(e.getMessage());
- } catch (JobGraphDefinitionException e) {
- fail(e.getMessage());
- } finally {
- if (inputFile != null) {
- inputFile.delete();
- }
- if (jobID != null) {
- try {
- LibraryCacheManager.unregister(jobID);
- } catch (IOException e) {
- }
- }
- }
- }
-
- /*
- * input1 -> task1 ->
- * task3 -> output1
- * input2 -> task2 ->
- * each vertex has 2 subtasks
- * no instance types defined
- * no channel types defined
- */
- @Test
- public void testConvertJobGraphToExecutionGraph3() {
-
- File inputFile1 = null;
- File inputFile2 = null;
- File outputFile = null;
- JobID jobID = null;
-
- try {
-
- inputFile1 = ServerTestUtils.createInputFile(0);
- inputFile2 = ServerTestUtils.createInputFile(0);
- outputFile = new File(ServerTestUtils.getRandomFilename());
-
- // create job graph
- final JobGraph jg = new JobGraph("Job Graph 1");
- jobID = jg.getJobID();
-
- // input vertex
- final JobInputVertex i1 = new JobInputVertex("Input 1", jg);
- i1.setInvokableClass(DataSourceTask.class);
- i1.setInputFormat(new TextInputFormat(new Path(inputFile1.toURI())));
- i1.setNumberOfSubtasks(2);
-
- final JobInputVertex i2 = new JobInputVertex("Input 2", jg);
- i2.setInvokableClass(DataSourceTask.class);
- i2.setInputFormat(new TextInputFormat(new Path(inputFile2.toURI())));
- i2.setNumberOfSubtasks(2);
-
- // task vertex
- final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
- t1.setInvokableClass(ForwardTask1Input1Output.class);
- t1.setNumberOfSubtasks(2);
- final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
- t2.setInvokableClass(ForwardTask1Input1Output.class);
- t2.setNumberOfSubtasks(2);
- final JobTaskVertex t3 = new JobTaskVertex("Task 3", jg);
- t3.setInvokableClass(ForwardTask2Inputs1Output.class);
- t3.setNumberOfSubtasks(2);
-
-
- // output vertex
- final JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
- o1.setInvokableClass(DataSinkTask.class);
- o1.setOutputFormat(new DiscardingOuputFormat<Object>());
- o1.setNumberOfSubtasks(2);
- i1.setVertexToShareInstancesWith(t1);
- t1.setVertexToShareInstancesWith(t3);
- i2.setVertexToShareInstancesWith(t2);
- t2.setVertexToShareInstancesWith(t3);
- t3.setVertexToShareInstancesWith(o1);
-
- // connect vertices
- i1.connectTo(t1);
- i2.connectTo(t2);
- t1.connectTo(t3);
- t2.connectTo(t3);
- t3.connectTo(o1);
-
- LibraryCacheManager.register(jobID, new String[0]);
-
- final ExecutionGraph eg = new ExecutionGraph(jg, 1);
-
- // test instance types in ExecutionGraph
- final ExecutionStage executionStage = eg.getCurrentExecutionStage();
- assertEquals(2, executionStage.getMaxNumberSubtasks());
-
- // stage0
- final ExecutionStage es = eg.getStage(0);
- ExecutionGroupVertex egv0 = null; // input1
- ExecutionGroupVertex egv1 = null; // input2
- ExecutionGroupVertex egv2 = null; // task1
- ExecutionGroupVertex egv3 = null; // task2
- ExecutionGroupVertex egv4 = null; // task3
- ExecutionGroupVertex egv5 = null; // output1
- if (es.getStageMember(0).getName().equals("Input 1")) {
- egv0 = es.getStageMember(0);
- } else if (es.getStageMember(0).getName().equals("Input 2")) {
- egv1 = es.getStageMember(0);
- } else if (es.getStageMember(0).getName().equals("Task 1")) {
- egv2 = es.getStageMember(0);
- } else if (es.getStageMember(0).getName().equals("Task 2")) {
- egv3 = es.getStageMember(0);
- } else if (es.getStageMember(0).getName().equals("Task 3")) {
- egv4 = es.getStageMember(0);
- } else {
- egv5 = es.getStageMember(0);
- }
-
- if (es.getStageMember(1).getName().equals("Input 1")) {
- egv0 = es.getStageMember(1);
- } else if (es.getStageMember(1).getName().equals("Input 2")) {
- egv1 = es.getStageMember(1);
- } else if (es.getStageMember(1).getName().equals("Task 1")) {
- egv2 = es.getStageMember(1);
- } else if (es.getStageMember(1).getName().equals("Task 2")) {
- egv3 = es.getStageMember(1);
- } else if (es.getStageMember(1).getName().equals("Task 3")) {
- egv4 = es.getStageMember(1);
- } else {
- egv5 = es.getStageMember(1);
- }
- if (es.getStageMember(2).getName().equals("Input 1")) {
- egv0 = es.getStageMember(2);
- } else if (es.getStageMember(2).getName().equals("Input 2")) {
- egv1 = es.getStageMember(2);
- } else if (es.getStageMember(2).getName().equals("Task 1")) {
- egv2 = es.getStageMember(2);
- } else if (es.getStageMember(2).getName().equals("Task 2")) {
- egv3 = es.getStageMember(2);
- } else if (es.getStageMember(2).getName().equals("Task 3")) {
- egv4 = es.getStageMember(2);
- } else {
- egv5 = es.getStageMember(2);
- }
- if (es.getStageMember(3).getName().equals("Input 1")) {
- egv0 = es.getStageMember(3);
- } else if (es.getStageMember(3).getName().equals("Input 2")) {
- egv1 = es.getStageMember(3);
- } else if (es.getStageMember(3).getName().equals("Task 1")) {
- egv2 = es.getStageMember(3);
- } else if (es.getStageMember(3).getName().equals("Task 2")) {
- egv3 = es.getStageMember(3);
- } else if (es.getStageMember(3).getName().equals("Task 3")) {
- egv4 = es.getStageMember(3);
- } else {
- egv5 = es.getStageMember(3);
- }
- if (es.getStageMember(4).getName().equals("Input 1")) {
- egv0 = es.getStageMember(4);
- } else if (es.getStageMember(4).getName().equals("Input 2")) {
- egv1 = es.getStageMember(4);
- } else if (es.getStageMember(4).getName().equals("Task 1")) {
- egv2 = es.getStageMember(4);
- } else if (es.getStageMember(4).getName().equals("Task 2")) {
- egv3 = es.getStageMember(4);
- } else if (es.getStageMember(4).getName().equals("Task 3")) {
- egv4 = es.getStageMember(4);
- } else {
- egv5 = es.getStageMember(4);
- }
- if (es.getStageMember(5).getName().equals("Input 1")) {
- egv0 = es.getStageMember(5);
- } else if (es.getStageMember(5).getName().equals("Input 2")) {
- egv1 = es.getStageMember(5);
- } else if (es.getStageMember(5).getName().equals("Task 1")) {
- egv2 = es.getStageMember(5);
- } else if (es.getStageMember(5).getName().equals("Task 2")) {
- egv3 = es.getStageMember(5);
- } else if (es.getStageMember(5).getName().equals("Task 3")) {
- egv4 = es.getStageMember(5);
- } else {
- egv5 = es.getStageMember(5);
- }
- final ExecutionVertex i1_0 = egv0.getGroupMember(0); // input1
- final ExecutionVertex i1_1 = egv0.getGroupMember(1); // input1
- final ExecutionVertex i2_0 = egv1.getGroupMember(0); // input2
- final ExecutionVertex i2_1 = egv1.getGroupMember(1); // input2
- final ExecutionVertex t1_0 = egv2.getGroupMember(0); // task1
- final ExecutionVertex t1_1 = egv2.getGroupMember(1); // task1
- final ExecutionVertex t2_0 = egv3.getGroupMember(0); // task2
- final ExecutionVertex t2_1 = egv3.getGroupMember(1); // task2
- final ExecutionVertex t3_0 = egv4.getGroupMember(0); // task3
- final ExecutionVertex t3_1 = egv4.getGroupMember(1); // task3
- final ExecutionVertex o1_0 = egv5.getGroupMember(0); // output1
- final ExecutionVertex o1_1 = egv5.getGroupMember(1); // output1
-
- // instance 1
- assertTrue((t1_0.getAllocatedResource().equals(i1_0.getAllocatedResource()) && !t1_0.getAllocatedResource()
- .equals(i1_1.getAllocatedResource()))
- || (!t1_0.getAllocatedResource().equals(i1_0.getAllocatedResource()) && t1_0.getAllocatedResource()
- .equals(i1_1.getAllocatedResource())));
- assertTrue((t1_0.getAllocatedResource().equals(i2_0.getAllocatedResource()) && !t1_0.getAllocatedResource()
- .equals(i2_1.getAllocatedResource()))
- || (!t1_0.getAllocatedResource().equals(i2_0.getAllocatedResource()) && t1_0.getAllocatedResource()
- .equals(i2_1.getAllocatedResource())));
- assertTrue((t1_0.getAllocatedResource().equals(t2_0.getAllocatedResource()) && !t1_0.getAllocatedResource()
- .equals(t2_1.getAllocatedResource()))
- || (!t1_0.getAllocatedResource().equals(t2_0.getAllocatedResource()) && t1_0.getAllocatedResource()
- .equals(t2_1.getAllocatedResource())));
- assertTrue((t1_0.getAllocatedResource().equals(t3_0.getAllocatedResource()) && !t1_0.getAllocatedResource()
- .equals(t3_1.getAllocatedResource()))
- || (!t1_0.getAllocatedResource().equals(t3_0.getAllocatedResource()) && t1_0.getAllocatedResource()
- .equals(t3_1.getAllocatedResource())));
- assertTrue((t1_0.getAllocatedResource().equals(o1_0.getAllocatedResource()) && !t1_0.getAllocatedResource()
- .equals(o1_1.getAllocatedResource()))
- || (!t1_0.getAllocatedResource().equals(o1_0.getAllocatedResource()) && t1_0.getAllocatedResource()
- .equals(o1_1.getAllocatedResource())));
- // instance 2
- assertTrue((t1_1.getAllocatedResource().equals(i1_0.getAllocatedResource()) && !t1_1.getAllocatedResource()
- .equals(i1_1.getAllocatedResource()))
- || (!t1_1.getAllocatedResource().equals(i1_0.getAllocatedResource()) && t1_1.getAllocatedResource()
- .equals(i1_1.getAllocatedResource())));
- assertTrue((t1_1.getAllocatedResource().equals(i2_0.getAllocatedResource()) && !t1_1.getAllocatedResource()
- .equals(i2_1.getAllocatedResource()))
- || (!t1_1.getAllocatedResource().equals(i2_0.getAllocatedResource()) && t1_1.getAllocatedResource()
- .equals(i2_1.getAllocatedResource())));
- assertTrue((t1_1.getAllocatedResource().equals(t2_0.getAllocatedResource()) && !t1_1.getAllocatedResource()
- .equals(t2_1.getAllocatedResource()))
- || (!t1_1.getAllocatedResource().equals(t2_0.getAllocatedResource()) && t1_1.getAllocatedResource()
- .equals(t2_1.getAllocatedResource())));
- assertTrue((t1_1.getAllocatedResource().equals(t3_0.getAllocatedResource()) && !t1_1.getAllocatedResource()
- .equals(t3_1.getAllocatedResource()))
- || (!t1_1.getAllocatedResource().equals(t3_0.getAllocatedResource()) && t1_1.getAllocatedResource()
- .equals(t3_1.getAllocatedResource())));
- assertTrue((t1_1.getAllocatedResource().equals(o1_0.getAllocatedResource()) && !t1_1.getAllocatedResource()
- .equals(o1_1.getAllocatedResource()))
- || (!t1_1.getAllocatedResource().equals(o1_0.getAllocatedResource()) && t1_1.getAllocatedResource()
- .equals(o1_1.getAllocatedResource())));
- } catch (GraphConversionException e) {
- fail(e.getMessage());
- } catch (IOException e) {
- fail(e.getMessage());
- } catch (JobGraphDefinitionException e) {
- fail(e.getMessage());
- } finally {
- if (inputFile1 != null) {
- inputFile1.delete();
- }
- if (inputFile2 != null) {
- inputFile2.delete();
- }
- if (outputFile != null) {
- outputFile.delete();
- }
- if (jobID != null) {
- try {
- LibraryCacheManager.unregister(jobID);
- } catch (IOException ioe) {
- }
- }
- }
- }
-
- /*
- * input1 -> task1 -> output1
- * -> task3 -> task4
- * input2 -> task2 -> output2
- * all subtasks defined
- * all instance types defined
- * all channel types defined
- */
- @Test
- public void testConvertJobGraphToExecutionGraph4() {
-
- File inputFile1 = null;
- File inputFile2 = null;
- File outputFile1 = null;
- File outputFile2 = null;
- JobID jobID = null;
-
- try {
-
- inputFile1 = ServerTestUtils.createInputFile(0);
- inputFile2 = ServerTestUtils.createInputFile(0);
- outputFile1 = new File(ServerTestUtils.getRandomFilename());
- outputFile2 = new File(ServerTestUtils.getRandomFilename());
-
- // create job graph
- final JobGraph jg = new JobGraph("Job Graph 1");
- jobID = jg.getJobID();
-
- // input vertex
- final JobInputVertex i1 = new JobInputVertex("Input 1", jg);
- i1.setInvokableClass(DataSourceTask.class);
- i1.setInputFormat(new TextInputFormat(new Path(inputFile1.toURI())));
- i1.setNumberOfSubtasks(4);
- final JobInputVertex i2 = new JobInputVertex("Input 2", jg);
- i2.setInvokableClass(DataSourceTask.class);
- i2.setInputFormat(new TextInputFormat(new Path(inputFile2.toURI())));
- i2.setNumberOfSubtasks(4);
- // task vertex
- final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
- t1.setInvokableClass(ForwardTask1Input1Output.class);
- t1.setNumberOfSubtasks(4);
- final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
- t2.setInvokableClass(ForwardTask1Input1Output.class);
- t2.setNumberOfSubtasks(4);
- final JobTaskVertex t3 = new JobTaskVertex("Task 3", jg);
- t3.setInvokableClass(ForwardTask2Inputs1Output.class);
- t3.setNumberOfSubtasks(8);
- final JobTaskVertex t4 = new JobTaskVertex("Task 4", jg);
- t4.setInvokableClass(ForwardTask1Input2Outputs.class);
- t4.setNumberOfSubtasks(8);
- // output vertex
- final JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
- o1.setInvokableClass(DataSinkTask.class);
- o1.setOutputFormat(new DiscardingOuputFormat<Object>());
- o1.setNumberOfSubtasks(4);
- final JobOutputVertex o2 = new JobOutputVertex("Output 2", jg);
- o2.setInvokableClass(DataSinkTask.class);
- o2.setOutputFormat(new DiscardingOuputFormat<Object>());
- o2.setNumberOfSubtasks(4);
- o1.setVertexToShareInstancesWith(o2);
-
- // connect vertices
- i1.connectTo(t1, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- i2.connectTo(t2, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- t1.connectTo(t3, ChannelType.NETWORK);
- t2.connectTo(t3, ChannelType.NETWORK);
- t3.connectTo(t4, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- t4.connectTo(o1, ChannelType.NETWORK);
- t4.connectTo(o2, ChannelType.NETWORK);
-
- LibraryCacheManager.register(jobID, new String[0]);
-
- // now convert job graph to execution graph
- final ExecutionGraph eg = new ExecutionGraph(jg, 1);
-
- // test instance types in ExecutionGraph
- ExecutionStage executionStage = eg.getCurrentExecutionStage();
- assertNotNull(executionStage);
- assertEquals(0, executionStage.getStageNumber());
-
- assertEquals(20, executionStage.getRequiredSlots());
- // Fake transition to next stage by triggering execution state changes manually
- final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(eg, eg.getIndexOfCurrentExecutionStage(),
- true, true);
-
- while (it.hasNext()) {
- final ExecutionVertex ev = it.next();
- ev.updateExecutionState(ExecutionState.SCHEDULED);
- ev.updateExecutionState(ExecutionState.ASSIGNED);
- ev.updateExecutionState(ExecutionState.READY);
- ev.updateExecutionState(ExecutionState.STARTING);
- ev.updateExecutionState(ExecutionState.RUNNING);
- ev.updateExecutionState(ExecutionState.FINISHING);
- ev.updateExecutionState(ExecutionState.FINISHED);
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
- if (inputFile1 != null) {
- inputFile1.delete();
- }
- if (inputFile2 != null) {
- inputFile2.delete();
- }
- if (outputFile1 != null) {
- outputFile1.delete();
- }
- if (outputFile2 != null) {
- outputFile2.delete();
- }
- if (jobID != null) {
- try {
- LibraryCacheManager.unregister(jobID);
- } catch (IOException e) {
- }
- }
- }
- }
-
- /**
- * Tests the conversion of a job graph representing a self cross to an execution graph.
- */
- @Test
- public void testConvertSelfCross() {
-
- final String inputTaskName = "Self Cross Input";
- final String crossTaskName = "Self Cross Task";
- final String outputTaskName = "Self Cross Output";
- final int degreeOfParallelism = 4;
- File inputFile = null;
- File outputFile = null;
- JobID jobID = null;
-
- try {
- inputFile = ServerTestUtils.createInputFile(0);
- outputFile = new File(ServerTestUtils.getRandomFilename());
-
- // create job graph
- final JobGraph jg = new JobGraph("Self Cross Test Job");
- jobID = jg.getJobID();
-
- // input vertex
- final JobInputVertex input = new JobInputVertex(inputTaskName, jg);
- input.setInvokableClass(DataSourceTask.class);
- input.setInputFormat(new TextInputFormat(new Path(inputFile.toURI())));
- input.setNumberOfSubtasks(degreeOfParallelism);
-
- // cross vertex
- final JobTaskVertex cross = new JobTaskVertex(crossTaskName, jg);
- cross.setInvokableClass(SelfCrossForwardTask.class);
- cross.setNumberOfSubtasks(degreeOfParallelism);
-
- // output vertex
- final JobOutputVertex output = new JobOutputVertex(outputTaskName, jg);
- output.setInvokableClass(DataSinkTask.class);
- output.setOutputFormat(new DiscardingOuputFormat<Object>());
- output.setNumberOfSubtasks(degreeOfParallelism);
-
- // connect vertices
- input.connectTo(cross, ChannelType.IN_MEMORY, 0, 0,
- DistributionPattern.POINTWISE);
- input.connectTo(cross, ChannelType.NETWORK, 1, 1,
- DistributionPattern.BIPARTITE);
- cross.connectTo(output, ChannelType.IN_MEMORY, 0, 0,
- DistributionPattern.POINTWISE);
-
- LibraryCacheManager.register(jobID, new String[0]);
-
- // now convert job graph to execution graph
- final ExecutionGraph eg = new ExecutionGraph(jg, 1);
-
- assertEquals(1, eg.getNumberOfStages());
-
- final ExecutionStage stage = eg.getStage(0);
-
- assertEquals(3, stage.getNumberOfStageMembers());
-
- ExecutionGroupVertex inputGroupVertex = null;
- ExecutionGroupVertex crossGroupVertex = null;
- ExecutionGroupVertex outputGroupVertex = null;
- final ExecutionGroupVertexIterator groupIt = new ExecutionGroupVertexIterator(eg, true, -1);
- while (groupIt.hasNext()) {
-
- ExecutionGroupVertex gv = groupIt.next();
- if (inputTaskName.equals(gv.getName())) {
- inputGroupVertex = gv;
- } else if (crossTaskName.equals(gv.getName())) {
- crossGroupVertex = gv;
- } else if (outputTaskName.equals(gv.getName())) {
- outputGroupVertex = gv;
- }
- }
-
- assertNotNull(inputGroupVertex);
- assertNotNull(crossGroupVertex);
- assertNotNull(outputGroupVertex);
-
- assertEquals(degreeOfParallelism, inputGroupVertex.getCurrentNumberOfGroupMembers());
- assertEquals(degreeOfParallelism, crossGroupVertex.getCurrentNumberOfGroupMembers());
- assertEquals(degreeOfParallelism, outputGroupVertex.getCurrentNumberOfGroupMembers());
-
- // Check that all subtasks on a pipeline share the same instance
- assertEquals(inputGroupVertex.getGroupMember(0).getAllocatedResource(), crossGroupVertex.getGroupMember(0)
- .getAllocatedResource());
- assertEquals(inputGroupVertex.getGroupMember(1).getAllocatedResource(), crossGroupVertex.getGroupMember(1)
- .getAllocatedResource());
- assertEquals(inputGroupVertex.getGroupMember(2).getAllocatedResource(), crossGroupVertex.getGroupMember(2)
- .getAllocatedResource());
- assertEquals(inputGroupVertex.getGroupMember(3).getAllocatedResource(), crossGroupVertex.getGroupMember(3)
- .getAllocatedResource());
-
- assertEquals(crossGroupVertex.getGroupMember(0).getAllocatedResource(), outputGroupVertex.getGroupMember(0)
- .getAllocatedResource());
- assertEquals(crossGroupVertex.getGroupMember(1).getAllocatedResource(), outputGroupVertex.getGroupMember(1)
- .getAllocatedResource());
- assertEquals(crossGroupVertex.getGroupMember(2).getAllocatedResource(), outputGroupVertex.getGroupMember(2)
- .getAllocatedResource());
- assertEquals(crossGroupVertex.getGroupMember(3).getAllocatedResource(), outputGroupVertex.getGroupMember(3)
- .getAllocatedResource());
-
- // Check that all subtasks on different pipelines run on different instances
- assertFalse(inputGroupVertex.getGroupMember(0).getAllocatedResource()
- .equals(inputGroupVertex.getGroupMember(1).getAllocatedResource()));
- assertFalse(inputGroupVertex.getGroupMember(1).getAllocatedResource()
- .equals(inputGroupVertex.getGroupMember(2).getAllocatedResource()));
- assertFalse(inputGroupVertex.getGroupMember(2).getAllocatedResource()
- .equals(inputGroupVertex.getGroupMember(3).getAllocatedResource()));
-
- } catch (GraphConversionException e) {
- fail(e.getMessage());
- } catch (JobGraphDefinitionException e) {
- fail(e.getMessage());
- } catch (IOException ioe) {
- fail(ioe.getMessage());
- } finally {
- if (inputFile != null) {
- inputFile.delete();
- }
- if (outputFile != null) {
- outputFile.delete();
- }
- if (jobID != null) {
- try {
- LibraryCacheManager.unregister(jobID);
- } catch (IOException e) {
- }
- }
- }
- }
-
- /**
- * This test checks the correctness of the instance sharing API. In particular, the test checks the behavior of the
- * instance sharing as reported broken in ticket #198
- */
- @Test
- public void testInstanceSharing() {
-
- final int degreeOfParallelism = 4;
- File inputFile1 = null;
- File outputFile1 = null;
- JobID jobID = null;
-
- try {
-
- inputFile1 = ServerTestUtils.createInputFile(0);
- outputFile1 = new File(ServerTestUtils.getRandomFilename());
-
- // create job graph
- final JobGraph jg = new JobGraph("Instance Sharing Test Job");
- jobID = jg.getJobID();
-
- // input vertex
- final JobInputVertex input1 = new JobInputVertex("Input 1", jg);
- input1.setInvokableClass(DataSourceTask.class);
- input1.setInputFormat(new TextInputFormat(new Path(inputFile1.toURI())));
- input1.setNumberOfSubtasks(degreeOfParallelism);
-
-
-
- // forward vertex 1
- final JobTaskVertex forward1 = new JobTaskVertex("Forward 1", jg);
- forward1.setInvokableClass(ForwardTask1Input1Output.class);
- forward1.setNumberOfSubtasks(degreeOfParallelism);
-
- // forward vertex 2
- final JobTaskVertex forward2 = new JobTaskVertex("Forward 2", jg);
- forward2.setInvokableClass(ForwardTask1Input1Output.class);
- forward2.setNumberOfSubtasks(degreeOfParallelism);
-
- // forward vertex 3
- final JobTaskVertex forward3 = new JobTaskVertex("Forward 3", jg);
- forward3.setInvokableClass(ForwardTask1Input1Output.class);
- forward3.setNumberOfSubtasks(degreeOfParallelism);
-
- // output vertex
- final JobOutputVertex output1 = new JobOutputVertex("Output 1", jg);
- output1.setInvokableClass(DataSinkTask.class);
- output1.setOutputFormat(new DiscardingOuputFormat<Object>());
- output1.setNumberOfSubtasks(degreeOfParallelism);
-
- // connect vertices
- input1.connectTo(forward1, ChannelType.IN_MEMORY,
- DistributionPattern.POINTWISE);
- forward1.connectTo(forward2, ChannelType.IN_MEMORY,
- DistributionPattern.POINTWISE);
- forward2.connectTo(forward3, ChannelType.NETWORK,
- DistributionPattern.POINTWISE);
- forward3.connectTo(output1, ChannelType.IN_MEMORY);
-
- // setup instance sharing
- input1.setVertexToShareInstancesWith(forward1);
- forward1.setVertexToShareInstancesWith(forward2);
- forward2.setVertexToShareInstancesWith(forward3);
- forward3.setVertexToShareInstancesWith(output1);
-
- LibraryCacheManager.register(jobID, new String[0]);
-
- // now convert job graph to execution graph
- final ExecutionGraph eg = new ExecutionGraph(jg, 1);
-
- // Check number of stages
- assertEquals(1, eg.getNumberOfStages());
-
- // Check number of vertices in stage
- final ExecutionStage stage = eg.getStage(0);
- assertEquals(5, stage.getNumberOfStageMembers());
-
- final int numberOfRequiredSlots = stage.getMaxNumberSubtasks();
- assertEquals(degreeOfParallelism, numberOfRequiredSlots);
-
- } catch (GraphConversionException e) {
- fail(e.getMessage());
- } catch (JobGraphDefinitionException e) {
- fail(e.getMessage());
- } catch (IOException ioe) {
- fail(ioe.getMessage());
- } finally {
- if (inputFile1 != null) {
- inputFile1.delete();
- }
- if (outputFile1 != null) {
- outputFile1.delete();
- }
- if (jobID != null) {
- try {
- LibraryCacheManager.unregister(jobID);
- } catch (IOException e) {
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
new file mode 100644
index 0000000..2207475
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class ExecutionGraphTestUtils {
+
+ // --------------------------------------------------------------------------------------------
+ // state modifications
+ // --------------------------------------------------------------------------------------------
+
+ public static void setVertexState(ExecutionVertex2 vertex, ExecutionState2 state) {
+ try {
+ Field f = ExecutionVertex2.class.getDeclaredField("state");
+ f.setAccessible(true);
+ f.set(vertex, state);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Modifying the state failed", e);
+ }
+ }
+
+ public static void setVertexResource(ExecutionVertex2 vertex, AllocatedSlot slot) {
+ try {
+ Field f = ExecutionVertex2.class.getDeclaredField("assignedSlot");
+ f.setAccessible(true);
+ f.set(vertex, slot);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Modifying the slot failed", e);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // utility mocking methods
+ // --------------------------------------------------------------------------------------------
+
+ public static Instance getInstance(final TaskOperationProtocol top) throws Exception {
+ HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
+ InetAddress address = InetAddress.getByName("127.0.0.1");
+ InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10000, 10001);
+
+ return new Instance(connection, new InstanceID(), hardwareDescription, 1) {
+ @Override
+ public TaskOperationProtocol getTaskManagerProxy() {
+ return top;
+ }
+ };
+ }
+
+ public static ExecutionJobVertex getJobVertexNotExecuting(JobVertexID id) throws JobException {
+ ExecutionJobVertex ejv = getJobVertexBase(id);
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ return null;
+ }
+ }).when(ejv).execute(Matchers.any(Runnable.class));
+
+ return ejv;
+ }
+
+ public static ExecutionJobVertex getJobVertexExecutingSynchronously(JobVertexID id) throws JobException {
+ ExecutionJobVertex ejv = getJobVertexBase(id);
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ Runnable r = (Runnable) invocation.getArguments()[0];
+ r.run();
+ return null;
+ }
+ }).when(ejv).execute(Matchers.any(Runnable.class));
+
+ return ejv;
+ }
+
+ public static ExecutionJobVertex getJobVertexExecutingAsynchronously(JobVertexID id) throws JobException {
+ ExecutionJobVertex ejv = getJobVertexBase(id);
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ Runnable r = (Runnable) invocation.getArguments()[0];
+ new Thread(r).start();
+ return null;
+ }
+ }).when(ejv).execute(Matchers.any(Runnable.class));
+
+ return ejv;
+ }
+
+ public static ExecutionJobVertex getJobVertexExecutingTriggered(JobVertexID id, final ActionQueue queue) throws JobException {
+ ExecutionJobVertex ejv = getJobVertexBase(id);
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+
+ final Runnable action = (Runnable) invocation.getArguments()[0];
+ queue.queueAction(action);
+ return null;
+ }
+ }).when(ejv).execute(Matchers.any(Runnable.class));
+
+ return ejv;
+ }
+
+ private static ExecutionJobVertex getJobVertexBase(JobVertexID id) throws JobException {
+ AbstractJobVertex ajv = new AbstractJobVertex("TestVertex", id);
+ ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
+
+ ExecutionGraph graph = new ExecutionGraph(new JobID(), "test job", new Configuration());
+
+ return spy(new ExecutionJobVertex(graph, ajv, 1));
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final class ActionQueue {
+
+ private final LinkedBlockingQueue<Runnable> runnables = new LinkedBlockingQueue<Runnable>();
+
+ public void triggerNextAction() {
+ Runnable r = runnables.remove();
+ r.run();
+ }
+
+ public Runnable popNextAction() {
+ Runnable r = runnables.remove();
+ return r;
+ }
+
+ public void queueAction(Runnable r) {
+ this.runnables.add(r);
+ }
+ }
+}