You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:27 UTC
[03/63] [abbrv] Refactor job graph construction to incremental
attachment based
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index c28f946..768ac82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -18,98 +18,218 @@
package org.apache.flink.runtime.jobgraph;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.List;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobTaskVertex;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
-/**
- * This class contains tests related to the JobGraph
- */
public class JobGraphTest {
- public JobGraphTest() {
- }
-
- @BeforeClass
- public static void setUpClass() throws Exception {
- }
-
- @AfterClass
- public static void tearDownClass() throws Exception {
- }
-
- @Before
- public void setUp() {
+ @Test
+ public void testSerialization() {
+ try {
+ JobGraph jg = new JobGraph("The graph");
+
+ // add some configuration values
+ {
+ jg.getJobConfiguration().setString("some key", "some value");
+ jg.getJobConfiguration().setDouble("Life of ", Math.PI);
+ }
+
+ // add some vertices
+ {
+ AbstractJobVertex source1 = new AbstractJobVertex("source1");
+ AbstractJobVertex source2 = new AbstractJobVertex("source2");
+ AbstractJobVertex target = new AbstractJobVertex("target");
+ target.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
+ target.connectNewDataSetAsInput(source2, DistributionPattern.BIPARTITE);
+
+ jg.addVertex(source1);
+ jg.addVertex(source2);
+ jg.addVertex(target);
+ }
+
+ // de-/serialize and compare
+ JobGraph copy = CommonTestUtils.createCopyWritable(jg);
+
+ assertEquals(jg.getName(), copy.getName());
+ assertEquals(jg.getJobID(), copy.getJobID());
+ assertEquals(jg.getJobConfiguration(), copy.getJobConfiguration());
+ assertEquals(jg.getNumberOfVertices(), copy.getNumberOfVertices());
+
+ for (AbstractJobVertex vertex : copy.getVertices()) {
+ AbstractJobVertex original = jg.findVertexByID(vertex.getID());
+ assertNotNull(original);
+ assertEquals(original.getName(), vertex.getName());
+ assertEquals(original.getNumberOfInputs(), vertex.getNumberOfInputs());
+ assertEquals(original.getNumberOfProducedIntermediateDataSets(), vertex.getNumberOfProducedIntermediateDataSets());
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
-
- @After
- public void tearDown() {
+
+ @Test
+ public void testTopologicalSort1() {
+ try {
+ AbstractJobVertex source1 = new AbstractJobVertex("source1");
+ AbstractJobVertex source2 = new AbstractJobVertex("source2");
+ AbstractJobVertex target1 = new AbstractJobVertex("target1");
+ AbstractJobVertex target2 = new AbstractJobVertex("target2");
+ AbstractJobVertex intermediate1 = new AbstractJobVertex("intermediate1");
+ AbstractJobVertex intermediate2 = new AbstractJobVertex("intermediate2");
+
+ target1.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
+ target2.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
+ target2.connectNewDataSetAsInput(intermediate2, DistributionPattern.POINTWISE);
+ intermediate2.connectNewDataSetAsInput(intermediate1, DistributionPattern.POINTWISE);
+ intermediate1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+
+ JobGraph graph = new JobGraph("TestGraph", source1, source2, intermediate1, intermediate2, target1, target2);
+ List<AbstractJobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
+
+ assertBefore(source1, target1, sorted);
+ assertBefore(source1, target2, sorted);
+ assertBefore(source2, target2, sorted);
+ assertBefore(source2, intermediate1, sorted);
+ assertBefore(source2, intermediate2, sorted);
+ assertBefore(intermediate1, target2, sorted);
+ assertBefore(intermediate2, target2, sorted);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
-
+
@Test
- /**
- * This test ensures that the JobGraph edges are correctly set (forward/backward edges)
- */
- public void testJobGraph() {
- // check if the backward edge really points to the preceding vertex
- final JobGraph jg = new JobGraph();
-
- final JobTaskVertex v1 = new JobTaskVertex(jg);
- final JobTaskVertex v2 = new JobTaskVertex(jg);
-
+ public void testTopologicalSort2() {
try {
- v1.connectTo(v2);
- } catch (JobGraphDefinitionException ex) {
- Logger.getLogger(JobGraphTest.class.getName()).log(Level.SEVERE, null, ex);
+ AbstractJobVertex source1 = new AbstractJobVertex("source1");
+ AbstractJobVertex source2 = new AbstractJobVertex("source2");
+ AbstractJobVertex root = new AbstractJobVertex("root");
+ AbstractJobVertex l11 = new AbstractJobVertex("layer 1 - 1");
+ AbstractJobVertex l12 = new AbstractJobVertex("layer 1 - 2");
+ AbstractJobVertex l13 = new AbstractJobVertex("layer 1 - 3");
+ AbstractJobVertex l2 = new AbstractJobVertex("layer 2");
+
+ root.connectNewDataSetAsInput(l13, DistributionPattern.POINTWISE);
+ root.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+ root.connectNewDataSetAsInput(l2, DistributionPattern.POINTWISE);
+
+ l2.connectNewDataSetAsInput(l11, DistributionPattern.POINTWISE);
+ l2.connectNewDataSetAsInput(l12, DistributionPattern.POINTWISE);
+
+ l11.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
+
+ l12.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
+ l12.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+
+ l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+
+ JobGraph graph = new JobGraph("TestGraph", source1, source2, root, l11, l13, l12, l2);
+ List<AbstractJobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
+
+ assertBefore(source1, root, sorted);
+ assertBefore(source2, root, sorted);
+ assertBefore(l11, root, sorted);
+ assertBefore(l12, root, sorted);
+ assertBefore(l13, root, sorted);
+ assertBefore(l2, root, sorted);
+
+ assertBefore(l11, l2, sorted);
+ assertBefore(l12, l2, sorted);
+ assertBefore(l2, root, sorted);
+
+ assertBefore(source1, l2, sorted);
+ assertBefore(source2, l2, sorted);
+
+ assertBefore(source2, l13, sorted);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
}
-
- assertEquals(v1, v2.getBackwardConnection(0).getConnectedVertex());
-
}
-
- /**
- * In this test we construct a job graph and set the dependency chain for instance sharing in a way that a cycle is
- * created. The test is considered successful if the cycle is detected.
- */
+
@Test
- public void detectCycleInInstanceSharingDependencyChain() {
-
- final JobGraph jg = new JobGraph();
-
- final JobTaskVertex v1 = new JobTaskVertex("v1", jg);
- final JobTaskVertex v2 = new JobTaskVertex("v2", jg);
- final JobTaskVertex v3 = new JobTaskVertex("v3", jg);
- final JobTaskVertex v4 = new JobTaskVertex("v4", jg);
-
+ public void testTopoSortCyclicGraphNoSources() {
try {
- v1.connectTo(v2);
- v2.connectTo(v3);
- v3.connectTo(v4);
- } catch (JobGraphDefinitionException ex) {
- Logger.getLogger(JobGraphTest.class.getName()).log(Level.SEVERE, null, ex);
+ AbstractJobVertex v1 = new AbstractJobVertex("1");
+ AbstractJobVertex v2 = new AbstractJobVertex("2");
+ AbstractJobVertex v3 = new AbstractJobVertex("3");
+ AbstractJobVertex v4 = new AbstractJobVertex("4");
+
+ v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+ v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE);
+ v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
+
+ JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4);
+ try {
+ jg.getVerticesSortedTopologicallyFromSources();
+ fail("Failed to raise error on topologically sorting cyclic graph.");
+ }
+ catch (InvalidProgramException e) {
+ // that what we wanted
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTopoSortCyclicGraphIntermediateCycle() {
+ try{
+ AbstractJobVertex source = new AbstractJobVertex("source");
+ AbstractJobVertex v1 = new AbstractJobVertex("1");
+ AbstractJobVertex v2 = new AbstractJobVertex("2");
+ AbstractJobVertex v3 = new AbstractJobVertex("3");
+ AbstractJobVertex v4 = new AbstractJobVertex("4");
+ AbstractJobVertex target = new AbstractJobVertex("target");
+
+ v1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
+ v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+ v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE);
+ v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
+ target.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
+
+ JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4, source, target);
+ try {
+ jg.getVerticesSortedTopologicallyFromSources();
+ fail("Failed to raise error on topologically sorting cyclic graph.");
+ }
+ catch (InvalidProgramException e) {
+ // that what we wanted
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static final void assertBefore(AbstractJobVertex v1, AbstractJobVertex v2, List<AbstractJobVertex> list) {
+ boolean seenFirst = false;
+ for (AbstractJobVertex v : list) {
+ if (v == v1) {
+ seenFirst = true;
+ }
+ else if (v == v2) {
+ if (!seenFirst) {
+ fail("The first vertex (" + v1 + ") is not before the second vertex (" + v2 + ")");
+ }
+ break;
+ }
}
-
- // Dependency chain is acyclic
- v1.setVertexToShareInstancesWith(v2);
- v3.setVertexToShareInstancesWith(v2);
- v4.setVertexToShareInstancesWith(v1);
-
- assertEquals(jg.isInstanceDependencyChainAcyclic(), true);
-
- // Create a cycle v4 -> v1 -> v2 -> v4
- v2.setVertexToShareInstancesWith(v4);
-
- assertEquals(jg.isInstanceDependencyChainAcyclic(), false);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
new file mode 100644
index 0000000..e1d862e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+@SuppressWarnings("serial")
+public class JobTaskVertexTest {
+
+ @Test
+ public void testConnectDirectly() {
+ AbstractJobVertex source = new AbstractJobVertex("source");
+ AbstractJobVertex target = new AbstractJobVertex("target");
+ target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
+
+ assertTrue(source.isInputVertex());
+ assertFalse(source.isOutputVertex());
+ assertFalse(target.isInputVertex());
+ assertTrue(target.isOutputVertex());
+
+ assertEquals(1, source.getNumberOfProducedIntermediateDataSets());
+ assertEquals(1, target.getNumberOfInputs());
+
+ assertEquals(target.getInputs().get(0).getSource(), source.getProducedDataSets().get(0));
+
+ assertEquals(1, source.getProducedDataSets().get(0).getConsumers().size());
+ assertEquals(target, source.getProducedDataSets().get(0).getConsumers().get(0).getTarget());
+ }
+
+ @Test
+ public void testConnectMultipleTargets() {
+ AbstractJobVertex source = new AbstractJobVertex("source");
+ AbstractJobVertex target1= new AbstractJobVertex("target1");
+ AbstractJobVertex target2 = new AbstractJobVertex("target2");
+ target1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
+ target2.connectDataSetAsInput(source.getProducedDataSets().get(0), DistributionPattern.BIPARTITE);
+
+ assertTrue(source.isInputVertex());
+ assertFalse(source.isOutputVertex());
+ assertFalse(target1.isInputVertex());
+ assertTrue(target1.isOutputVertex());
+ assertFalse(target2.isInputVertex());
+ assertTrue(target2.isOutputVertex());
+
+ assertEquals(1, source.getNumberOfProducedIntermediateDataSets());
+ assertEquals(2, source.getProducedDataSets().get(0).getConsumers().size());
+
+ assertEquals(target1.getInputs().get(0).getSource(), source.getProducedDataSets().get(0));
+ assertEquals(target2.getInputs().get(0).getSource(), source.getProducedDataSets().get(0));
+ }
+
+ @Test
+ public void testOutputFormatVertex() {
+ try {
+ final TestingOutputFormat outputFormat = new TestingOutputFormat();
+ final OutputFormatVertex of = new OutputFormatVertex("Name");
+ new TaskConfig(of.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<OutputFormat<?>>(outputFormat));
+ final ClassLoader cl = getClass().getClassLoader();
+
+ try {
+ of.initializeOnMaster(cl);
+ fail("Did not throw expected exception.");
+ } catch (TestException e) {
+ // all good
+ }
+
+ OutputFormatVertex copy = SerializationUtils.clone(of);
+ try {
+ copy.initializeOnMaster(cl);
+ fail("Did not throw expected exception.");
+ } catch (TestException e) {
+ // all good
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInputFormatVertex() {
+ try {
+ final TestInputFormat inputFormat = new TestInputFormat();
+ final InputFormatVertex vertex = new InputFormatVertex("Name");
+ new TaskConfig(vertex.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<InputFormat<?, ?>>(inputFormat));
+
+ final ClassLoader cl = getClass().getClassLoader();
+
+ vertex.initializeOnMaster(cl);
+ InputSplit[] splits = vertex.getInputSplitSource().createInputSplits(77);
+
+ assertNotNull(splits);
+ assertEquals(1, splits.length);
+ assertEquals(TestSplit.class, splits[0].getClass());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static final class TestingOutputFormat extends DiscardingOuputFormat<Object> implements InitializeOnMaster {
+ @Override
+ public void initializeGlobal(int parallelism) throws IOException {
+ throw new TestException();
+ }
+ }
+
+ private static final class TestException extends IOException {}
+
+ // --------------------------------------------------------------------------------------------
+
+ private static final class TestSplit extends GenericInputSplit {}
+
+ private static final class TestInputFormat extends GenericInputFormat<Object> {
+
+ @Override
+ public boolean reachedEnd() {
+ return false;
+ }
+
+ @Override
+ public Object nextRecord(Object reuse) {
+ return null;
+ }
+
+ @Override
+ public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+ return new GenericInputSplit[] { new TestSplit() };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleSourceTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleSourceTask.java
deleted file mode 100644
index f021de7..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleSourceTask.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.fs.LineReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-
-public class DoubleSourceTask extends AbstractInvokable {
-
- private RecordWriter<StringRecord> output1 = null;
-
- private RecordWriter<StringRecord> output2 = null;
-
- @Override
- public void invoke() throws Exception {
- this.output1.initializeSerializers();
- this.output2.initializeSerializers();
-
- final Iterator<FileInputSplit> splitIterator = getInputSplits();
-
- while (splitIterator.hasNext()) {
-
- final FileInputSplit split = splitIterator.next();
-
- final long start = split.getStart();
- final long length = split.getLength();
-
- final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
- final FSDataInputStream fdis = fs.open(split.getPath());
-
- final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
-
- byte[] line = lineReader.readLine();
-
- while (line != null) {
-
- // Create a string object from the data read
- StringRecord str = new StringRecord();
- str.set(line);
-
- // Send out string
- output1.emit(str);
- output2.emit(str);
-
- line = lineReader.readLine();
- }
-
- // Close the stream;
- lineReader.close();
- }
-
- this.output1.flush();
- this.output2.flush();
- }
-
- @Override
- public void registerInputOutput() {
- this.output1 = new RecordWriter<StringRecord>(this);
- this.output2 = new RecordWriter<StringRecord>(this);
- }
-
- private Iterator<FileInputSplit> getInputSplits() {
-
- final InputSplitProvider provider = getEnvironment().getInputSplitProvider();
-
- return new Iterator<FileInputSplit>() {
-
- private FileInputSplit nextSplit;
-
- private boolean exhausted;
-
- @Override
- public boolean hasNext() {
- if (exhausted) {
- return false;
- }
-
- if (nextSplit != null) {
- return true;
- }
-
- FileInputSplit split = (FileInputSplit) provider.getNextInputSplit();
-
- if (split != null) {
- this.nextSplit = split;
- return true;
- }
- else {
- exhausted = true;
- return false;
- }
- }
-
- @Override
- public FileInputSplit next() {
- if (this.nextSplit == null && !hasNext()) {
- throw new NoSuchElementException();
- }
-
- final FileInputSplit tmp = this.nextSplit;
- this.nextSplit = null;
- return tmp;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleTargetTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleTargetTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleTargetTask.java
deleted file mode 100644
index 4caf479..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DoubleTargetTask.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class DoubleTargetTask extends AbstractInvokable {
-
- private RecordReader<StringRecord> input1 = null;
-
- private RecordReader<StringRecord> input2 = null;
-
- private RecordWriter<StringRecord> output = null;
-
- @Override
- public void invoke() throws Exception {
-
- this.output.initializeSerializers();
-
- while (this.input1.hasNext()) {
-
- StringRecord s = input1.next();
- this.output.emit(s);
- }
-
- while (this.input2.hasNext()) {
-
- StringRecord s = input2.next();
- this.output.emit(s);
- }
-
- this.output.flush();
-
- }
-
- @Override
- public void registerInputOutput() {
- this.input1 = new RecordReader<StringRecord>(this, StringRecord.class);
- this.input2 = new RecordReader<StringRecord>(this, StringRecord.class);
- this.output = new RecordWriter<StringRecord>(this);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ForwardTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ForwardTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ForwardTask.java
deleted file mode 100644
index 2d0d8aa..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ForwardTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class ForwardTask extends AbstractInvokable {
-
- private RecordReader<StringRecord> input = null;
- private RecordWriter<StringRecord> output = null;
-
- @Override
- public void invoke() throws Exception {
-
- this.output.initializeSerializers();
-
- while (this.input.hasNext()) {
-
- StringRecord s = input.next();
- this.output.emit(s);
- }
-
- this.output.flush();
- }
-
- @Override
- public void registerInputOutput() {
- this.input = new RecordReader<StringRecord>(this, StringRecord.class);
- this.output = new RecordWriter<StringRecord>(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index 0b222f5..b8aac10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -16,48 +16,27 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.jobmanager;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.net.ServerSocket;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.ExecutionMode;
-import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
-import org.apache.flink.runtime.jobgraph.JobTaskVertex;
-import org.apache.flink.runtime.operators.DataSinkTask;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testutils.ServerTestUtils;
-import org.apache.flink.runtime.testutils.tasks.DoubleSourceTask;
-import org.apache.flink.runtime.testutils.tasks.FileLineReader;
-import org.apache.flink.runtime.testutils.tasks.FileLineWriter;
-import org.apache.flink.runtime.testutils.tasks.JobFileInputVertex;
-import org.apache.flink.runtime.testutils.tasks.JobFileOutputVertex;
-import org.apache.flink.runtime.util.JarFileCreator;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
import org.junit.Test;
/**
@@ -65,990 +44,92 @@ import org.junit.Test;
*/
public class JobManagerITCase {
- /**
- * The name of the test directory some tests read their input from.
- */
- private static final String INPUT_DIRECTORY = "testDirectory";
-
- private static Configuration configuration;
-
- private static JobManager jobManager;
-
- /**
- * Starts the JobManager in local mode.
- */
- @BeforeClass
- public static void startNephele() {
- try {
- Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1");
- cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-
- GlobalConfiguration.includeConfiguration(cfg);
-
- configuration = GlobalConfiguration.getConfiguration(new String[] { ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY });
-
- jobManager = new JobManager(ExecutionMode.LOCAL);
-
- // Wait for the local task manager to arrive
- ServerTestUtils.waitForJobManagerToBecomeReady(jobManager);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Could not start job manager: " + e.getMessage());
- }
- }
-
- /**
- * Stops the JobManager
- */
- @AfterClass
- public static void stopNephele() {
- jobManager.shutdown();
- jobManager = null;
- }
-
- /**
- * Tests the correctness of the union record reader with non-empty inputs.
- */
- @Test
- public void testUnionWithNonEmptyInput() {
- testUnion(1000000);
- }
-
- /**
- * Tests of the Nephele channels with a large (> 1 MB) file.
- */
@Test
- public void testExecutionWithLargeInputFile() {
- test(1000000);
- }
-
- /**
- * Tests of the Nephele channels with a file of zero bytes size.
- */
- @Test
- public void testExecutionWithZeroSizeInputFile() {
- test(0);
- }
-
- /**
- * Tests the execution of a job with a directory as input. The test directory contains files of different length.
- */
- @Test
- public void testExecutionWithDirectoryInput() {
-
- // Define size of input
- final int sizeOfInput = 100;
-
- // Create test directory
- final String testDirectory = ServerTestUtils.getTempDir() + File.separator + INPUT_DIRECTORY;
- final File td = new File(testDirectory);
- if (!td.exists()) {
- td.mkdir();
- }
-
- File inputFile1 = null;
- File inputFile2 = null;
- File outputFile = null;
- File jarFile = null;
- JobClient jobClient = null;
-
+ public void testSingleVertexJob() {
try {
- // Get name of the forward class
- final String forwardClassName = ForwardTask.class.getSimpleName();
-
- // Create input and jar files
- inputFile1 = ServerTestUtils.createInputFile(INPUT_DIRECTORY, 0);
- inputFile2 = ServerTestUtils.createInputFile(INPUT_DIRECTORY, sizeOfInput);
- outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
- jarFile = ServerTestUtils.createJarFile(forwardClassName);
-
- // Create job graph
- final JobGraph jg = new JobGraph("Job Graph 1");
-
- // input vertex
- final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
- i1.setInvokableClass(FileLineReader.class);
- i1.setFilePath(new Path(new File(testDirectory).toURI()));
- i1.setNumberOfSubtasks(1);
-
- // task vertex 1
- final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
- t1.setInvokableClass(ForwardTask.class);
- t1.setNumberOfSubtasks(1);
-
- // task vertex 2
- final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
- t2.setInvokableClass(ForwardTask.class);
- t2.setNumberOfSubtasks(1);
-
- // output vertex
- JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
- o1.setInvokableClass(FileLineWriter.class);
- o1.setFilePath(new Path(outputFile.toURI()));
- o1.setNumberOfSubtasks(1);
-
- t1.setVertexToShareInstancesWith(i1);
- t2.setVertexToShareInstancesWith(i1);
- o1.setVertexToShareInstancesWith(i1);
-
- // connect vertices
- try {
- i1.connectTo(t1, ChannelType.NETWORK);
- t1.connectTo(t2, ChannelType.IN_MEMORY);
- t2.connectTo(o1, ChannelType.IN_MEMORY);
- } catch (JobGraphDefinitionException e) {
- e.printStackTrace();
- }
-
- // add jar
- jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + forwardClassName + ".jar").toURI()));
-
- // Create job client and launch job
- jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
- jobClient.submitJobAndWait();
-
- // Finally, compare output file to initial number sequence
- final BufferedReader bufferedReader = new BufferedReader(new FileReader(outputFile));
- for (int i = 0; i < sizeOfInput; i++) {
- final String number = bufferedReader.readLine();
- try {
- assertEquals(i, Integer.parseInt(number));
- } catch (NumberFormatException e) {
- fail(e.getMessage());
- }
- }
-
- bufferedReader.close();
-
- } catch (NumberFormatException e) {
- e.printStackTrace();
- fail(e.getMessage());
- } catch (JobExecutionException e) {
- e.printStackTrace();
- fail(e.getMessage());
- } catch (IOException ioe) {
- ioe.printStackTrace();
- fail(ioe.getMessage());
- } finally {
- // Remove temporary files
- if (inputFile1 != null) {
- inputFile1.delete();
- }
- if (inputFile2 != null) {
- inputFile2.delete();
- }
- if (outputFile != null) {
- outputFile.delete();
- }
- if (jarFile != null) {
- jarFile.delete();
- }
-
- // Remove test directory
- if (td != null) {
- td.delete();
- }
-
- if (jobClient != null) {
- jobClient.close();
- }
- }
- }
-
- /**
- * Tests the Nephele execution when an exception occurs. In particular, it is tested if the information that is
- * wrapped by the exception is correctly passed on to the client.
- */
- @Test
- public void testExecutionWithException() {
-
- final String exceptionClassName = ExceptionTask.class.getSimpleName();
- File inputFile = null;
- File outputFile = null;
- File jarFile = null;
- JobClient jobClient = null;
-
- try {
-
- inputFile = ServerTestUtils.createInputFile(0);
- outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
- jarFile = ServerTestUtils.createJarFile(exceptionClassName);
-
- // Create job graph
- final JobGraph jg = new JobGraph("Job Graph for Exception Test");
-
- // input vertex
- final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
- i1.setInvokableClass(FileLineReader.class);
- i1.setFilePath(new Path(inputFile.toURI()));
-
- // task vertex 1
- final JobTaskVertex t1 = new JobTaskVertex("Task with Exception", jg);
- t1.setInvokableClass(ExceptionTask.class);
-
- // output vertex
- JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
- o1.setInvokableClass(FileLineWriter.class);
- o1.setFilePath(new Path(outputFile.toURI()));
-
- t1.setVertexToShareInstancesWith(i1);
- o1.setVertexToShareInstancesWith(i1);
-
- // connect vertices
- i1.connectTo(t1, ChannelType.IN_MEMORY);
- t1.connectTo(o1, ChannelType.IN_MEMORY);
-
- // add jar
- jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + exceptionClassName + ".jar")
- .toURI()));
-
- // Create job client and launch job
- jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
+ final AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+ vertex.setParallelism(3);
+ vertex.setInvokableClass(NoOpInvokable.class);
- try {
- jobClient.submitJobAndWait();
- } catch (JobExecutionException e) {
- // Check if the correct error message is encapsulated in the exception
- if (e.getMessage() == null) {
- fail("JobExecutionException does not contain an error message");
- }
- if (!e.getMessage().contains(ExceptionTask.ERROR_MESSAGE)) {
- fail("JobExecutionException does not contain the expected error message");
- }
-
- return;
- }
-
- fail("Expected exception but did not receive it");
-
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
-
- // Remove temporary files
- if (inputFile != null) {
- inputFile.delete();
- }
- if (outputFile != null) {
- outputFile.delete();
- }
- if (jarFile != null) {
- jarFile.delete();
- }
-
- if (jobClient != null) {
- jobClient.close();
- }
- }
- }
-
- /**
- * Tests the Nephele execution when a runtime exception during the registration of the input/output gates occurs.
- */
- @Test
- public void testExecutionWithRuntimeException() {
-
- final String runtimeExceptionClassName = RuntimeExceptionTask.class.getSimpleName();
- File inputFile = null;
- File outputFile = null;
- File jarFile = null;
- JobClient jobClient = null;
-
- try {
-
- inputFile = ServerTestUtils.createInputFile(100);
- outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
- jarFile = ServerTestUtils.createJarFile(runtimeExceptionClassName);
-
- // Create job graph
- final JobGraph jg = new JobGraph("Job Graph for Exception Test");
-
- // input vertex
- final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
- i1.setInvokableClass(FileLineReader.class);
- i1.setFilePath(new Path(inputFile.toURI()));
-
- // task vertex 1
- final JobTaskVertex t1 = new JobTaskVertex("Task with Exception", jg);
- t1.setInvokableClass(RuntimeExceptionTask.class);
-
- // output vertex
- JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
- o1.setInvokableClass(FileLineWriter.class);
- o1.setFilePath(new Path(outputFile.toURI()));
-
- t1.setVertexToShareInstancesWith(i1);
- o1.setVertexToShareInstancesWith(i1);
-
- // connect vertices
- i1.connectTo(t1, ChannelType.IN_MEMORY);
- t1.connectTo(o1, ChannelType.IN_MEMORY);
-
- // add jar
- jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + runtimeExceptionClassName
- + ".jar").toURI()));
-
- // Create job client and launch job
- jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
+ final JobGraph jobGraph = new JobGraph("Test Job", vertex);
+ JobManager jm = startJobManager();
try {
- jobClient.submitJobAndWait();
- } catch (JobExecutionException e) {
-
- // Check if the correct error message is encapsulated in the exception
- if (e.getMessage() == null) {
- fail("JobExecutionException does not contain an error message");
- }
- if (!e.getMessage().contains(RuntimeExceptionTask.RUNTIME_EXCEPTION_MESSAGE)) {
- fail("JobExecutionException does not contain the expected error message");
- }
-
- // Check if the correct error message is encapsulated in the exception
- return;
- }
-
- fail("Expected exception but did not receive it");
-
- } catch (JobGraphDefinitionException jgde) {
- fail(jgde.getMessage());
- } catch (IOException ioe) {
- fail(ioe.getMessage());
- } finally {
-
- // Remove temporary files
- if (inputFile != null) {
- inputFile.delete();
- }
- if (outputFile != null) {
- outputFile.delete();
- }
- if (jarFile != null) {
- jarFile.delete();
- }
-
- if (jobClient != null) {
- jobClient.close();
- }
- }
- }
-
- /**
- * Tests the Nephele execution when a runtime exception in the output format occurs.
- */
- @Test
- public void testExecutionWithRuntimeExceptionInOutputFormat() {
-
- final String runtimeExceptionClassName = RuntimeExceptionTask.class.getSimpleName();
- File inputFile = null;
- File outputFile = null;
- File jarFile = null;
- JobClient jobClient = null;
-
- try {
-
- inputFile = ServerTestUtils.createInputFile(100);
- outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
- jarFile = ServerTestUtils.createJarFile(runtimeExceptionClassName);
-
- // Create job graph
- final JobGraph jg = new JobGraph("Job Graph for Exception Test");
-
- // input vertex
- final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
- i1.setInvokableClass(FileLineReader.class);
- i1.setFilePath(new Path(inputFile.toURI()));
- i1.setNumberOfSubtasks(1);
-
- // task vertex 1
- final JobTaskVertex t1 = new JobTaskVertex("Task with Exception", jg);
- t1.setInvokableClass(ForwardTask.class);
-
- // output vertex
- JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
- o1.setNumberOfSubtasks(1);
- o1.setInvokableClass(DataSinkTask.class);
- ExceptionOutputFormat outputFormat = new ExceptionOutputFormat();
- o1.setOutputFormat(outputFormat);
- TaskConfig outputConfig = new TaskConfig(o1.getConfiguration());
- outputConfig.setStubWrapper(new UserCodeObjectWrapper<OutputFormat<?>>(outputFormat));
-// outputConfig.addInputToGroup(0);
-//
-// ValueSerializer<StringRecord> serializer = new ValueSerializer<StringRecord>(StringRecord.class);
-// RuntimeStatefulSerializerFactory<StringRecord> serializerFactory = new RuntimeStatefulSerializerFactory<StringRecord>(serializer, StringRecord.class);
-// outputConfig.setInputSerializer(serializerFactory, 0);
-
- t1.setVertexToShareInstancesWith(i1);
- o1.setVertexToShareInstancesWith(i1);
-
- // connect vertices
- i1.connectTo(t1, ChannelType.IN_MEMORY);
- t1.connectTo(o1, ChannelType.IN_MEMORY);
-
- // add jar
- jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + runtimeExceptionClassName
- + ".jar").toURI()));
-
- // Create job client and launch job
- jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
-
- try {
- jobClient.submitJobAndWait();
- } catch (JobExecutionException e) {
-
- // Check if the correct error message is encapsulated in the exception
- if (e.getMessage() == null) {
- fail("JobExecutionException does not contain an error message");
- }
- if (!e.getMessage().contains(RuntimeExceptionTask.RUNTIME_EXCEPTION_MESSAGE)) {
- fail("JobExecutionException does not contain the expected error message, " +
- "but instead: " + e.getMessage());
+
+ // we need to register the job at the library cache manager (with no libraries)
+ LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+
+ long deadline = System.currentTimeMillis() + 60*1000;
+ boolean success = false;
+
+ while (System.currentTimeMillis() < deadline) {
+ JobStatus state = eg.getState();
+ if (state == JobStatus.FINISHED) {
+ success = true;
+ break;
+ }
+
+ else if (state == JobStatus.FAILED || state == JobStatus.CANCELED) {
+ break;
+ }
+ else {
+ Thread.sleep(200);
+ }
+ }
+
+ assertTrue("The job did not finish successfully.", success);
}
-
- // Check if the correct error message is encapsulated in the exception
- return;
- }
-
- fail("Expected exception but did not receive it");
-
- } catch (JobGraphDefinitionException jgde) {
- fail(jgde.getMessage());
- } catch (IOException ioe) {
- fail(ioe.getMessage());
- } finally {
-
- // Remove temporary files
- if (inputFile != null) {
- inputFile.delete();
- }
- if (outputFile != null) {
- outputFile.delete();
- }
- if (jarFile != null) {
- jarFile.delete();
- }
-
- if (jobClient != null) {
- jobClient.close();
- }
- }
- }
-
- /**
- * Creates a file with a sequence of 0 to <code>limit</code> integer numbers
- * and triggers a sample job. The sample reads all the numbers from the input file and pushes them through a
- * network, a file, and an in-memory channel. Eventually, the numbers are written back to an output file. The test
- * is considered successful if the input file equals the output file.
- *
- * @param limit
- * the upper bound for the sequence of numbers to be generated
- */
- private void test(final int limit) {
-
- JobClient jobClient = null;
-
- try {
-
- // Get name of the forward class
- final String forwardClassName = ForwardTask.class.getSimpleName();
-
- // Create input and jar files
- final File inputFile = ServerTestUtils.createInputFile(limit);
- final File outputFile = new File(ServerTestUtils.getTempDir() + File.separator
- + ServerTestUtils.getRandomFilename());
- final File jarFile = ServerTestUtils.createJarFile(forwardClassName);
-
- // Create job graph
- final JobGraph jg = new JobGraph("Job Graph 1");
-
- // input vertex
- final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
- i1.setInvokableClass(FileLineReader.class);
- i1.setFilePath(new Path(inputFile.toURI()));
- i1.setNumberOfSubtasks(1);
-
- // task vertex 1
- final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
- t1.setInvokableClass(ForwardTask.class);
- t1.setNumberOfSubtasks(1);
-
- // task vertex 2
- final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
- t2.setInvokableClass(ForwardTask.class);
- t2.setNumberOfSubtasks(1);
-
- // output vertex
- JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
- o1.setInvokableClass(FileLineWriter.class);
- o1.setFilePath(new Path(outputFile.toURI()));
- o1.setNumberOfSubtasks(1);
-
- t1.setVertexToShareInstancesWith(i1);
- t2.setVertexToShareInstancesWith(i1);
- o1.setVertexToShareInstancesWith(i1);
-
- // connect vertices
- try {
- i1.connectTo(t1, ChannelType.NETWORK);
- t1.connectTo(t2, ChannelType.IN_MEMORY);
- t2.connectTo(o1, ChannelType.IN_MEMORY);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- // add jar
- jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + forwardClassName + ".jar")
- .toURI()));
-
- // Create job client and launch job
- jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
-
- try {
- jobClient.submitJobAndWait();
- } catch (JobExecutionException e) {
- fail(e.getMessage());
- }
-
- // Finally, compare output file to initial number sequence
- final BufferedReader bufferedReader = new BufferedReader(new FileReader(outputFile));
- for (int i = 0; i < limit; i++) {
- final String number = bufferedReader.readLine();
- try {
- assertEquals(i, Integer.parseInt(number));
- } catch (NumberFormatException e) {
- fail(e.getMessage());
+ else {
+ // already done, that was fast;
}
}
-
- bufferedReader.close();
-
- // Remove temporary files
- inputFile.delete();
- outputFile.delete();
- jarFile.delete();
-
- } catch (IOException ioe) {
- ioe.printStackTrace();
- fail(ioe.getMessage());
- } finally {
- if (jobClient != null) {
- jobClient.close();
+ finally {
+ jm.shutdown();
}
}
- }
-
- /**
- * Tests the Nephele execution with a job that has two vertices, that are connected twice with each other with
- * different channel types.
- */
- @Test
- public void testExecutionDoubleConnection() {
-
- File inputFile = null;
- File outputFile = null;
- File jarFile = new File(ServerTestUtils.getTempDir() + File.separator + "doubleConnection.jar");
- JobClient jobClient = null;
-
- try {
-
- inputFile = ServerTestUtils.createInputFile(0);
- outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
-
- // Create required jar file
- JarFileCreator jfc = new JarFileCreator(jarFile);
- jfc.addClass(DoubleSourceTask.class);
- jfc.addClass(DoubleTargetTask.class);
- jfc.createJarFile();
-
- // Create job graph
- final JobGraph jg = new JobGraph("Job Graph for Double Connection Test");
-
- // input vertex
- final JobFileInputVertex i1 = new JobFileInputVertex("Input with two Outputs", jg);
- i1.setInvokableClass(DoubleSourceTask.class);
- i1.setFilePath(new Path(inputFile.toURI()));
-
- // task vertex 1
- final JobTaskVertex t1 = new JobTaskVertex("Task with two Inputs", jg);
- t1.setInvokableClass(DoubleTargetTask.class);
-
- // output vertex
- JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
- o1.setInvokableClass(FileLineWriter.class);
- o1.setFilePath(new Path(outputFile.toURI()));
-
- t1.setVertexToShareInstancesWith(i1);
- o1.setVertexToShareInstancesWith(i1);
-
- // connect vertices
- i1.connectTo(t1, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- i1.connectTo(t1, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
- t1.connectTo(o1, ChannelType.IN_MEMORY);
-
- // add jar
- jg.addJar(new Path(jarFile.toURI()));
-
- // Create job client and launch job
- jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
- jobClient.submitJobAndWait();
-
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
-
- // Remove temporary files
- if (inputFile != null) {
- inputFile.delete();
- }
- if (outputFile != null) {
- outputFile.delete();
- }
- if (jarFile != null) {
- jarFile.delete();
- }
-
- if (jobClient != null) {
- jobClient.close();
- }
- }
- }
-
- /**
- * Tests the Nephele job execution when the graph and the tasks are given no specific name.
- */
- @Test
- public void testEmptyTaskNames() {
-
- File inputFile = null;
- File outputFile = null;
- File jarFile = new File(ServerTestUtils.getTempDir() + File.separator + "emptyNames.jar");
- JobClient jobClient = null;
-
- try {
-
- inputFile = ServerTestUtils.createInputFile(0);
- outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
-
- // Create required jar file
- JarFileCreator jfc = new JarFileCreator(jarFile);
- jfc.addClass(DoubleSourceTask.class);
- jfc.addClass(DoubleTargetTask.class);
- jfc.createJarFile();
-
- // Create job graph
- final JobGraph jg = new JobGraph();
-
- // input vertex
- final JobFileInputVertex i1 = new JobFileInputVertex(jg);
- i1.setInvokableClass(FileLineReader.class);
- i1.setFilePath(new Path(inputFile.toURI()));
-
- // output vertex
- JobFileOutputVertex o1 = new JobFileOutputVertex(jg);
- o1.setInvokableClass(FileLineWriter.class);
- o1.setFilePath(new Path(outputFile.toURI()));
-
- o1.setVertexToShareInstancesWith(i1);
-
- // connect vertices
- i1.connectTo(o1, ChannelType.IN_MEMORY);
-
- // add jar
- jg.addJar(new Path(jarFile.toURI()));
-
- // Create job client and launch job
- jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
- jobClient.submitJobAndWait();
- } catch (Exception e) {
+ catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- } finally {
-
- // Remove temporary files
- if (inputFile != null) {
- inputFile.delete();
- }
- if (outputFile != null) {
- outputFile.delete();
- }
- if (jarFile != null) {
- jarFile.delete();
- }
-
- if (jobClient != null) {
- jobClient.close();
- }
}
}
-
- /**
- * Tests the correctness of the union record reader with empty inputs.
- */
- @Test
- public void testUnionWithEmptyInput() {
- testUnion(0);
- }
-
- /**
- * Tests the correctness of the union reader for different input sizes.
- *
- * @param limit
- * the upper bound for the sequence of numbers to be generated
- */
- private void testUnion(final int limit) {
-
- File inputFile1 = null;
- File inputFile2 = null;
- File outputFile = null;
- File jarFile = new File(ServerTestUtils.getTempDir() + File.separator + "unionWithEmptyInput.jar");
- JobClient jobClient = null;
-
- try {
-
- inputFile1 = ServerTestUtils.createInputFile(limit);
- inputFile2 = ServerTestUtils.createInputFile(limit);
- outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
-
- // Create required jar file
- JarFileCreator jfc = new JarFileCreator(jarFile);
- jfc.addClass(UnionTask.class);
- jfc.createJarFile();
-
- // Create job graph
- final JobGraph jg = new JobGraph("Union job " + limit);
-
- // input vertex 1
- final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
- i1.setInvokableClass(FileLineReader.class);
- i1.setFilePath(new Path(inputFile1.toURI()));
-
- // input vertex 2
- final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
- i2.setInvokableClass(FileLineReader.class);
- i2.setFilePath(new Path(inputFile2.toURI()));
-
- // union task
- final JobTaskVertex u1 = new JobTaskVertex("Union", jg);
- u1.setInvokableClass(UnionTask.class);
-
- // output vertex
- JobFileOutputVertex o1 = new JobFileOutputVertex("Output", jg);
- o1.setInvokableClass(FileLineWriter.class);
- o1.setFilePath(new Path(outputFile.toURI()));
- o1.setNumberOfSubtasks(1);
-
- i1.setVertexToShareInstancesWith(o1);
- i2.setVertexToShareInstancesWith(o1);
- u1.setVertexToShareInstancesWith(o1);
-
- // connect vertices
- i1.connectTo(u1, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- i2.connectTo(u1, ChannelType.IN_MEMORY);
- u1.connectTo(o1, ChannelType.IN_MEMORY);
-
- // add jar
- jg.addJar(new Path(jarFile.toURI()));
-
- // Create job client and launch job
- jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
-
- try {
- jobClient.submitJobAndWait();
- } catch (JobExecutionException e) {
- fail(e.getMessage());
- }
-
- // Finally, check the output
- final Map<Integer, Integer> expectedNumbers = new HashMap<Integer, Integer>();
- final Integer two = Integer.valueOf(2);
- for (int i = 0; i < limit; ++i) {
- expectedNumbers.put(Integer.valueOf(i), two);
- }
-
- final BufferedReader bufferedReader = new BufferedReader(new FileReader(outputFile));
- String line = bufferedReader.readLine();
- while (line != null) {
-
- final Integer number = Integer.valueOf(Integer.parseInt(line));
- Integer val = expectedNumbers.get(number);
- if (val == null) {
- fail("Found unexpected number in union output: " + number);
- } else {
- val = Integer.valueOf(val.intValue() - 1);
- if (val.intValue() < 0) {
- fail(val + " occurred more than twice in union output");
- }
- if (val.intValue() == 0) {
- expectedNumbers.remove(number);
- } else {
- expectedNumbers.put(number, val);
- }
- }
-
- line = bufferedReader.readLine();
- }
-
- bufferedReader.close();
-
- if (!expectedNumbers.isEmpty()) {
- final StringBuilder str = new StringBuilder();
- str.append("The following numbers have not been found in the union output:\n");
- final Iterator<Map.Entry<Integer, Integer>> it = expectedNumbers.entrySet().iterator();
- while (it.hasNext()) {
- final Map.Entry<Integer, Integer> entry = it.next();
- str.append(entry.getKey().toString());
- str.append(" (");
- str.append(entry.getValue().toString());
- str.append("x)\n");
- }
-
- fail(str.toString());
- }
-
- } catch (JobGraphDefinitionException jgde) {
- fail(jgde.getMessage());
- } catch (IOException ioe) {
- fail(ioe.getMessage());
- } finally {
-
- // Remove temporary files
- if (inputFile1 != null) {
- inputFile1.delete();
- }
- if (inputFile2 != null) {
- inputFile2.delete();
- }
- if (outputFile != null) {
- outputFile.delete();
- }
- if (jarFile != null) {
- jarFile.delete();
- }
-
- if (jobClient != null) {
- jobClient.close();
- }
- }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static final JobManager startJobManager() throws Exception {
+ Configuration cfg = new Configuration();
+ cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+ cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
+
+ GlobalConfiguration.includeConfiguration(cfg);
+
+ JobManager jm = new JobManager(ExecutionMode.LOCAL);
+ return jm;
}
-
- /**
- * Tests the execution of a job with a large degree of parallelism. In particular, the tests checks that the overall
- * runtime of the test does not exceed a certain time limit.
- */
- @Test
- public void testExecutionWithLargeDoP() {
-
- // The degree of parallelism to be used by tasks in this job.
- final int numberOfSubtasks = 64;
-
- File inputFile1 = null;
- File inputFile2 = null;
- File outputFile = null;
- File jarFile = new File(ServerTestUtils.getTempDir() + File.separator + "largeDoP.jar");
- JobClient jobClient = null;
-
- try {
-
- inputFile1 = ServerTestUtils.createInputFile(0);
- inputFile2 = ServerTestUtils.createInputFile(0);
- outputFile = new File(ServerTestUtils.getTempDir() + File.separator + ServerTestUtils.getRandomFilename());
-
- // Create required jar file
- JarFileCreator jfc = new JarFileCreator(jarFile);
- jfc.addClass(UnionTask.class);
- jfc.createJarFile();
-
- // Create job graph
- final JobGraph jg = new JobGraph("Job with large DoP (" + numberOfSubtasks + ")");
-
- // input vertex 1
- final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
- i1.setInvokableClass(FileLineReader.class);
- i1.setFilePath(new Path(inputFile1.toURI()));
- i1.setNumberOfSubtasks(numberOfSubtasks);
-
- // input vertex 2
- final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
- i2.setInvokableClass(FileLineReader.class);
- i2.setFilePath(new Path(inputFile2.toURI()));
- i2.setNumberOfSubtasks(numberOfSubtasks);
-
- // union task
- final JobTaskVertex f1 = new JobTaskVertex("Forward 1", jg);
- f1.setInvokableClass(DoubleTargetTask.class);
- f1.setNumberOfSubtasks(numberOfSubtasks);
-
- // output vertex
- JobFileOutputVertex o1 = new JobFileOutputVertex("Output", jg);
- o1.setInvokableClass(FileLineWriter.class);
- o1.setFilePath(new Path(outputFile.toURI()));
- o1.setNumberOfSubtasks(numberOfSubtasks);
-
- i1.setVertexToShareInstancesWith(o1);
- i2.setVertexToShareInstancesWith(o1);
- f1.setVertexToShareInstancesWith(o1);
-
- // connect vertices
- i1.connectTo(f1, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
- i2.connectTo(f1, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
- f1.connectTo(o1, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-
- // add jar
- jg.addJar(new Path(jarFile.toURI()));
-
- // Create job client and launch job
- jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
-
+
+ private static int getAvailablePort() throws IOException {
+ for (int i = 0; i < 50; i++) {
+ ServerSocket serverSocket = null;
try {
-
- jobClient.submitJobAndWait();
- } catch (JobExecutionException e) {
- // Job execution should lead to an error due to lack of resources
- return;
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- fail("Undetected lack of resources");
-
- } catch (JobGraphDefinitionException jgde) {
- fail(jgde.getMessage());
- } catch (IOException ioe) {
- fail(ioe.getMessage());
- } finally {
-
- // Remove temporary files
- if (inputFile1 != null) {
- inputFile1.delete();
- }
- if (inputFile2 != null) {
- inputFile2.delete();
- }
- if (outputFile != null) {
- if (outputFile.isDirectory()) {
- final String[] files = outputFile.list();
- final String outputDir = outputFile.getAbsolutePath();
- for (final String file : files) {
- new File(outputDir + File.separator + file).delete();
- }
+ serverSocket = new ServerSocket(0);
+ int port = serverSocket.getLocalPort();
+ if (port != 0) {
+ return port;
}
- outputFile.delete();
- }
- if (jarFile != null) {
- jarFile.delete();
- }
-
- if (jobClient != null) {
- jobClient.close();
+ } finally {
+ serverSocket.close();
}
}
+
+ throw new IOException("could not find free port");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
new file mode 100644
index 0000000..835af95
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+public class JobManagerTest {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RuntimeExceptionTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RuntimeExceptionTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RuntimeExceptionTask.java
deleted file mode 100644
index 6004bc1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RuntimeExceptionTask.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * This task throws a {@link RuntimeException} when the method <code>registerInputOutput</code> is called.
- */
-public class RuntimeExceptionTask extends AbstractInvokable {
-
- /**
- * The message which is used for the test runtime exception.
- */
- public static final String RUNTIME_EXCEPTION_MESSAGE = "This is a test runtime exception";
-
-
- @Override
- public void registerInputOutput() {
- throw new RuntimeException(RUNTIME_EXCEPTION_MESSAGE);
- }
-
- @Override
- public void invoke() {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/UnionTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/UnionTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/UnionTask.java
deleted file mode 100644
index 9d01907..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/UnionTask.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.io.network.api.UnionRecordReader;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * A simple implementation of a task using a {@link UnionRecordReader}.
- */
-public class UnionTask extends AbstractInvokable {
-
- /**
- * The union record reader to be used during the tests.
- */
- private UnionRecordReader<StringRecord> unionReader;
-
- private RecordWriter<StringRecord> writer;
-
-
- @Override
- public void registerInputOutput() {
-
- @SuppressWarnings("unchecked")
- MutableRecordReader<StringRecord>[] recordReaders = (MutableRecordReader<StringRecord>[]) new MutableRecordReader<?>[2];
- recordReaders[0] = new MutableRecordReader<StringRecord>(this);
- recordReaders[1] = new MutableRecordReader<StringRecord>(this);
- this.unionReader = new UnionRecordReader<StringRecord>(recordReaders, StringRecord.class);
-
- this.writer = new RecordWriter<StringRecord>(this);
- }
-
- @Override
- public void invoke() throws Exception {
- this.writer.initializeSerializers();
-
- while (this.unionReader.hasNext()) {
- this.writer.emit(this.unionReader.next());
- }
-
- this.writer.flush();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
deleted file mode 100644
index 2b2c81d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.splitassigner;
-
-import static org.junit.Assert.*;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.junit.Test;
-
-
-public class DefaultSplitAssignerTest {
-
- @Test
- public void testSerialSplitAssignment() {
- try {
- final int NUM_SPLITS = 50;
-
- Set<InputSplit> splits = new HashSet<InputSplit>();
- for (int i = 0; i < NUM_SPLITS; i++) {
- splits.add(new GenericInputSplit(i, NUM_SPLITS));
- }
-
- DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
- InputSplit is = null;
- while ((is = ia.getNextInputSplit("")) != null) {
- assertTrue(splits.remove(is));
- }
-
- assertTrue(splits.isEmpty());
- assertNull(ia.getNextInputSplit(""));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testConcurrentSplitAssignment() {
- try {
- final int NUM_THREADS = 10;
- final int NUM_SPLITS = 500;
- final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
-
- Set<InputSplit> splits = new HashSet<InputSplit>();
- for (int i = 0; i < NUM_SPLITS; i++) {
- splits.add(new GenericInputSplit(i, NUM_SPLITS));
- }
-
- final DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
-
- final AtomicInteger splitsRetrieved = new AtomicInteger(0);
- final AtomicInteger sumOfIds = new AtomicInteger(0);
-
- Runnable retriever = new Runnable() {
-
- @Override
- public void run() {
- String host = "";
- GenericInputSplit split;
- while ((split = (GenericInputSplit) ia.getNextInputSplit(host)) != null) {
- splitsRetrieved.incrementAndGet();
- sumOfIds.addAndGet(split.getSplitNumber());
- }
- }
- };
-
- // create the threads
- Thread[] threads = new Thread[NUM_THREADS];
- for (int i = 0; i < NUM_THREADS; i++) {
- threads[i] = new Thread(retriever);
- threads[i].setDaemon(true);
- }
-
- // launch concurrently
- for (int i = 0; i < NUM_THREADS; i++) {
- threads[i].start();
- }
-
- // sync
- for (int i = 0; i < NUM_THREADS; i++) {
- threads[i].join(5000);
- }
-
- // verify
- for (int i = 0; i < NUM_THREADS; i++) {
- if (threads[i].isAlive()) {
- fail("The concurrency test case is erroneous, the thread did not respond in time.");
- }
- }
-
- assertEquals(NUM_SPLITS, splitsRetrieved.get());
- assertEquals(SUM_OF_IDS, sumOfIds.get());
-
- // nothing left
- assertNull(ia.getNextInputSplit(""));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
deleted file mode 100644
index fdc1cf4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.splitassigner;
-
-import static org.junit.Assert.*;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.LocatableInputSplit;
-
-import org.junit.Test;
-
-
-public class LocatableSplitAssignerTest {
-
- @Test
- public void testSerialSplitAssignmentWithNullHost() {
- try {
- final int NUM_SPLITS = 50;
- final String[][] hosts = new String[][] {
- new String[] { "localhost" },
- new String[0],
- null
- };
-
- // load some splits
- Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
- for (int i = 0; i < NUM_SPLITS; i++) {
- splits.add(new LocatableInputSplit(i, hosts[i%3]));
- }
-
- // get all available splits
- LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
- InputSplit is = null;
- while ((is = ia.getNextInputSplit(null)) != null) {
- assertTrue(splits.remove(is));
- }
-
- // check we had all
- assertTrue(splits.isEmpty());
- assertNull(ia.getNextInputSplit(""));
- assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
- assertEquals(0, ia.getNumberOfLocalAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSerialSplitAssignmentAllForSameHost() {
- try {
- final int NUM_SPLITS = 50;
-
- // load some splits
- Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
- for (int i = 0; i < NUM_SPLITS; i++) {
- splits.add(new LocatableInputSplit(i, "testhost"));
- }
-
- // get all available splits
- LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
- InputSplit is = null;
- while ((is = ia.getNextInputSplit("testhost")) != null) {
- assertTrue(splits.remove(is));
- }
-
- // check we had all
- assertTrue(splits.isEmpty());
- assertNull(ia.getNextInputSplit(""));
-
- assertEquals(0, ia.getNumberOfRemoteAssignments());
- assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSerialSplitAssignmentAllForRemoteHost() {
- try {
- final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
- final int NUM_SPLITS = 10 * hosts.length;
-
- // load some splits
- Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
- for (int i = 0; i < NUM_SPLITS; i++) {
- splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
- }
-
- // get all available splits
- LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
- InputSplit is = null;
- while ((is = ia.getNextInputSplit("testhost")) != null) {
- assertTrue(splits.remove(is));
- }
-
- // check we had all
- assertTrue(splits.isEmpty());
- assertNull(ia.getNextInputSplit("anotherHost"));
-
- assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
- assertEquals(0, ia.getNumberOfLocalAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSerialSplitAssignmentMixedLocalHost() {
- try {
- final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
- final int NUM_SPLITS = 10 * hosts.length;
-
- // load some splits
- Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
- for (int i = 0; i < NUM_SPLITS; i++) {
- splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
- }
-
- // get all available splits
- LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
- InputSplit is = null;
- int i = 0;
- while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) {
- assertTrue(splits.remove(is));
- }
-
- // check we had all
- assertTrue(splits.isEmpty());
- assertNull(ia.getNextInputSplit("anotherHost"));
-
- assertEquals(0, ia.getNumberOfRemoteAssignments());
- assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testConcurrentSplitAssignmentNullHost() {
- try {
- final int NUM_THREADS = 10;
- final int NUM_SPLITS = 500;
- final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
-
- final String[][] hosts = new String[][] {
- new String[] { "localhost" },
- new String[0],
- null
- };
-
- // load some splits
- Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
- for (int i = 0; i < NUM_SPLITS; i++) {
- splits.add(new LocatableInputSplit(i, hosts[i%3]));
- }
-
- final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
-
- final AtomicInteger splitsRetrieved = new AtomicInteger(0);
- final AtomicInteger sumOfIds = new AtomicInteger(0);
-
- Runnable retriever = new Runnable() {
-
- @Override
- public void run() {
- LocatableInputSplit split;
- while ((split = ia.getNextInputSplit(null)) != null) {
- splitsRetrieved.incrementAndGet();
- sumOfIds.addAndGet(split.getSplitNumber());
- }
- }
- };
-
- // create the threads
- Thread[] threads = new Thread[NUM_THREADS];
- for (int i = 0; i < NUM_THREADS; i++) {
- threads[i] = new Thread(retriever);
- threads[i].setDaemon(true);
- }
-
- // launch concurrently
- for (int i = 0; i < NUM_THREADS; i++) {
- threads[i].start();
- }
-
- // sync
- for (int i = 0; i < NUM_THREADS; i++) {
- threads[i].join(5000);
- }
-
- // verify
- for (int i = 0; i < NUM_THREADS; i++) {
- if (threads[i].isAlive()) {
- fail("The concurrency test case is erroneous, the thread did not respond in time.");
- }
- }
-
- assertEquals(NUM_SPLITS, splitsRetrieved.get());
- assertEquals(SUM_OF_IDS, sumOfIds.get());
-
- // nothing left
- assertNull(ia.getNextInputSplit(""));
-
- assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
- assertEquals(0, ia.getNumberOfLocalAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testConcurrentSplitAssignmentForSingleHost() {
- try {
- final int NUM_THREADS = 10;
- final int NUM_SPLITS = 500;
- final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
-
- // load some splits
- Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
- for (int i = 0; i < NUM_SPLITS; i++) {
- splits.add(new LocatableInputSplit(i, "testhost"));
- }
-
- final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
-
- final AtomicInteger splitsRetrieved = new AtomicInteger(0);
- final AtomicInteger sumOfIds = new AtomicInteger(0);
-
- Runnable retriever = new Runnable() {
-
- @Override
- public void run() {
- LocatableInputSplit split;
- while ((split = ia.getNextInputSplit("testhost")) != null) {
- splitsRetrieved.incrementAndGet();
- sumOfIds.addAndGet(split.getSplitNumber());
- }
- }
- };
-
- // create the threads
- Thread[] threads = new Thread[NUM_THREADS];
- for (int i = 0; i < NUM_THREADS; i++) {
- threads[i] = new Thread(retriever);
- threads[i].setDaemon(true);
- }
-
- // launch concurrently
- for (int i = 0; i < NUM_THREADS; i++) {
- threads[i].start();
- }
-
- // sync
- for (int i = 0; i < NUM_THREADS; i++) {
- threads[i].join(5000);
- }
-
- // verify
- for (int i = 0; i < NUM_THREADS; i++) {
- if (threads[i].isAlive()) {
- fail("The concurrency test case is erroneous, the thread did not respond in time.");
- }
- }
-
- assertEquals(NUM_SPLITS, splitsRetrieved.get());
- assertEquals(SUM_OF_IDS, sumOfIds.get());
-
- // nothing left
- assertNull(ia.getNextInputSplit("testhost"));
-
- assertEquals(0, ia.getNumberOfRemoteAssignments());
- assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testConcurrentSplitAssignmentForMultipleHosts() {
- try {
- final int NUM_THREADS = 10;
- final int NUM_SPLITS = 500;
- final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
-
- final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
-
- // load some splits
- Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
- for (int i = 0; i < NUM_SPLITS; i++) {
- splits.add(new LocatableInputSplit(i, hosts[i%hosts.length]));
- }
-
- final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
-
- final AtomicInteger splitsRetrieved = new AtomicInteger(0);
- final AtomicInteger sumOfIds = new AtomicInteger(0);
-
- Runnable retriever = new Runnable() {
-
- @Override
- public void run() {
- final String threadHost = hosts[(int) (Math.random() * hosts.length)];
-
- LocatableInputSplit split;
- while ((split = ia.getNextInputSplit(threadHost)) != null) {
- splitsRetrieved.incrementAndGet();
- sumOfIds.addAndGet(split.getSplitNumber());
- }
- }
- };
-
- // create the threads
- Thread[] threads = new Thread[NUM_THREADS];
- for (int i = 0; i < NUM_THREADS; i++) {
- threads[i] = new Thread(retriever);
- threads[i].setDaemon(true);
- }
-
- // launch concurrently
- for (int i = 0; i < NUM_THREADS; i++) {
- threads[i].start();
- }
-
- // sync
- for (int i = 0; i < NUM_THREADS; i++) {
- threads[i].join(5000);
- }
-
- // verify
- for (int i = 0; i < NUM_THREADS; i++) {
- if (threads[i].isAlive()) {
- fail("The concurrency test case is erroneous, the thread did not respond in time.");
- }
- }
-
- assertEquals(NUM_SPLITS, splitsRetrieved.get());
- assertEquals(SUM_OF_IDS, sumOfIds.get());
-
- // nothing left
- assertNull(ia.getNextInputSplit("testhost"));
-
- // at least one fraction of hosts needs be local, no matter how bad the thread races
- assertTrue(ia.getNumberOfLocalAssignments() >= NUM_SPLITS / hosts.length);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/NoOpInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/NoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/NoOpInvokable.java
new file mode 100644
index 0000000..c58da13
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/NoOpInvokable.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * An invokable that does nothing.
+ */
+public class NoOpInvokable extends AbstractInvokable {
+
+ @Override
+ public void registerInputOutput() {}
+
+ @Override
+ public void invoke() {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
index 93e52f6..c20e5dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.sort;
import java.util.ArrayList;
@@ -37,7 +36,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.sort.MergeIterator;
import org.apache.flink.runtime.operators.sort.MergeMatchIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index ccb5215..86c32ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.testutils;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index aed4c78..a4d0f32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -250,16 +250,6 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
}
@Override
- public void userThreadStarted(final Thread userThread) {
- // Nothing to do here
- }
-
- @Override
- public void userThreadFinished(final Thread userThread) {
- // Nothing to do here
- }
-
- @Override
public InputSplitProvider getInputSplitProvider() {
return this.inputSplitProvider;
}