You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:58 UTC
[62/82] [abbrv] incubator-flink git commit: Fixed JobManagerITCase to
properly wait for task managers to deregister their tasks. Replaced the
scheduler's execution service with akka's futures. Introduced
TestStreamEnvironment to use ForkableFlinkMiniClus
Fixed JobManagerITCase to properly wait for task managers to deregister their tasks. Replaced the scheduler's execution service with akka's futures. Introduced TestStreamEnvironment to use ForkableFlinkMiniCluster for test execution.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c175ebe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c175ebe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c175ebe8
Branch: refs/heads/master
Commit: c175ebe84f39c208d0f5ae6e09d45a76869ee86e
Parents: bd4ee47
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 17 13:02:16 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:31 2014 +0100
----------------------------------------------------------------------
.../api/avro/AvroExternalJarProgramITCase.java | 6 +-
.../flink-streaming-core/pom.xml | 7 +
.../api/environment/LocalStreamEnvironment.java | 5 -
.../environment/StreamExecutionEnvironment.java | 2 +-
.../flink/streaming/util/ClusterUtil.java | 1 -
.../apache/flink/streaming/api/IterateTest.java | 5 +-
.../apache/flink/streaming/api/PrintTest.java | 7 +-
.../streaming/api/WindowCrossJoinTest.java | 6 +-
.../flink/streaming/api/WriteAsCsvTest.java | 6 +-
.../flink/streaming/api/WriteAsTextTest.java | 6 +-
.../api/collector/DirectedOutputTest.java | 6 +-
.../api/streamvertex/StreamVertexTest.java | 11 +-
.../streaming/util/TestStreamEnvironment.java | 72 +++
.../org/apache/flink/client/program/Client.java | 2 +-
.../flink/configuration/ConfigConstants.java | 2 +-
.../examples/scala/graph/DeltaPageRank.scala | 2 +-
.../runtime/jobmanager/scheduler/Scheduler.java | 34 +-
.../apache/flink/runtime/client/JobClient.scala | 12 +-
.../flink/runtime/jobmanager/JobManager.scala | 1 -
.../runtime/minicluster/FlinkMiniCluster.scala | 73 +--
.../minicluster/LocalFlinkMiniCluster.scala | 107 +++-
.../flink/runtime/taskmanager/TaskManager.scala | 16 +-
.../ExecutionVertexCancelTest.java | 5 +-
.../ExecutionVertexDeploymentTest.java | 10 +-
.../ScheduleWithCoLocationHintTest.java | 19 +
.../scheduler/SchedulerIsolatedTasksTest.java | 29 +-
.../scheduler/SchedulerSlotSharingTest.java | 32 +-
.../ExecutionGraphRestartTest.scala | 1 -
.../TaskManagerLossFailsTasksTest.scala | 1 -
.../runtime/jobmanager/JobManagerITCase.scala | 623 ++++++++-----------
.../runtime/testingUtils/TestingCluster.scala | 2 +-
.../testingUtils/TestingJobManager.scala | 25 +-
.../TestingJobManagerMessages.scala | 1 +
.../testingUtils/TestingTaskManager.scala | 33 +-
.../TestingTaskManagerMessages.scala | 4 +
flink-scala/pom.xml | 4 +-
.../api/scala/typeutils/EitherSerializer.scala | 2 +-
.../api/scala/typeutils/EitherTypeInfo.scala | 2 +-
.../api/scala/typeutils/NothingSerializer.scala | 2 +-
.../api/scala/typeutils/OptionSerializer.scala | 2 +-
.../api/scala/typeutils/OptionTypeInfo.scala | 2 +-
.../scala/typeutils/TraversableSerializer.scala | 2 +-
.../scala/typeutils/TraversableTypeInfo.scala | 2 +-
flink-test-utils/pom.xml | 134 ++++
.../flink/test/util/AbstractTestBase.java | 8 +-
.../flink/test/util/JavaProgramTestBase.java | 5 +-
.../test/util/ForkableFlinkMiniCluster.scala | 77 +++
.../test/cancelling/CancellingTestBase.java | 5 +-
.../PackagedProgramEndToEndITCase.java | 6 +-
.../apache/flink/test/util/FailingTestBase.java | 7 +-
.../scala/runtime/ScalaSpecialTypesITCase.scala | 2 +-
.../ScalaSpecialTypesSerializerTest.scala | 2 +-
.../runtime/TraversableSerializerTest.scala | 2 +-
53 files changed, 878 insertions(+), 562 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index b637e79..78a6683 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -25,7 +25,7 @@ import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Assert;
import org.junit.Test;
@@ -39,12 +39,12 @@ public class AvroExternalJarProgramITCase {
@Test
public void testExternalProgram() {
- LocalFlinkMiniCluster testMiniCluster = null;
+ ForkableFlinkMiniCluster testMiniCluster = null;
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
- testMiniCluster = new LocalFlinkMiniCluster(config);
+ testMiniCluster = new ForkableFlinkMiniCluster(config);
String jarFile = JAR_FILE;
String testData = getClass().getResource(TEST_DATA_FILE).toString();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/pom.xml b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
index b2bbccc..b476670 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
@@ -46,6 +46,13 @@ under the License.
<artifactId>commons-math</artifactId>
<version>2.2</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 505f0e7..3e44012 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -44,9 +44,4 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
getDegreeOfParallelism());
}
-
- public void executeTest(long memorySize) throws Exception {
- ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism(),
- memorySize);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 783fa28..d26b714 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -90,7 +90,7 @@ public abstract class StreamExecutionEnvironment {
/**
* Gets the degree of parallelism with which operation are executed by
* default. Operations can individually override this value to use a
- * specific degree of parallelism via {@link DataStream#setParallelism}.
+ * specific degree of parallelism.
*
* @return The degree of parallelism used by operations, unless they
* override that value.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index bf5ba73..a7b7137 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -81,5 +81,4 @@ public class ClusterUtil {
public static void runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
runOnMiniCluster(jobGraph, numOfSlots, -1);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 5d910ba..7804e66 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -73,7 +74,7 @@ public class IterateTest {
@Test
public void test() throws Exception {
- LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
env.setBufferTimeout(10);
@@ -86,7 +87,7 @@ public class IterateTest {
iteration.closeWith(increment).addSink(new MySink());
- env.executeTest(MEMORYSIZE);
+ env.execute();
assertTrue(iterated);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index d1d5b1e..757f6f6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -21,8 +21,8 @@ import java.io.Serializable;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
public class PrintTest implements Serializable {
@@ -50,9 +50,8 @@ public class PrintTest implements Serializable {
@Test
public void test() throws Exception {
- LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
- env.executeTest(MEMORYSIZE);
-
+ env.execute();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index d8cdfa5..37f8c0a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
public class WindowCrossJoinTest implements Serializable {
@@ -45,7 +45,7 @@ public class WindowCrossJoinTest implements Serializable {
@Test
public void test() throws Exception {
- LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
env.setBufferTimeout(1);
ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
@@ -111,7 +111,7 @@ public class WindowCrossJoinTest implements Serializable {
})
.addSink(new CrossResultSink());
- env.executeTest(MEMORYSIZE);
+ env.execute();
assertEquals(joinExpectedResults, joinResults);
assertEquals(crossExpectedResults, crossResults);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
index 6b1dd5a..edd3ed5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
@@ -28,9 +28,9 @@ import java.util.List;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -132,7 +132,7 @@ public class WriteAsCsvTest {
@Test
public void test() throws Exception {
- LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test1.txt");
@@ -159,7 +159,7 @@ public class WriteAsCsvTest {
fillExpected5();
- env.executeTest(MEMORYSIZE);
+ env.execute();
readFile(PREFIX + "test1.txt", result1);
readFile(PREFIX + "test2.txt", result2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index e21f21d..1cad8a6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -28,9 +28,9 @@ import java.util.List;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -132,7 +132,7 @@ public class WriteAsTextTest {
@Test
public void test() throws Exception {
- LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsText(PREFIX + "test1.txt");
@@ -159,7 +159,7 @@ public class WriteAsTextTest {
fillExpected5();
- env.executeTest(MEMORYSIZE);
+ env.execute();
readFile(PREFIX + "test1.txt", result1);
readFile(PREFIX + "test2.txt", result2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 4ab1be2..ad51a6c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
public class DirectedOutputTest {
@@ -104,8 +105,7 @@ public class DirectedOutputTest {
@Test
public void outputSelectorTest() throws Exception {
-
- LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ StreamExecutionEnvironment env = new TestStreamEnvironment(1, 128);
SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
source.select(EVEN).addSink(new ListSink(EVEN));
@@ -113,7 +113,7 @@ public class DirectedOutputTest {
source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
source.selectAll().addSink(new ListSink(ALL));
- env.executeTest(128);
+ env.execute();
assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index 9edf44e..b103d84 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -145,8 +146,7 @@ public class StreamVertexTest {
@Test
public void coTest() throws Exception {
- LocalStreamEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(SOURCE_PARALELISM);
+ StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
DataStream<Long> generatedSequence = env.generateSequence(0, 3);
@@ -154,7 +154,7 @@ public class StreamVertexTest {
fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
resultSet = new HashSet<String>();
- env.executeTest(MEMORYSIZE);
+ env.execute();
HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
"2", "3"));
@@ -163,12 +163,11 @@ public class StreamVertexTest {
@Test
public void runStream() throws Exception {
- LocalStreamEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(SOURCE_PARALELISM);
+ StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
- env.executeTest(MEMORYSIZE);
+ env.execute();
assertEquals(10, data.keySet().size());
for (Integer k : data.keySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
new file mode 100644
index 0000000..3918013
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import akka.actor.ActorRef;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+public class TestStreamEnvironment extends StreamExecutionEnvironment {
+ private static final String DEFAULT_JOBNAME = "TestStreamingJob";
+ private static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job";
+
+ private long memorySize;
+
+ public TestStreamEnvironment(int degreeOfParallelism, long memorySize){
+ this.setDegreeOfParallelism(degreeOfParallelism);
+
+ this.memorySize = memorySize;
+ }
+
+ @Override
+ public void execute() throws Exception {
+ execute(DEFAULT_JOBNAME);
+ }
+
+ @Override
+ public void execute(String jobName) throws Exception {
+ JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName);
+
+ Configuration configuration = jobGraph.getJobConfiguration();
+
+ configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+ getDegreeOfParallelism());
+ configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
+
+ ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
+
+ try{
+ ActorRef client = cluster.getJobClient();
+ JobClient.submitJobAndWait(jobGraph, false, client);
+ }catch(JobExecutionException e){
+ if(e.getMessage().contains("GraphConversionException")){
+ throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
+ }else{
+ throw e;
+ }
+ }finally{
+ cluster.stop();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 203f294..45848c2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -68,7 +68,7 @@ public class Client {
private final PactCompiler compiler; // the compiler to compile the jobs
- private boolean printStatusDuringExecution;
+ private boolean printStatusDuringExecution = false;
// ------------------------------------------------------------------------
// Construction
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 047cfd1..f0ab180 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -598,7 +598,7 @@ public final class ConfigConstants {
public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
- public static String DEFAULT_AKKA_LOG_LEVEL = "OFF";
+ public static String DEFAULT_AKKA_LOG_LEVEL = "ERROR";
public static int DEFAULT_AKKA_ASK_TIMEOUT = 100;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
index ae1c6cf..c6eb643 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 7807b73..6495aba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -25,9 +25,11 @@ import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
+import akka.dispatch.Futures;
+import org.apache.flink.runtime.akka.AkkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -48,9 +50,6 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
private final Object globalLock = new Object();
- private final ExecutorService executor;
-
-
/** All instances that the scheduler can deploy to */
private final Set<Instance> allInstances = new HashSet<Instance>();
@@ -69,13 +68,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
private int nonLocalizedAssignments;
-
public Scheduler() {
- this(null);
- }
-
- public Scheduler(ExecutorService executorService) {
- this.executor = executorService;
this.newlyAvailableInstances = new LinkedBlockingQueue<Instance>();
}
@@ -395,19 +388,14 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
// that leads with a high probability to deadlocks, when scheduling fast
this.newlyAvailableInstances.add(instance);
-
- if (this.executor != null) {
- this.executor.execute(new Runnable() {
- @Override
- public void run() {
- handleNewSlot();
- }
- });
- }
- else {
- // for tests, we use the synchronous variant
- handleNewSlot();
- }
+
+ Futures.future(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ handleNewSlot();
+ return null;
+ }
+ }, AkkaUtils.globalExecutionContext());
}
private void handleNewSlot() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 0d9f672..53f0daa 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.client
import java.io.IOException
import java.net.InetSocketAddress
+import java.util.concurrent.TimeUnit
import akka.actor.Status.Failure
import akka.actor._
@@ -83,11 +84,13 @@ object JobClient{
def startActorSystemAndActor(config: Configuration): (ActorSystem, ActorRef) = {
implicit val actorSystem = AkkaUtils.createActorSystem(host = "localhost",
port =0, configuration = config)
+
(actorSystem, startActorWithConfiguration(config))
}
- def startActor(jobManagerURL: String)(implicit actorSystem: ActorSystem): ActorRef = {
- actorSystem.actorOf(Props(classOf[JobClient], jobManagerURL), JOB_CLIENT_NAME)
+ def startActor(jobManagerURL: String)(implicit actorSystem: ActorSystem, timeout: FiniteDuration):
+ ActorRef = {
+ actorSystem.actorOf(Props(classOf[JobClient], jobManagerURL, timeout), JOB_CLIENT_NAME)
}
def parseConfiguration(configuration: Configuration): String = {
@@ -109,7 +112,10 @@ object JobClient{
}
def startActorWithConfiguration(config: Configuration)(implicit actorSystem: ActorSystem):
- ActorRef= {
+ ActorRef = {
+ implicit val timeout = FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+ ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+
startActor(parseConfiguration(config))
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 822a34c..a72c685 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -421,7 +421,6 @@ object JobManager {
}
jobManagerSystem.awaitTermination()
- println("Shutting down.")
}
def parseArgs(args: Array[String]): (String, Int, Configuration, ExecutionMode) = {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 43be786..1c5a9df 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -22,12 +22,9 @@ import java.util.concurrent.TimeUnit
import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem}
-import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.JobClient
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.EnvironmentInformation
import org.slf4j.LoggerFactory
import scala.concurrent.duration.FiniteDuration
@@ -36,6 +33,8 @@ import scala.concurrent.{Future, Await}
abstract class FlinkMiniCluster(userConfiguration: Configuration) {
import FlinkMiniCluster._
+ val HOSTNAME = "localhost"
+
implicit val timeout = FiniteDuration(userConfiguration.getInteger(ConfigConstants
.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
@@ -54,8 +53,6 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
val (taskManagerActorSystems, taskManagerActors) = actorSystemsTaskManagers.unzip
- val jobClientActorSystem = AkkaUtils.createActorSystem()
-
waitForTaskManagersToBeRegistered()
def generateConfiguration(userConfiguration: Configuration): Configuration
@@ -79,21 +76,6 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
configuration)
}
- def getJobClient(): ActorRef ={
- val config = new Configuration()
-
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, FlinkMiniCluster.HOSTNAME)
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)
-
- JobClient.startActorWithConfiguration(config)(jobClientActorSystem)
- }
-
- def getJobClientActorSystem: ActorSystem = jobClientActorSystem
-
- def getJobManagerRPCPort: Int = {
- configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1)
- }
-
def getJobManager: ActorRef = {
jobManagerActor
}
@@ -116,13 +98,11 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
def shutdown(): Unit = {
taskManagerActorSystems foreach { _.shutdown() }
jobManagerActorSystem.shutdown()
- jobClientActorSystem.shutdown()
}
def awaitTermination(): Unit = {
- jobClientActorSystem.awaitTermination()
- taskManagerActorSystems foreach { _.awaitTermination()}
jobManagerActorSystem.awaitTermination()
+ taskManagerActorSystems foreach { _.awaitTermination()}
}
def waitForTaskManagersToBeRegistered(): Unit = {
@@ -138,51 +118,4 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
object FlinkMiniCluster{
val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
- val HOSTNAME = "localhost"
-
- def initializeIOFormatClasses(configuration: Configuration): Unit = {
- try{
- val om = classOf[FileOutputFormat[_]].getDeclaredMethod("initDefaultsFromConfiguration",
- classOf[Configuration])
- om.setAccessible(true)
- om.invoke(null, configuration)
- }catch {
- case e: Exception =>
- LOG.error("Cannot (re) initialize the globally loaded defaults. Some classes might not " +
- "follow the specified default behaviour.")
- }
- }
-
- def getDefaultConfig: Configuration = {
- val config: Configuration = new Configuration
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants
- .DEFAULT_JOB_MANAGER_IPC_PORT)
- config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants
- .DEFAULT_TASK_MANAGER_IPC_PORT)
- config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants
- .DEFAULT_TASK_MANAGER_DATA_PORT)
- config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, ConfigConstants
- .DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION)
- config.setInteger(ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants
- .DEFAULT_JOBCLIENT_POLLING_INTERVAL)
- config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, ConfigConstants
- .DEFAULT_FILESYSTEM_OVERWRITE)
- config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
- ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY)
- var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag
- val bufferMem: Long = ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS *
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE
- val numTaskManager = 1
- val taskManagerNumSlots: Int = ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS
- memorySize = memorySize - (bufferMem * numTaskManager)
- memorySize = (memorySize * ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION).toLong
- memorySize >>>= 20
- memorySize /= numTaskManager
- config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize)
- config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManager)
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots)
- config
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index edf16bb..32fdb37 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -19,41 +19,29 @@
package org.apache.flink.runtime.minicluster
import akka.actor.{ActorRef, ActorSystem}
+import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.client.JobClient
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.util.EnvironmentInformation
import org.slf4j.LoggerFactory
class LocalFlinkMiniCluster(userConfiguration: Configuration) extends
FlinkMiniCluster(userConfiguration){
+ import LocalFlinkMiniCluster._
- override def generateConfiguration(userConfiguration: Configuration): Configuration = {
- val forNumberString = System.getProperty("forkNumber")
-
- val forkNumber = try {
- Integer.parseInt(forNumberString)
- }catch{
- case e: NumberFormatException => -1
- }
+ val jobClientActorSystem = AkkaUtils.createActorSystem()
- val config = FlinkMiniCluster.getDefaultConfig
+ override def generateConfiguration(userConfiguration: Configuration): Configuration = {
+ val config = getDefaultConfig
config.addAll(userConfiguration)
- if(forkNumber != -1){
- val jobManagerRPC = 1024 + forkNumber*300
- val taskManagerRPC = 1024 + forkNumber*300 + 100
- val taskManagerData = 1024 + forkNumber*300 + 200
+ setMemory(config)
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRPC)
- config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, taskManagerRPC)
- config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
-
- }
-
- FlinkMiniCluster.initializeIOFormatClasses(config)
+ initializeIOFormatClasses(config)
config
}
@@ -80,7 +68,84 @@ FlinkMiniCluster(userConfiguration){
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
}
- TaskManager.startActorWithConfiguration(FlinkMiniCluster.HOSTNAME, config, false)(system)
+ TaskManager.startActorWithConfiguration(HOSTNAME, config, false)(system)
+ }
+
+ def getJobClient(): ActorRef ={
+ val config = new Configuration()
+
+ config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)
+
+
+ JobClient.startActorWithConfiguration(config)(jobClientActorSystem)
+ }
+
+ def getJobClientActorSystem: ActorSystem = jobClientActorSystem
+
+ def getJobManagerRPCPort: Int = {
+ configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1)
+ }
+
+ override def shutdown(): Unit = {
+ super.shutdown()
+ jobClientActorSystem.shutdown()
+ }
+
+ override def awaitTermination(): Unit = {
+ jobClientActorSystem.awaitTermination()
+ super.awaitTermination()
+ }
+
+ def initializeIOFormatClasses(configuration: Configuration): Unit = {
+ try{
+ val om = classOf[FileOutputFormat[_]].getDeclaredMethod("initDefaultsFromConfiguration",
+ classOf[Configuration])
+ om.setAccessible(true)
+ om.invoke(null, configuration)
+ }catch {
+ case e: Exception =>
+ LOG.error("Cannot (re) initialize the globally loaded defaults. Some classes might not " +
+ "follow the specified default behaviour.")
+ }
+ }
+
+ def setMemory(config: Configuration): Unit = {
+ var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag
+ val bufferMem: Long = ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS *
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE
+ val numTaskManager = config.getInteger(ConfigConstants
+ .LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
+ val taskManagerNumSlots: Int = ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS
+ memorySize = memorySize - (bufferMem * numTaskManager)
+ memorySize = (memorySize * ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION).toLong
+ memorySize >>>= 20
+ memorySize /= numTaskManager
+ config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize)
+ }
+
+ def getDefaultConfig: Configuration = {
+ val config: Configuration = new Configuration
+ config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants
+ .DEFAULT_JOB_MANAGER_IPC_PORT)
+ config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants
+ .DEFAULT_TASK_MANAGER_IPC_PORT)
+ config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants
+ .DEFAULT_TASK_MANAGER_DATA_PORT)
+ config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, ConfigConstants
+ .DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION)
+ config.setInteger(ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants
+ .DEFAULT_JOBCLIENT_POLLING_INTERVAL)
+ config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, ConfigConstants
+ .DEFAULT_FILESYSTEM_OVERWRITE)
+ config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
+ ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY)
+ config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1)
+ config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, ConfigConstants
+ .DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS)
+ config
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7004881..9360a14 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -290,7 +290,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
} catch {
case t: Throwable =>
log.error(t, s"Could not instantiate task with execution ID ${executionID}.")
-
runningTasks.remove(executionID)
for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
@@ -336,9 +335,11 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
log.info(s"Unregister task with execution ID ${executionID}.")
runningTasks.remove(executionID) match {
case Some(task) =>
- for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
- .getJobConfiguration)) {
- fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
+ if(task.getEnvironment != null) {
+ for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
+ .getJobConfiguration)) {
+ fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
+ }
}
channelManager foreach {
@@ -377,6 +378,10 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
val receiver = this.self
+ val taskName = runningTasks(executionID).getTaskName
+ val numberOfSubtasks = runningTasks(executionID).getNumberOfSubtasks
+ val indexOfSubtask = runningTasks(executionID).getSubtaskIndex
+
futureResponse.mapTo[Boolean].onComplete {
case Success(result) =>
if (!result || executionState == ExecutionState.FINISHED || executionState ==
@@ -384,7 +389,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
receiver ! UnregisterTask(executionID)
}
case Failure(t) =>
- log.error(t, "Execution state change notification failed.")
+ log.error(t, s"Execution state change notification failed for task ${executionID} " +
+ s"($indexOfSubtask/$numberOfSubtasks) of job ${jobID}.")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 689d10a..207be1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -135,8 +135,9 @@ public class ExecutionVertexCancelTest {
setVertexState(vertex, ExecutionState.SCHEDULED);
assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
- ActorRef taskManager = system.actorOf(Props.create(new CancelSequenceTaskManagerCreator(new
- TaskOperationResult(execId, true), new TaskOperationResult(execId, false))));
+ ActorRef taskManager = TestActorRef.create(system, Props.create(new
+ CancelSequenceTaskManagerCreator(new TaskOperationResult(execId, true),
+ new TaskOperationResult(execId, false))));
Instance instance = getInstance(taskManager);
AllocatedSlot slot = instance.allocateSlot(new JobID());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 4b3bbbf..e5c41b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -55,9 +55,13 @@ public class ExecutionVertexDeploymentTest {
public void testDeployCall() {
try {
final JobVertexID jid = new JobVertexID();
+
+ TestingUtils.setCallingThreadDispatcher(system);
+ ActorRef tm = TestActorRef.create(system, Props.create(SimpleAcknowledgingTaskManager
+ .class));
// mock taskmanager to simply accept the call
- Instance instance = getInstance(ActorRef.noSender());
+ Instance instance = getInstance(tm);
final AllocatedSlot slot = instance.allocateSlot(new JobID());
@@ -67,7 +71,7 @@ public class ExecutionVertexDeploymentTest {
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
- assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
+ assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
// no repeated scheduling
try {
@@ -84,6 +88,8 @@ public class ExecutionVertexDeploymentTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
+ }finally{
+ TestingUtils.setGlobalExecutionContext();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index 5234992..b22ccd0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -26,13 +26,32 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
public class ScheduleWithCoLocationHintTest {
+ private static ActorSystem system;
+
+ @BeforeClass
+ public static void setup(){
+ system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
+ TestingUtils.setCallingThreadDispatcher(system);
+ }
+
+ @AfterClass
+ public static void teardown(){
+ TestingUtils.setGlobalExecutionContext();
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
@Test
public void scheduleAllSharedAndCoLocated() {
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index ad040f7..240cdac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -24,6 +24,11 @@ import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.g
import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
import static org.junit.Assert.*;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
@@ -46,6 +51,19 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
* Tests for the {@link Scheduler} when scheduling individual tasks.
*/
public class SchedulerIsolatedTasksTest {
+ private static ActorSystem system;
+
+ @BeforeClass
+ public static void setup(){
+ system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
+ TestingUtils.setCallingThreadDispatcher(system);
+ }
+
+ @AfterClass
+ public static void teardown(){
+ TestingUtils.setGlobalExecutionContext();
+ JavaTestKit.shutdownActorSystem(system);
+ }
@Test
public void testAddAndRemoveInstance() {
@@ -182,13 +200,13 @@ public class SchedulerIsolatedTasksTest {
final int NUM_INSTANCES = 50;
final int NUM_SLOTS_PER_INSTANCE = 3;
final int NUM_TASKS_TO_SCHEDULE = 2000;
-
- final ExecutorService executor = Executors.newFixedThreadPool(4, ExecutorThreadFactory.INSTANCE);
+
+ TestingUtils.setGlobalExecutionContext();
try {
// note: since this test asynchronously releases slots, the executor needs release workers.
// doing the release call synchronous can lead to a deadlock
- Scheduler scheduler = new Scheduler(executor);
+ Scheduler scheduler = new Scheduler();
for (int i = 0;i < NUM_INSTANCES; i++) {
scheduler.newInstanceAvailable(getRandomInstance((int) (Math.random() * NUM_SLOTS_PER_INSTANCE) + 1));
@@ -274,9 +292,8 @@ public class SchedulerIsolatedTasksTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- }
- finally {
- executor.shutdownNow();
+ }finally{
+ TestingUtils.setCallingThreadDispatcher(system);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index ea32065..de90701 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -31,6 +31,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
@@ -40,6 +45,20 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
* Tests for the scheduler when scheduling tasks in slot sharing groups.
*/
public class SchedulerSlotSharingTest {
+ private static ActorSystem system;
+
+ @BeforeClass
+ public static void setup(){
+ system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
+ TestingUtils.setCallingThreadDispatcher(system);
+ }
+
+ @AfterClass
+ public static void teardown(){
+ TestingUtils.setGlobalExecutionContext();
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
@Test
public void scheduleSingleVertexType() {
@@ -776,7 +795,7 @@ public class SchedulerSlotSharingTest {
@Test
public void testSequentialAllocateAndRelease() {
- final ExecutorService exec = Executors.newFixedThreadPool(8);
+ TestingUtils.setGlobalExecutionContext();
try {
final JobVertexID jid1 = new JobVertexID();
final JobVertexID jid2 = new JobVertexID();
@@ -785,7 +804,7 @@ public class SchedulerSlotSharingTest {
final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
- final Scheduler scheduler = new Scheduler(exec);
+ final Scheduler scheduler = new Scheduler();
scheduler.newInstanceAvailable(getRandomInstance(4));
// allocate something from group 1 and 2 interleaved with schedule for group 3
@@ -834,15 +853,15 @@ public class SchedulerSlotSharingTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- }
- finally {
- exec.shutdownNow();
+ }finally{
+ TestingUtils.setCallingThreadDispatcher(system);
}
}
@Test
public void testConcurrentAllocateAndRelease() {
final ExecutorService executor = Executors.newFixedThreadPool(20);
+ TestingUtils.setGlobalExecutionContext();
try {
for (int run = 0; run < 50; run++) {
final JobVertexID jid1 = new JobVertexID();
@@ -852,7 +871,7 @@ public class SchedulerSlotSharingTest {
final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
- final Scheduler scheduler = new Scheduler(executor);
+ final Scheduler scheduler = new Scheduler();
scheduler.newInstanceAvailable(getRandomInstance(4));
final AtomicInteger enumerator1 = new AtomicInteger();
@@ -1012,6 +1031,7 @@ public class SchedulerSlotSharingTest {
}
finally {
executor.shutdownNow();
+ TestingUtils.setCallingThreadDispatcher(system);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index f4ce7b0..c0c052a 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph
import akka.actor.{Props, ActorSystem}
import akka.testkit.{TestKit}
import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils
import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, AbstractJobVertex}
import org.apache.flink.runtime.jobmanager.Tasks
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 9884bca..97625d9 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph
import akka.actor.{Props, ActorSystem}
import akka.testkit.TestKit
import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionGraphTestUtils}
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils
.SimpleAcknowledgingTaskManager
import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, AbstractJobVertex}