You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:17:49 UTC
[4/4] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster
[FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster
Rename _configuration to originalConfiguration
Remove testing classes from main scope in flink-runtime
Previously, the ForkableFlinkMiniCluster, which resided in flink-test-utils, required
these testing classes to be in the main scope of flink-runtime. With the removal of the
ForkableFlinkMiniCluster, these classes are no longer needed in the main scope and can be
moved back to the test scope.
This closes #2450.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02b852e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02b852e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02b852e3
Branch: refs/heads/master
Commit: 02b852e3571e46f25fdfc79f43ceb726ddff9ba7
Parents: 920cda4
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 31 17:58:09 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 8 17:17:28 2016 +0200
----------------------------------------------------------------------
.../api/avro/AvroExternalJarProgramITCase.java | 7 +-
.../flink/contrib/streaming/CollectITCase.java | 4 +-
.../operations/DegreesWithExceptionITCase.java | 6 +-
.../ReduceOnEdgesWithExceptionITCase.java | 6 +-
.../ReduceOnNeighborsWithExceptionITCase.java | 6 +-
.../apache/flink/ml/util/FlinkTestBase.scala | 11 +-
.../clusterframework/FlinkResourceManager.java | 13 +-
.../testutils/TestingResourceManager.java | 137 ------
.../flink/runtime/jobmanager/JobManager.scala | 45 +-
.../runtime/messages/TaskManagerMessages.scala | 26 ++
.../runtime/minicluster/FlinkMiniCluster.scala | 73 +++-
.../minicluster/LocalFlinkMiniCluster.scala | 235 ++++++++---
.../flink/runtime/taskmanager/TaskManager.scala | 130 ++++--
.../testingUtils/TestingJobManager.scala | 72 ----
.../testingUtils/TestingJobManagerLike.scala | 417 -------------------
.../TestingJobManagerMessages.scala | 133 ------
.../testingUtils/TestingMemoryArchivist.scala | 43 --
.../runtime/testingUtils/TestingMessages.scala | 40 --
.../testingUtils/TestingTaskManager.scala | 70 ----
.../testingUtils/TestingTaskManagerLike.scala | 248 -----------
.../TestingTaskManagerMessages.scala | 94 -----
.../LeaderElectionRetrievalTestingCluster.java | 3 +-
.../testutils/TestingResourceManager.java | 137 ++++++
.../runtime/testingUtils/TestingCluster.scala | 322 ++++++++------
.../testingUtils/TestingJobManager.scala | 71 ++++
.../testingUtils/TestingJobManagerLike.scala | 417 +++++++++++++++++++
.../TestingJobManagerMessages.scala | 132 ++++++
.../testingUtils/TestingMemoryArchivist.scala | 43 ++
.../runtime/testingUtils/TestingMessages.scala | 40 ++
.../testingUtils/TestingTaskManager.scala | 70 ++++
.../testingUtils/TestingTaskManagerLike.scala | 234 +++++++++++
.../TestingTaskManagerMessages.scala | 82 ++++
.../flink/api/scala/ScalaShellITCase.scala | 7 +-
.../cassandra/CassandraConnectorITCase.java | 6 +-
.../kafka/KafkaShortRetentionTestBase.java | 6 +-
.../connectors/kafka/KafkaTestBase.java | 6 +-
.../manualtests/ManualExactlyOnceTest.java | 4 +-
...nualExactlyOnceWithStreamReshardingTest.java | 4 +-
...ScalaStreamingMultipleProgramsTestBase.scala | 5 +-
.../flink-test-utils/pom.xml | 149 -------
.../util/StreamingMultipleProgramsTestBase.java | 4 +-
.../streaming/util/TestStreamEnvironment.java | 8 +-
.../flink/test/util/AbstractTestBase.java | 3 +-
.../test/util/MultipleProgramsTestBase.java | 3 +-
.../apache/flink/test/util/TestBaseUtils.java | 31 +-
.../apache/flink/test/util/TestEnvironment.java | 7 +-
.../test/util/ForkableFlinkMiniCluster.scala | 335 ---------------
.../accumulators/AccumulatorErrorITCase.java | 6 +-
.../accumulators/AccumulatorLiveITCase.java | 1 -
.../test/cancelling/CancelingTestBase.java | 7 +-
.../EventTimeAllWindowCheckpointingITCase.java | 6 +-
.../EventTimeWindowCheckpointingITCase.java | 6 +-
.../test/checkpointing/RescalingITCase.java | 6 +-
.../test/checkpointing/SavepointITCase.java | 19 +-
.../StreamCheckpointNotifierITCase.java | 6 +-
.../StreamFaultToleranceTestBase.java | 6 +-
.../WindowCheckpointingITCase.java | 6 +-
.../test/classloading/ClassLoaderITCase.java | 7 +-
.../clients/examples/JobRetrievalITCase.java | 5 +-
.../JobSubmissionFailsITCase.java | 6 +-
.../CustomDistributionITCase.java | 4 +-
.../RemoteEnvironmentITCase.java | 7 +-
.../flink/test/misc/AutoParallelismITCase.java | 6 +-
.../test/misc/CustomSerializationITCase.java | 6 +-
.../test/misc/MiscellaneousIssuesITCase.java | 6 +-
...SuccessAfterNetworkBuffersFailureITCase.java | 6 +-
.../flink/test/query/QueryableStateITCase.java | 6 +-
.../flink/test/recovery/FastFailuresITCase.java | 4 +-
...SimpleRecoveryFailureRateStrategyITBase.java | 6 +-
...RecoveryFixedDelayRestartStrategyITBase.java | 6 +-
.../test/recovery/SimpleRecoveryITCaseBase.java | 4 +-
.../TaskManagerFailureRecoveryITCase.java | 6 +-
.../flink/test/runtime/IPv6HostnamesITCase.java | 6 +-
.../ZooKeeperLeaderElectionITCase.java | 56 +--
.../test/streaming/runtime/TimestampITCase.java | 6 +-
.../flink/test/web/WebFrontendITCase.java | 6 +-
.../jobmanager/JobManagerFailsITCase.scala | 8 +-
.../taskmanager/TaskManagerFailsITCase.scala | 12 +-
flink-yarn-tests/pom.xml | 8 +
.../org/apache/flink/yarn/YarnTestBase.java | 1 -
tools/maven/scalastyle-config.xml | 2 +-
81 files changed, 2037 insertions(+), 2167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 29a7e58..1030ff8 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -25,12 +25,11 @@ import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.Assert;
import org.junit.Test;
-
public class AvroExternalJarProgramITCase {
private static final String JAR_FILE = "maven-test-jar.jar";
@@ -40,12 +39,12 @@ public class AvroExternalJarProgramITCase {
@Test
public void testExternalProgram() {
- ForkableFlinkMiniCluster testMiniCluster = null;
+ LocalFlinkMiniCluster testMiniCluster = null;
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
- testMiniCluster = new ForkableFlinkMiniCluster(config, false);
+ testMiniCluster = new LocalFlinkMiniCluster(config, false);
testMiniCluster.start();
String jarFile = JAR_FILE;
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
index 10ea85c..d691621 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
@@ -19,9 +19,9 @@
package org.apache.flink.contrib.streaming;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Test;
@@ -36,7 +36,7 @@ public class CollectITCase {
@Test
public void testCollect() throws Exception {
- final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(new Configuration(), false);
+ final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
try {
cluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index 551a97b..02eea07 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.types.LongValue;
import org.junit.AfterClass;
@@ -39,7 +39,7 @@ public class DegreesWithExceptionITCase {
private static final int PARALLELISM = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
@@ -47,7 +47,7 @@ public class DegreesWithExceptionITCase {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
index 56a0a59..666f7ef 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.EdgesFunctionWithVertexValue;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -42,7 +42,7 @@ public class ReduceOnEdgesWithExceptionITCase {
private static final int PARALLELISM = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
@@ -50,7 +50,7 @@ public class ReduceOnEdgesWithExceptionITCase {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
index 7458e08..0bbdc84 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
@@ -31,7 +31,7 @@ import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
import org.apache.flink.graph.ReduceNeighborsFunction;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -43,7 +43,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
private static final int PARALLELISM = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
@@ -51,7 +51,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
index fb98f24..6353d6a 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
@@ -18,14 +18,15 @@
package org.apache.flink.ml.util
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.test.util.{TestBaseUtils, TestEnvironment}
import org.scalatest.{BeforeAndAfter, Suite}
-/** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala based tests.
+/** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests.
* Additionally a TestEnvironment with the started cluster is created and set as the default
* [[org.apache.flink.api.java.ExecutionEnvironment]].
*
- * This mixin starts a ForkableFlinkMiniCluster with one TaskManager and a number of slots given
+ * This mixin starts a LocalFlinkMiniCluster with one TaskManager and a number of slots given
* by parallelism. This value can be overridden in a sub class in order to start the cluster
* with a different number of slots.
*
@@ -37,7 +38,7 @@ import org.scalatest.{BeforeAndAfter, Suite}
* @example
* {{{
* def testSomething: Unit = {
- * // Obtain TestEnvironment with started ForkableFlinkMiniCluster
+ * // Obtain TestEnvironment with started LocalFlinkMiniCluster
* val env = ExecutionEnvironment.getExecutionEnvironment
*
* env.fromCollection(...)
@@ -50,7 +51,7 @@ import org.scalatest.{BeforeAndAfter, Suite}
trait FlinkTestBase extends BeforeAndAfter {
that: Suite =>
- var cluster: Option[ForkableFlinkMiniCluster] = None
+ var cluster: Option[LocalFlinkMiniCluster] = None
val parallelism = 4
before {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 95be084..7ea286d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -767,8 +767,19 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
Class<? extends FlinkResourceManager<?>> resourceManagerClass,
String resourceManagerActorName) {
- Props resourceMasterProps = Props.create(resourceManagerClass, configuration, leaderRetriever);
+ Props resourceMasterProps = getResourceManagerProps(
+ resourceManagerClass,
+ configuration,
+ leaderRetriever);
return actorSystem.actorOf(resourceMasterProps, resourceManagerActorName);
}
+
+ public static Props getResourceManagerProps(
+ Class<? extends FlinkResourceManager> resourceManagerClass,
+ Configuration configuration,
+ LeaderRetrievalService leaderRetrievalService) {
+
+ return Props.create(resourceManagerClass, configuration, leaderRetrievalService);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
deleted file mode 100644
index 495cacd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testutils;
-
-import akka.actor.ActorRef;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.testingUtils.TestingMessages;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-
-/**
- * A testing resource manager which may alter the default standalone resource master's behavior.
- */
-public class TestingResourceManager extends StandaloneResourceManager {
-
- /** Set of Actors which want to be informed of a connection to the job manager */
- private Set<ActorRef> waitForResourceManagerConnected = new HashSet<>();
-
- /** Set of Actors which want to be informed of a shutdown */
- private Set<ActorRef> waitForShutdown = new HashSet<>();
-
- /** Flag to signal a connection to the JobManager */
- private boolean isConnected = false;
-
- public TestingResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) {
- super(flinkConfig, leaderRetriever);
- }
-
- /**
- * Overwrite messages here if desired
- */
- @Override
- protected void handleMessage(Object message) {
-
- if (message instanceof GetRegisteredResources) {
- sender().tell(new GetRegisteredResourcesReply(getStartedTaskManagers()), self());
- } else if (message instanceof FailResource) {
- ResourceID resourceID = ((FailResource) message).resourceID;
- notifyWorkerFailed(resourceID, "Failed for test case.");
-
- } else if (message instanceof NotifyWhenResourceManagerConnected) {
- if (isConnected) {
- sender().tell(
- Messages.getAcknowledge(),
- self());
- } else {
- waitForResourceManagerConnected.add(sender());
- }
- } else if (message instanceof RegisterResourceManagerSuccessful) {
- super.handleMessage(message);
-
- isConnected = true;
-
- for (ActorRef ref : waitForResourceManagerConnected) {
- ref.tell(
- Messages.getAcknowledge(),
- self());
- }
- waitForResourceManagerConnected.clear();
-
- } else if (message instanceof TestingMessages.NotifyOfComponentShutdown$) {
- waitForShutdown.add(sender());
- } else if (message instanceof TestingMessages.Alive$) {
- sender().tell(Messages.getAcknowledge(), self());
- } else {
- super.handleMessage(message);
- }
- }
-
- /**
- * Testing messages
- */
- public static class GetRegisteredResources {}
-
- public static class GetRegisteredResourcesReply {
-
- public Collection<ResourceID> resources;
-
- public GetRegisteredResourcesReply(Collection<ResourceID> resources) {
- this.resources = resources;
- }
-
- }
-
- /**
- * Fails all resources that the resource manager has registered
- */
- public static class FailResource {
-
- public ResourceID resourceID;
-
- public FailResource(ResourceID resourceID) {
- this.resourceID = resourceID;
- }
- }
-
- /**
- * The sender of this message will be informed of a connection to the Job Manager
- */
- public static class NotifyWhenResourceManagerConnected {}
-
- /**
- * Inform registered listeners about a shutdown of the application.
- */
- @Override
- protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
- for (ActorRef listener : waitForShutdown) {
- listener.tell(new TestingMessages.ComponentShutdown(self()), self());
- }
- waitForShutdown.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 88af604..f67be0e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.executiongraph.{StatusListenerMessenger, ExecutionGraph, ExecutionJobVertex}
+import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex, StatusListenerMessenger}
import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
@@ -2721,7 +2721,7 @@ object JobManager {
configuration,
None)
- val archiveProps = Props(archiveClass, archiveCount)
+ val archiveProps = getArchiveProps(archiveClass, archiveCount)
// start the archiver with the given name, or without (avoid name conflicts)
val archive: ActorRef = archiveActorName match {
@@ -2729,7 +2729,7 @@ object JobManager {
case None => actorSystem.actorOf(archiveProps)
}
- val jobManagerProps = Props(
+ val jobManagerProps = getJobManagerProps(
jobManagerClass,
configuration,
executorService,
@@ -2754,6 +2754,45 @@ object JobManager {
(jobManager, archive)
}
+ def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: Int): Props = {
+ Props(archiveClass, archiveCount)
+ }
+
+ def getJobManagerProps(
+ jobManagerClass: Class[_ <: JobManager],
+ configuration: Configuration,
+ executorService: ExecutorService,
+ instanceManager: InstanceManager,
+ scheduler: FlinkScheduler,
+ libraryCacheManager: BlobLibraryCacheManager,
+ archive: ActorRef,
+ restartStrategyFactory: RestartStrategyFactory,
+ timeout: FiniteDuration,
+ leaderElectionService: LeaderElectionService,
+ submittedJobGraphStore: SubmittedJobGraphStore,
+ checkpointRecoveryFactory: CheckpointRecoveryFactory,
+ savepointStore: SavepointStore,
+ jobRecoveryTimeout: FiniteDuration,
+ metricsRegistry: Option[FlinkMetricRegistry]): Props = {
+
+ Props(
+ jobManagerClass,
+ configuration,
+ executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ restartStrategyFactory,
+ timeout,
+ leaderElectionService,
+ submittedJobGraphStore,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout,
+ metricsRegistry)
+ }
+
// --------------------------------------------------------------------------
// Resolving the JobManager endpoint
// --------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index 2d99245..b433015 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -130,6 +130,16 @@ object TaskManagerMessages {
*/
case class RequestTaskManagerLog(requestType : LogTypeRequest)
+ /** Requests the number of active connections at the ConnectionManager */
+ case object RequestNumActiveConnections
+
+ case class ResponseNumActiveConnections(number: Int)
+
+ /** Requests the number of broadcast variables with references */
+ case object RequestBroadcastVariablesWithReferences
+
+ case class ResponseBroadcastVariablesWithReferences(number: Int)
+
// --------------------------------------------------------------------------
// Utility getters for case objects to simplify access from Java
@@ -166,4 +176,20 @@ object TaskManagerMessages {
def getRequestTaskManagerStdout(): AnyRef = {
RequestTaskManagerLog(StdOutFileRequest)
}
+
+ /**
+ * Accessor for the case object instance, to simplify Java interoperability.
+ * @return The RequestBroadcastVariablesWithReferences case object instance.
+ */
+ def getRequestBroadcastVariablesWithReferences(): RequestBroadcastVariablesWithReferences.type = {
+ RequestBroadcastVariablesWithReferences
+ }
+
+ /**
+ * Accessor for the case object instance, to simplify Java interoperability.
+ * @return The RequestNumActiveConnections case object instance.
+ */
+ def getRequestNumActiveConnections(): RequestNumActiveConnections.type = {
+ RequestNumActiveConnections
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index a547d25..0178bd3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -69,7 +69,7 @@ abstract class FlinkMiniCluster(
ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
InetAddress.getByName("localhost").getHostAddress())
- val configuration = generateConfiguration(userConfiguration)
+ protected val originalConfiguration = generateConfiguration(userConfiguration)
/** Future to the [[ActorGateway]] of the current leader */
var leaderGateway: Promise[ActorGateway] = Promise()
@@ -79,16 +79,16 @@ abstract class FlinkMiniCluster(
/** Future lock */
val futureLock = new Object()
-
+
implicit val executionContext = ExecutionContext.global
- implicit val timeout = AkkaUtils.getTimeout(configuration)
+ implicit val timeout = AkkaUtils.getTimeout(originalConfiguration)
- val haMode = HighAvailabilityMode.fromConfig(configuration)
+ val haMode = HighAvailabilityMode.fromConfig(originalConfiguration)
val numJobManagers = getNumberOfJobManagers
- var numTaskManagers = configuration.getInteger(
+ var numTaskManagers = originalConfiguration.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
@@ -105,6 +105,22 @@ abstract class FlinkMiniCluster(
private var isRunning = false
+ def configuration: Configuration = {
+ if (originalConfiguration.getInteger(
+ ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) == 0) {
+ val leaderConfiguration = new Configuration(originalConfiguration)
+
+ val leaderPort = getLeaderRPCPort
+
+ leaderConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, leaderPort)
+
+ leaderConfiguration
+ } else {
+ originalConfiguration
+ }
+ }
+
// --------------------------------------------------------------------------
// Abstract Methods
// --------------------------------------------------------------------------
@@ -125,7 +141,7 @@ abstract class FlinkMiniCluster(
if(haMode == HighAvailabilityMode.NONE) {
1
} else {
- configuration.getInteger(
+ originalConfiguration.getInteger(
ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER
)
@@ -136,7 +152,7 @@ abstract class FlinkMiniCluster(
if(haMode == HighAvailabilityMode.NONE) {
1
} else {
- configuration.getInteger(
+ originalConfiguration.getInteger(
ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER
)
@@ -177,40 +193,55 @@ abstract class FlinkMiniCluster(
Await.result(indexFuture, timeout)
}
+ def getLeaderRPCPort: Int = {
+ val index = getLeaderIndex(timeout)
+
+ jobManagerActorSystems match {
+ case Some(jmActorSystems) =>
+ AkkaUtils.getAddress(jmActorSystems(index)).port match {
+ case Some(p) => p
+ case None => -1
+ }
+
+ case None => throw new Exception("The JobManager of the LocalFlinkMiniCluster has not been " +
+ "started properly.")
+ }
+ }
+
def getResourceManagerAkkaConfig(index: Int): Config = {
if (useSingleActorSystem) {
- AkkaUtils.getAkkaConfig(configuration, None)
+ AkkaUtils.getAkkaConfig(originalConfiguration, None)
} else {
- val port = configuration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+ val port = originalConfiguration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
val resolvedPort = if(port != 0) port + index else port
- AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+ AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
}
}
def getJobManagerAkkaConfig(index: Int): Config = {
if (useSingleActorSystem) {
- AkkaUtils.getAkkaConfig(configuration, None)
+ AkkaUtils.getAkkaConfig(originalConfiguration, None)
}
else {
- val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+ val port = originalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
val resolvedPort = if(port != 0) port + index else port
- AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+ AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
}
}
def getTaskManagerAkkaConfig(index: Int): Config = {
- val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+ val port = originalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
val resolvedPort = if(port != 0) port + index else port
- AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+ AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
}
/**
@@ -257,7 +288,7 @@ abstract class FlinkMiniCluster(
"The FlinkMiniCluster has not been started yet.")
}
} else {
- JobClient.startJobClientActorSystem(configuration)
+ JobClient.startJobClientActorSystem(originalConfiguration)
}
}
@@ -320,7 +351,7 @@ abstract class FlinkMiniCluster(
val jobManagerAkkaURL = AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0))
- webMonitor = startWebServer(configuration, jmActorSystems(0), jobManagerAkkaURL)
+ webMonitor = startWebServer(originalConfiguration, jmActorSystems(0), jobManagerAkkaURL)
if(waitForTaskManagerRegistration) {
waitForTaskManagersToBeRegistered()
@@ -528,7 +559,7 @@ abstract class FlinkMiniCluster(
new StandaloneLeaderRetrievalService(
AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0)))
} else {
- ZooKeeperUtils.createLeaderRetrievalService(configuration)
+ ZooKeeperUtils.createLeaderRetrievalService(originalConfiguration)
}
case _ => throw new Exception("The FlinkMiniCluster has not been started properly.")
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index d30c047..cac5d91 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,23 +18,36 @@
package org.apache.flink.runtime.minicluster
-import akka.actor.{ActorRef, ActorSystem}
-import org.apache.flink.api.common.JobID
+import java.util.concurrent.ExecutorService
+import akka.actor.{ActorRef, ActorSystem, Props}
+import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
-import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable}
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.io.network.netty.NettyConfig
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.messages.JobManagerMessages
-import org.apache.flink.runtime.messages.JobManagerMessages.{CancellationFailure, CancellationResponse, StoppingFailure, StoppingResponse, RunningJobsStatus, RunningJobs}
-import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse}
+import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
import org.apache.flink.runtime.util.EnvironmentInformation
import scala.concurrent.Await
+import scala.concurrent.duration.FiniteDuration
/**
* Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same
@@ -65,8 +78,25 @@ class LocalFlinkMiniCluster(
config
}
+ //------------------------------------------------------------------------------------------------
+ // Actor classes
+ //------------------------------------------------------------------------------------------------
+
+ val jobManagerClass: Class[_ <: JobManager] = classOf[JobManager]
+
+ val taskManagerClass: Class[_ <: TaskManager] = classOf[TaskManager]
+
+ val memoryArchivistClass: Class[_ <: MemoryArchivist] = classOf[MemoryArchivist]
+
+ val resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]] =
+ classOf[StandaloneResourceManager]
+
+ //------------------------------------------------------------------------------------------------
+ // Start methods for the distributed components
+ //------------------------------------------------------------------------------------------------
+
override def startJobManager(index: Int, system: ActorSystem): ActorRef = {
- val config = configuration.clone()
+ val config = originalConfiguration.clone()
val jobManagerName = getJobManagerName(index)
val archiveName = getArchiveName(index)
@@ -79,19 +109,48 @@ class LocalFlinkMiniCluster(
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
}
- val (jobManager, _) = JobManager.startJobManagerActors(
- config,
- system,
- Some(jobManagerName),
- Some(archiveName),
- classOf[JobManager],
- classOf[MemoryArchivist])
-
- jobManager
+ val (executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ restartStrategyFactory,
+ timeout,
+ archiveCount,
+ leaderElectionService,
+ submittedJobGraphStore,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout,
+ metricsRegistry) = JobManager.createJobManagerComponents(config, createLeaderElectionService())
+
+ val archive = system.actorOf(
+ getArchiveProps(
+ memoryArchivistClass,
+ archiveCount),
+ archiveName)
+
+ system.actorOf(
+ getJobManagerProps(
+ jobManagerClass,
+ config,
+ executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ restartStrategyFactory,
+ timeout,
+ leaderElectionService,
+ submittedJobGraphStore,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout,
+ metricsRegistry),
+ jobManagerName)
}
override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
- val config = configuration.clone()
+ val config = originalConfiguration.clone()
val resourceManagerName = getResourceManagerName(index)
@@ -103,18 +162,16 @@ class LocalFlinkMiniCluster(
config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
}
- val resourceManager = FlinkResourceManager.startResourceManagerActors(
+ val resourceManagerProps = getResourceManagerProps(
+ resourceManagerClass,
config,
- system,
- createLeaderRetrievalService(),
- classOf[StandaloneResourceManager],
- resourceManagerName)
+ createLeaderRetrievalService())
- resourceManager
+ system.actorOf(resourceManagerProps, resourceManagerName)
}
override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
- val config = configuration.clone()
+ val config = originalConfiguration.clone()
val rpcPort = config.getInteger(
ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
@@ -138,32 +195,115 @@ class LocalFlinkMiniCluster(
} else {
TaskManager.TASK_MANAGER_NAME
}
-
- TaskManager.startTaskManagerComponentsAndActor(
+
+ val resourceID = ResourceID.generate() // generate random resource id
+
+ val (taskManagerConfig,
+ taskManagerLocation,
+ memoryManager,
+ ioManager,
+ network,
+ leaderRetrievalService) = TaskManager.createTaskManagerComponents(
config,
- ResourceID.generate(), // generate random resource id
- system,
+ resourceID,
hostname, // network interface to bind to
- Some(taskManagerActorName), // actor name
- Some(createLeaderRetrievalService()), // job manager leader retrieval service
localExecution, // start network stack?
- classOf[TaskManager])
+ Some(createLeaderRetrievalService()))
+
+ val props = getTaskManagerProps(
+ taskManagerClass,
+ taskManagerConfig,
+ resourceID,
+ taskManagerLocation,
+ memoryManager,
+ ioManager,
+ network,
+ leaderRetrievalService)
+
+ system.actorOf(props, taskManagerActorName)
}
- def getLeaderRPCPort: Int = {
- val index = getLeaderIndex(timeout)
+ //------------------------------------------------------------------------------------------------
+ // Props for the distributed components
+ //------------------------------------------------------------------------------------------------
- jobManagerActorSystems match {
- case Some(jmActorSystems) =>
- AkkaUtils.getAddress(jmActorSystems(index)).port match {
- case Some(p) => p
- case None => -1
- }
+ def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: Int): Props = {
+ JobManager.getArchiveProps(archiveClass, archiveCount)
+ }
- case None => throw new Exception("The JobManager of the LocalFlinkMiniCluster has not been " +
- "started properly.")
- }
+ def getJobManagerProps(
+ jobManagerClass: Class[_ <: JobManager],
+ configuration: Configuration,
+ executorService: ExecutorService,
+ instanceManager: InstanceManager,
+ scheduler: Scheduler,
+ libraryCacheManager: BlobLibraryCacheManager,
+ archive: ActorRef,
+ restartStrategyFactory: RestartStrategyFactory,
+ timeout: FiniteDuration,
+ leaderElectionService: LeaderElectionService,
+ submittedJobGraphStore: SubmittedJobGraphStore,
+ checkpointRecoveryFactory: CheckpointRecoveryFactory,
+ savepointStore: SavepointStore,
+ jobRecoveryTimeout: FiniteDuration,
+ metricsRegistry: Option[MetricRegistry]): Props = {
+
+ JobManager.getJobManagerProps(
+ jobManagerClass,
+ configuration,
+ executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ restartStrategyFactory,
+ timeout,
+ leaderElectionService,
+ submittedJobGraphStore,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout,
+ metricsRegistry)
+ }
+
+ def getTaskManagerProps(
+ taskManagerClass: Class[_ <: TaskManager],
+ taskManagerConfig: TaskManagerConfiguration,
+ resourceID: ResourceID,
+ taskManagerLocation: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ networkEnvironment: NetworkEnvironment,
+ leaderRetrievalService: LeaderRetrievalService): Props = {
+
+ TaskManager.getTaskManagerProps(
+ taskManagerClass,
+ taskManagerConfig,
+ resourceID,
+ taskManagerLocation,
+ memoryManager,
+ ioManager,
+ networkEnvironment,
+ leaderRetrievalService)
+ }
+
+ def getResourceManagerProps(
+ resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]],
+ configuration: Configuration,
+ leaderRetrievalService: LeaderRetrievalService): Props = {
+
+ FlinkResourceManager.getResourceManagerProps(
+ resourceManagerClass,
+ configuration,
+ leaderRetrievalService)
+ }
+
+ //------------------------------------------------------------------------------------------------
+ // Helper methods
+ //------------------------------------------------------------------------------------------------
+ def createLeaderElectionService(): Option[LeaderElectionService] = {
+ None
}
def initializeIOFormatClasses(configuration: Configuration): Unit = {
@@ -186,7 +326,7 @@ class LocalFlinkMiniCluster(
val bufferSize: Int = config.getInteger(
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
-
+
val bufferMem: Long = config.getLong(
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong
@@ -218,6 +358,7 @@ class LocalFlinkMiniCluster(
val config: Configuration = new Configuration()
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname)
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
@@ -252,11 +393,11 @@ class LocalFlinkMiniCluster(
JobManager.ARCHIVE_NAME
}
}
-
+
// --------------------------------------------------------------------------
// Actions on running jobs
// --------------------------------------------------------------------------
-
+
def currentlyRunningJobs: Iterable[JobID] = {
val leader = getLeaderGateway(timeout)
val future = leader.ask(JobManagerMessages.RequestRunningJobsStatus, timeout)
@@ -269,7 +410,7 @@ class LocalFlinkMiniCluster(
currentlyRunningJobs.foreach(list.add)
list
}
-
+
def stopJob(id: JobID) : Unit = {
val leader = getLeaderGateway(timeout)
val response = leader.ask(new JobManagerMessages.StopJob(id), timeout)
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 84750a3..de85f30 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -354,6 +354,21 @@ class TaskManager(
case None =>
sender() ! new IOException("BlobService not available. Cannot upload TaskManager logs.")
}
+
+ case RequestBroadcastVariablesWithReferences =>
+ sender ! decorateMessage(
+ ResponseBroadcastVariablesWithReferences(
+ bcVarManager.getNumberOfVariablesWithReferences)
+ )
+
+ case RequestNumActiveConnections =>
+ val numActive = if (!network.isShutdown) {
+ network.getConnectionManager.getNumberOfActiveConnections
+ } else {
+ 0
+ }
+
+ sender ! decorateMessage(ResponseNumActiveConnections(numActive))
}
/**
@@ -1781,6 +1796,7 @@ object TaskManager {
}
/**
+ * Starts the task manager actor.
*
* @param configuration The configuration for the TaskManager.
* @param resourceID The id of the resource which the task manager will run on.
@@ -1817,11 +1833,75 @@ object TaskManager {
taskManagerClass: Class[_ <: TaskManager])
: ActorRef = {
- val (taskManagerConfig : TaskManagerConfiguration,
- netConfig: NetworkEnvironmentConfiguration,
- taskManagerAddress: InetSocketAddress,
- memType: MemoryType
- ) = parseTaskManagerConfiguration(
+ val (taskManagerConfig,
+ connectionInfo,
+ memoryManager,
+ ioManager,
+ network,
+ leaderRetrievalService) = createTaskManagerComponents(
+ configuration,
+ resourceID,
+ taskManagerHostname,
+ localTaskManagerCommunication,
+ leaderRetrievalServiceOption)
+
+ // create the actor properties (which define the actor constructor parameters)
+ val tmProps = getTaskManagerProps(
+ taskManagerClass,
+ taskManagerConfig,
+ resourceID,
+ connectionInfo,
+ memoryManager,
+ ioManager,
+ network,
+ leaderRetrievalService)
+
+ taskManagerActorName match {
+ case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
+ case None => actorSystem.actorOf(tmProps)
+ }
+ }
+
+ def getTaskManagerProps(
+ taskManagerClass: Class[_ <: TaskManager],
+ taskManagerConfig: TaskManagerConfiguration,
+ resourceID: ResourceID,
+ taskManagerLocation: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ networkEnvironment: NetworkEnvironment,
+ leaderRetrievalService: LeaderRetrievalService
+ ): Props = {
+ Props(
+ taskManagerClass,
+ taskManagerConfig,
+ resourceID,
+ taskManagerLocation,
+ memoryManager,
+ ioManager,
+ networkEnvironment,
+ taskManagerConfig.numberOfSlots,
+ leaderRetrievalService)
+ }
+
+ def createTaskManagerComponents(
+ configuration: Configuration,
+ resourceID: ResourceID,
+ taskManagerHostname: String,
+ localTaskManagerCommunication: Boolean,
+ leaderRetrievalServiceOption: Option[LeaderRetrievalService]):
+ (TaskManagerConfiguration,
+ TaskManagerLocation,
+ MemoryManager,
+ IOManager,
+ NetworkEnvironment,
+ LeaderRetrievalService) = {
+
+ val (taskManagerConfig : TaskManagerConfiguration,
+ netConfig: NetworkEnvironmentConfiguration,
+ taskManagerAddress: InetSocketAddress,
+ memType: MemoryType
+ ) = parseTaskManagerConfiguration(
configuration,
taskManagerHostname,
localTaskManagerCommunication)
@@ -1895,10 +1975,10 @@ object TaskManager {
// check if a value has been configured
val configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
- ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
- "MemoryManager needs at least one MB of memory. " +
- "If you leave this config parameter empty, the system automatically " +
- "pick a fraction of the available memory.")
+ ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+ "MemoryManager needs at least one MB of memory. " +
+ "If you leave this config parameter empty, the system automatically " +
+ "pick a fraction of the available memory.")
val preAllocateMemory = configuration.getBoolean(
@@ -1910,7 +1990,7 @@ object TaskManager {
LOG.info(s"Using $configuredMemory MB for managed memory.")
} else {
LOG.info(s"Limiting managed memory to $configuredMemory MB, " +
- s"memory will be allocated lazily.")
+ s"memory will be allocated lazily.")
}
configuredMemory << 20 // megabytes to bytes
}
@@ -1928,10 +2008,10 @@ object TaskManager {
if (preAllocateMemory) {
LOG.info(s"Using $fraction of the currently free heap space for managed " +
- s"heap memory (${relativeMemSize >> 20} MB).")
+ s"heap memory (${relativeMemSize >> 20} MB).")
} else {
LOG.info(s"Limiting managed memory to $fraction of the currently free heap space " +
- s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.")
+ s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.")
}
relativeMemSize
@@ -1944,10 +2024,10 @@ object TaskManager {
if (preAllocateMemory) {
LOG.info(s"Using $fraction of the maximum memory size for " +
- s"managed off-heap memory (${directMemorySize >> 20} MB).")
+ s"managed off-heap memory (${directMemorySize >> 20} MB).")
} else {
LOG.info(s"Limiting managed memory to $fraction of the maximum memory size " +
- s"(${directMemorySize >> 20} MB), memory will be allocated lazily.")
+ s"(${directMemorySize >> 20} MB), memory will be allocated lazily.")
}
directMemorySize
@@ -1971,12 +2051,12 @@ object TaskManager {
memType match {
case MemoryType.HEAP =>
throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
- s" while allocating the TaskManager heap memory ($memorySize bytes).", e)
+ s" while allocating the TaskManager heap memory ($memorySize bytes).", e)
case MemoryType.OFF_HEAP =>
throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
- s" while allocating the TaskManager off-heap memory ($memorySize bytes). " +
- s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
+ s" while allocating the TaskManager off-heap memory ($memorySize bytes). " +
+ s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
case _ => throw e
}
@@ -1990,22 +2070,12 @@ object TaskManager {
case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
}
- // create the actor properties (which define the actor constructor parameters)
- val tmProps = Props(
- taskManagerClass,
- taskManagerConfig,
- resourceID,
+ (taskManagerConfig,
taskManagerLocation,
memoryManager,
ioManager,
network,
- taskManagerConfig.numberOfSlots,
leaderRetrievalService)
-
- taskManagerActorName match {
- case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
- case None => actorSystem.actorOf(tmProps)
- }
}
@@ -2055,8 +2125,8 @@ object TaskManager {
* @param taskManagerHostname The host name under which the TaskManager communicates.
* @param localTaskManagerCommunication True, to skip initializing the network stack.
* Use only in cases where only one task manager runs.
- * @return A tuple (TaskManagerConfiguration, network configuration,
- * InstanceConnectionInfo, JobManager actor Akka URL).
+ * @return A tuple (TaskManagerConfiguration, network configuration, inet socket address,
+ * memory type).
*/
@throws(classOf[IllegalArgumentException])
def parseTaskManagerConfiguration(
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
deleted file mode 100644
index 16331ac..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.MetricRegistry
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import java.util.concurrent.ExecutorService
-
-/** JobManager implementation extended by testing messages
- *
- */
-class TestingJobManager(
- flinkConfiguration: Configuration,
- executorService: ExecutorService,
- instanceManager: InstanceManager,
- scheduler: Scheduler,
- libraryCacheManager: BlobLibraryCacheManager,
- archive: ActorRef,
- restartStrategyFactory: RestartStrategyFactory,
- timeout: FiniteDuration,
- leaderElectionService: LeaderElectionService,
- submittedJobGraphs : SubmittedJobGraphStore,
- checkpointRecoveryFactory : CheckpointRecoveryFactory,
- savepointStore : SavepointStore,
- jobRecoveryTimeout : FiniteDuration,
- metricRegistry : Option[MetricRegistry])
- extends JobManager(
- flinkConfiguration,
- executorService,
- instanceManager,
- scheduler,
- libraryCacheManager,
- archive,
- restartStrategyFactory,
- timeout,
- leaderElectionService,
- submittedJobGraphs,
- checkpointRecoveryFactory,
- savepointStore,
- jobRecoveryTimeout,
- metricRegistry)
- with TestingJobManagerLike {}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
deleted file mode 100644
index 3947b17..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.{ActorRef, Cancellable, Terminated}
-import akka.pattern.{ask, pipe}
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps}
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
-import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingMessages._
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
-
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-/** This mixin can be used to decorate a JobManager with messages for testing purpose. */
-trait TestingJobManagerLike extends FlinkActor {
- that: JobManager =>
-
- import context._
-
- import scala.collection.JavaConverters._
-
- val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
- val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
-
- val waitForAllVerticesToBeRunningOrFinished =
- scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
-
- var periodicCheck: Option[Cancellable] = None
-
- val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
- collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
-
- val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
-
- val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
-
- val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
- new Ordering[(Int, ActorRef)] {
- override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
- })
-
- val waitForClient = scala.collection.mutable.HashSet[ActorRef]()
-
- val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
-
- var disconnectDisabled = false
-
- var postStopEnabled = true
-
- abstract override def postStop(): Unit = {
- if (postStopEnabled) {
- super.postStop()
- } else {
- // only stop leader election service to revoke the leadership of this JM so that a new JM
- // can be elected leader
- leaderElectionService.stop()
- }
- }
-
- abstract override def handleMessage: Receive = {
- handleTestingMessage orElse super.handleMessage
- }
-
- def handleTestingMessage: Receive = {
- case Alive => sender() ! Acknowledge
-
- case RequestExecutionGraph(jobID) =>
- currentJobs.get(jobID) match {
- case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
- ExecutionGraphFound(
- jobID,
- executionGraph)
- )
-
- case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
- }
-
- case WaitForAllVerticesToBeRunning(jobID) =>
- if(checkIfAllVerticesRunning(jobID)){
- sender() ! decorateMessage(AllVerticesRunning(jobID))
- }else{
- val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
- waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
-
- if(periodicCheck.isEmpty){
- periodicCheck =
- Some(
- context.system.scheduler.schedule(
- 0 seconds,
- 200 millis,
- self,
- decorateMessage(NotifyListeners)
- )
- )
- }
- }
- case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
- if(checkIfAllVerticesRunningOrFinished(jobID)){
- sender() ! decorateMessage(AllVerticesRunning(jobID))
- }else{
- val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
- waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender())
-
- if(periodicCheck.isEmpty){
- periodicCheck =
- Some(
- context.system.scheduler.schedule(
- 0 seconds,
- 200 millis,
- self,
- decorateMessage(NotifyListeners)
- )
- )
- }
- }
-
- case NotifyListeners =>
- for(jobID <- currentJobs.keySet){
- notifyListeners(jobID)
- }
-
- if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
- periodicCheck foreach { _.cancel() }
- periodicCheck = None
- }
-
-
- case NotifyWhenJobRemoved(jobID) =>
- val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
-
- val responses = gateways.map{
- gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
- }
-
- val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
-
- val allFutures = responses ++ Seq(jobRemovedOnJobManager)
-
- import context.dispatcher
- Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender()
-
- case CheckIfJobRemoved(jobID) =>
- if(currentJobs.contains(jobID)) {
- context.system.scheduler.scheduleOnce(
- 200 milliseconds,
- self,
- decorateMessage(CheckIfJobRemoved(jobID))
- )(context.dispatcher, sender())
- } else {
- sender() ! decorateMessage(true)
- }
-
- case NotifyWhenTaskManagerTerminated(taskManager) =>
- val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
- waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender)
-
- case msg@Terminated(taskManager) =>
- super.handleMessage(msg)
-
- waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
- _ foreach {
- listener =>
- listener ! decorateMessage(TaskManagerTerminated(taskManager))
- }
- }
-
- // see shutdown method for reply
- case NotifyOfComponentShutdown =>
- waitForShutdown += sender()
-
- case NotifyWhenAccumulatorChange(jobID) =>
-
- val (updated, registered) = waitForAccumulatorUpdate.
- getOrElse(jobID, (false, Set[ActorRef]()))
- waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
- sender ! true
-
- /**
- * Notification from the task manager that changed accumulator are transferred on next
- * Hearbeat. We need to keep this state to notify the listeners on next Heartbeat report.
- */
- case AccumulatorsChanged(jobID: JobID) =>
- waitForAccumulatorUpdate.get(jobID) match {
- case Some((updated, registered)) =>
- waitForAccumulatorUpdate.put(jobID, (true, registered))
- case None =>
- }
-
- /**
- * Disabled async processing of accumulator values and send accumulators to the listeners if
- * we previously received an [[AccumulatorsChanged]] message.
- */
- case msg : Heartbeat =>
- super.handleMessage(msg)
-
- waitForAccumulatorUpdate foreach {
- case (jobID, (updated, actors)) if updated =>
- currentJobs.get(jobID) match {
- case Some((graph, jobInfo)) =>
- val flinkAccumulators = graph.getFlinkAccumulators
- val userAccumulators = graph.aggregateUserAccumulators
- actors foreach {
- actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
- }
- case None =>
- }
- waitForAccumulatorUpdate.put(jobID, (false, actors))
- case _ =>
- }
-
- case RequestWorkingTaskManager(jobID) =>
- currentJobs.get(jobID) match {
- case Some((eg, _)) =>
- if(eg.getAllExecutionVertices.asScala.isEmpty){
- sender ! decorateMessage(WorkingTaskManager(None))
- } else {
- val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
-
- if(resource == null){
- sender ! decorateMessage(WorkingTaskManager(None))
- } else {
- sender ! decorateMessage(
- WorkingTaskManager(
- Some(resource.getTaskManagerActorGateway())
- )
- )
- }
- }
- case None => sender ! decorateMessage(WorkingTaskManager(None))
- }
-
- case NotifyWhenJobStatus(jobID, state) =>
- val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
- scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
-
- val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
-
- jobStatusListener += state -> (listener + sender)
-
- case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
- super.handleMessage(msg)
-
- val cleanup = waitForJobStatus.get(jobID) match {
- case Some(stateListener) =>
- stateListener.remove(newJobStatus) match {
- case Some(listeners) =>
- listeners foreach {
- _ ! decorateMessage(JobStatusIs(jobID, newJobStatus))
- }
- case _ =>
- }
- stateListener.isEmpty
-
- case _ => false
- }
-
- if (cleanup) {
- waitForJobStatus.remove(jobID)
- }
-
- case DisableDisconnect =>
- disconnectDisabled = true
-
- case DisablePostStop =>
- postStopEnabled = false
-
- case RequestSavepoint(savepointPath) =>
- try {
- val savepoint = savepointStore.loadSavepoint(savepointPath)
- sender ! ResponseSavepoint(savepoint)
- }
- catch {
- case e: Exception =>
- sender ! ResponseSavepoint(null)
- }
-
- case msg: Disconnect =>
- if (!disconnectDisabled) {
- super.handleMessage(msg)
-
- val taskManager = sender()
-
- waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
- _ foreach {
- listener =>
- listener ! decorateMessage(TaskManagerTerminated(taskManager))
- }
- }
- }
-
- case NotifyWhenLeader =>
- if (leaderElectionService.hasLeadership) {
- sender() ! true
- } else {
- waitForLeader += sender()
- }
-
- case msg: GrantLeadership =>
- super.handleMessage(msg)
-
- waitForLeader.foreach(_ ! true)
-
- waitForLeader.clear()
-
- case NotifyWhenClientConnects =>
- waitForClient += sender()
- sender() ! true
-
- case msg: RegisterJobClient =>
- super.handleMessage(msg)
- waitForClient.foreach(_ ! ClientConnected)
- case msg: RequestClassloadingProps =>
- super.handleMessage(msg)
- waitForClient.foreach(_ ! ClassLoadingPropsDelivered)
-
- case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
- if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {
- // there are already at least numRegisteredTaskManager registered --> send Acknowledge
- sender() ! Acknowledge
- } else {
- // wait until we see at least numRegisteredTaskManager being registered at the JobManager
- waitForNumRegisteredTaskManagers += ((numRegisteredTaskManager, sender()))
- }
-
- // TaskManager may be registered on these two messages
- case msg @ (_: RegisterTaskManager) =>
- super.handleMessage(msg)
-
- // dequeue all senders which wait for instanceManager.getNumberOfStartedTaskManagers or
- // fewer registered TaskManagers
- while (waitForNumRegisteredTaskManagers.nonEmpty &&
- waitForNumRegisteredTaskManagers.head._1 <=
- instanceManager.getNumberOfRegisteredTaskManagers) {
- val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
- receiver ! Acknowledge
- }
- }
-
- def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
- currentJobs.get(jobID) match {
- case Some((eg, _)) =>
- eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING)
- case None => false
- }
- }
-
- def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = {
- currentJobs.get(jobID) match {
- case Some((eg, _)) =>
- eg.getAllExecutionVertices.asScala.forall {
- case vertex =>
- (vertex.getExecutionState == ExecutionState.RUNNING
- || vertex.getExecutionState == ExecutionState.FINISHED)
- }
- case None => false
- }
- }
-
- def notifyListeners(jobID: JobID): Unit = {
- if(checkIfAllVerticesRunning(jobID)) {
- waitForAllVerticesToBeRunning.remove(jobID) match {
- case Some(listeners) =>
- for (listener <- listeners) {
- listener ! decorateMessage(AllVerticesRunning(jobID))
- }
- case _ =>
- }
- }
-
- if(checkIfAllVerticesRunningOrFinished(jobID)) {
- waitForAllVerticesToBeRunningOrFinished.remove(jobID) match {
- case Some(listeners) =>
- for (listener <- listeners) {
- listener ! decorateMessage(AllVerticesRunning(jobID))
- }
- case _ =>
- }
- }
- }
-
- /**
- * No killing of the VM for testing.
- */
- override protected def shutdown(): Unit = {
- log.info("Shutting down TestingJobManager.")
- waitForShutdown.foreach(_ ! ComponentShutdown(self))
- waitForShutdown.clear()
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
deleted file mode 100644
index f121305..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import java.util.Map
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-import org.apache.flink.api.common.accumulators.Accumulator
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
-import org.apache.flink.runtime.instance.ActorGateway
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
-
-object TestingJobManagerMessages {
-
- case class RequestExecutionGraph(jobID: JobID)
-
- sealed trait ResponseExecutionGraph {
- def jobID: JobID
- }
-
- case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends
- ResponseExecutionGraph
-
- case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph
-
- case class WaitForAllVerticesToBeRunning(jobID: JobID)
- case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID)
- case class AllVerticesRunning(jobID: JobID)
-
- case class NotifyWhenJobRemoved(jobID: JobID)
-
- case class RequestWorkingTaskManager(jobID: JobID)
- case class WorkingTaskManager(gatewayOption: Option[ActorGateway])
-
- case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus)
- case class JobStatusIs(jobID: JobID, state: JobStatus)
-
- case object NotifyListeners
-
- case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
- case class TaskManagerTerminated(taskManager: ActorRef)
-
- /**
- * Registers a listener to receive a message when accumulators changed.
- * The change must be explicitly triggered by the TestingTaskManager which can receive an
- * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]
- * message by a task that changed the accumulators. This message is then
- * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]]
- * message when the next Heartbeat occurs.
- */
- case class NotifyWhenAccumulatorChange(jobID: JobID)
-
- /**
- * Reports updated accumulators back to the listener.
- */
- case class UpdatedAccumulators(jobID: JobID,
- flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]],
- userAccumulators: Map[String, Accumulator[_,_]])
-
- /** Notifies the sender when the [[TestingJobManager]] has been elected as the leader
- *
- */
- case object NotifyWhenLeader
-
- /**
- * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs
- */
- case object NotifyWhenClientConnects
- /**
- * Notifes of client connect
- */
- case object ClientConnected
- /**
- * Notifies when the client has requested class loading information
- */
- case object ClassLoadingPropsDelivered
-
- /**
- * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]]
- * message when at least numRegisteredTaskManager have registered at the JobManager.
- *
- * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified
- */
- case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int)
-
- /** Disables the post stop method of the [[TestingJobManager]].
- *
- * Only the leaderElectionService is stopped in the postStop method call to revoke the leadership
- */
- case object DisablePostStop
-
- /**
- * Requests a savepoint from the job manager.
- *
- * @param savepointPath The path of the savepoint to request.
- */
- case class RequestSavepoint(savepointPath: String)
-
- /**
- * Response to a savepoint request.
- *
- * @param savepoint The requested savepoint or null if none available.
- */
- case class ResponseSavepoint(savepoint: Savepoint)
-
- def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
- def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects
- def getDisablePostStop(): AnyRef = DisablePostStop
-
- def getClientConnected(): AnyRef = ClientConnected
- def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
deleted file mode 100644
index 48a1ddd..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import org.apache.flink.runtime.jobmanager.MemoryArchivist
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound, ExecutionGraphNotFound, RequestExecutionGraph}
-
-/** Memory archivist extended by testing messages
- *
- * @param maxEntries number of maximum number of archived jobs
- */
-class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) {
-
- override def handleMessage: Receive = {
- handleTestingMessage orElse super.handleMessage
- }
-
- def handleTestingMessage: Receive = {
- case RequestExecutionGraph(jobID) =>
- val executionGraph = graphs.get(jobID)
-
- executionGraph match {
- case Some(graph) => sender ! decorateMessage(ExecutionGraphFound(jobID, graph))
- case None => sender ! decorateMessage(ExecutionGraphNotFound(jobID))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
deleted file mode 100644
index 91d169a..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-
-object TestingMessages {
-
- case class CheckIfJobRemoved(jobID: JobID)
-
- case object DisableDisconnect
-
- case object Alive
-
- def getAlive: AnyRef = Alive
-
- def getDisableDisconnect: AnyRef = DisableDisconnect
-
- case object NotifyOfComponentShutdown
- case class ComponentShutdown(ref: ActorRef)
-
- def getNotifyOfComponentShutdown(): AnyRef = NotifyOfComponentShutdown
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
deleted file mode 100644
index 9b5a147..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.io.disk.iomanager.IOManager
-import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
-import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration}
-
-import scala.language.postfixOps
-
-/** Subclass of the [[TaskManager]] to support testing messages
- */
-class TestingTaskManager(
- config: TaskManagerConfiguration,
- resourceID: ResourceID,
- connectionInfo: TaskManagerLocation,
- memoryManager: MemoryManager,
- ioManager: IOManager,
- network: NetworkEnvironment,
- numberOfSlots: Int,
- leaderRetrievalService: LeaderRetrievalService)
- extends TaskManager(
- config,
- resourceID,
- connectionInfo,
- memoryManager,
- ioManager,
- network,
- numberOfSlots,
- leaderRetrievalService)
- with TestingTaskManagerLike {
-
- def this(
- config: TaskManagerConfiguration,
- connectionInfo: TaskManagerLocation,
- memoryManager: MemoryManager,
- ioManager: IOManager,
- network: NetworkEnvironment,
- numberOfSlots: Int,
- leaderRetrievalService: LeaderRetrievalService) {
- this(
- config,
- ResourceID.generate(),
- connectionInfo,
- memoryManager,
- ioManager,
- network,
- numberOfSlots,
- leaderRetrievalService)
- }
-}