You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:17:46 UTC
[1/4] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster
Repository: flink
Updated Branches:
refs/heads/master 920cda408 -> 02b852e35
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 9bd8cc3..2738d22 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.runtime.leaderelection;
import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.PoisonPill;
-import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -35,14 +35,19 @@ import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
@@ -52,7 +57,6 @@ import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;
import java.io.File;
-import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -61,22 +65,20 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
private static final FiniteDuration timeout = TestingUtils.TESTING_DURATION();
- private static final File tempDirectory;
+ private static TestingServer zkServer;
- static {
- try {
- tempDirectory = org.apache.flink.runtime.testutils
- .CommonTestUtils.createTempDirectory();
- }
- catch (IOException e) {
- throw new RuntimeException("Test setup failed", e);
- }
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ zkServer = new TestingServer(true);
}
@AfterClass
public static void tearDown() throws Exception {
- if (tempDirectory != null) {
- FileUtils.deleteDirectory(tempDirectory);
+ if (zkServer != null) {
+ zkServer.close();
}
}
@@ -86,18 +88,19 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
*/
@Test
public void testTaskManagerRegistrationAtReelectedLeader() throws Exception {
- Configuration configuration = new Configuration();
+ File rootFolder = tempFolder.getRoot();
+
+ Configuration configuration = ZooKeeperTestUtils.createZooKeeperHAConfig(
+ zkServer.getConnectString(),
+ rootFolder.getPath());
int numJMs = 10;
int numTMs = 3;
- configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
- configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
- configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
- ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
+ TestingCluster cluster = new TestingCluster(configuration);
try {
cluster.start();
@@ -137,14 +140,15 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
int numSlotsPerTM = 3;
int parallelism = numTMs * numSlotsPerTM;
- Configuration configuration = new Configuration();
+ File rootFolder = tempFolder.getRoot();
+
+ Configuration configuration = ZooKeeperTestUtils.createZooKeeperHAConfig(
+ zkServer.getConnectString(),
+ rootFolder.getPath());
- configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
- configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
- configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
// sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message
@@ -169,7 +173,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
final JobGraph graph = new JobGraph("Blocking test job", sender, receiver);
- final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
+ final TestingCluster cluster = new TestingCluster(configuration);
ActorSystem clientActorSystem = null;
@@ -250,14 +254,14 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
boolean finished = false;
final ActorSystem clientActorSystem;
- final ForkableFlinkMiniCluster cluster;
+ final LocalFlinkMiniCluster cluster;
final JobGraph graph;
final Promise<JobExecutionResult> resultPromise = new Promise.DefaultPromise<>();
public JobSubmitterRunnable(
ActorSystem actorSystem,
- ForkableFlinkMiniCluster cluster,
+ LocalFlinkMiniCluster cluster,
JobGraph graph) {
this.clientActorSystem = actorSystem;
this.cluster = cluster;
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index d693aaa..2ed759d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -43,7 +44,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TestLogger;
@@ -75,7 +75,7 @@ public class TimestampITCase extends TestLogger {
static MultiShotLatch latch;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@Before
public void setupLatch() {
@@ -92,7 +92,7 @@ public class TimestampITCase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index fc90994..a8482ac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -29,11 +29,11 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.files.MimeTypes;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
@@ -62,7 +62,7 @@ public class WebFrontendITCase extends TestLogger {
private static final int NUM_TASK_MANAGERS = 2;
private static final int NUM_SLOTS = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
private static int port = -1;
@@ -86,7 +86,7 @@ public class WebFrontendITCase extends TestLogger {
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.getAbsolutePath());
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
port = cluster.webMonitor().get().getServerPort();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index ac661f3..1b2838d 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -21,7 +21,6 @@ package org.apache.flink.api.scala.runtime.jobmanager
import akka.actor.{ActorSystem, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest}
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable}
@@ -30,8 +29,7 @@ import org.apache.flink.runtime.messages.Messages.Acknowledge
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered
import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated, NotifyWhenJobManagerTerminated}
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.test.util.ForkableFlinkMiniCluster
+import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -140,12 +138,12 @@ class JobManagerFailsITCase(_system: ActorSystem)
}
}
- def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = {
+ def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
- val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+ val cluster = new TestingCluster(config, singleActorSystem = false)
cluster.start()
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 258f6df..3b39b3f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.api.scala.runtime.taskmanager
import akka.actor.{ActorSystem, Kill, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest}
import org.apache.flink.configuration.ConfigConstants
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
@@ -31,8 +30,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager}
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.test.util.ForkableFlinkMiniCluster
+import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -100,7 +98,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
val jobID = jobGraph.getJobID
- val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
+ val cluster = TestingUtils.startTestingCluster(num_tasks, 2)
val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
@@ -152,7 +150,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
val jobID = jobGraph.getJobID
- val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
+ val cluster = TestingUtils.startTestingCluster(num_tasks, 2)
val taskManagers = cluster.getTaskManagers
val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
@@ -239,11 +237,11 @@ class TaskManagerFailsITCase(_system: ActorSystem)
}
}
- def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = {
+ def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
- new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+ new TestingCluster(config, singleActorSystem = false)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 8c211ef..ffdca36 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -48,6 +48,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
<!-- Needed for the streaming wordcount example -->
<dependency>
<groupId>org.apache.flink</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 0243012..31a3d98 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -48,7 +48,6 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
-import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/tools/maven/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/tools/maven/scalastyle-config.xml b/tools/maven/scalastyle-config.xml
index f7bb0d4..0f7f6bb 100644
--- a/tools/maven/scalastyle-config.xml
+++ b/tools/maven/scalastyle-config.xml
@@ -86,7 +86,7 @@
<!-- </check> -->
<check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
<parameters>
- <parameter name="maxParameters"><![CDATA[10]]></parameter>
+ <parameter name="maxParameters"><![CDATA[15]]></parameter>
</parameters>
</check>
<!-- <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> -->
[3/4] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster
Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
deleted file mode 100644
index a6963fe..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.{ActorRef, Terminated}
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.messages.JobManagerMessages.{RequestLeaderSessionID, ResponseLeaderSessionID}
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered}
-import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, TaskInFinalState, UpdateTaskExecutionState}
-import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
-import org.apache.flink.runtime.testingUtils.TestingMessages._
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-/** This mixin can be used to decorate a TaskManager with messages for testing purposes. */
-trait TestingTaskManagerLike extends FlinkActor {
- that: TaskManager =>
-
- import scala.collection.JavaConverters._
-
- val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
- val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
- val waitForRegisteredAtResourceManager =
- scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
- val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
- val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
-
- /** Map of registered task submit listeners */
- val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, ActorRef]()
-
- val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
-
- var disconnectDisabled = false
-
- /**
- * Handler for testing related messages
- */
- abstract override def handleMessage: Receive = {
- handleTestingMessage orElse super.handleMessage
- }
-
- def handleTestingMessage: Receive = {
- case Alive => sender() ! Acknowledge
-
- case NotifyWhenTaskIsRunning(executionID) =>
- Option(runningTasks.get(executionID)) match {
- case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
- sender ! decorateMessage(true)
-
- case _ =>
- val listeners = waitForRunning.getOrElse(executionID, Set())
- waitForRunning += (executionID -> (listeners + sender))
- }
-
- case RequestRunningTasks =>
- sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
-
- case NotifyWhenTaskRemoved(executionID) =>
- Option(runningTasks.get(executionID)) match {
- case Some(_) =>
- val set = waitForRemoval.getOrElse(executionID, Set())
- waitForRemoval += (executionID -> (set + sender))
- case None =>
- if(unregisteredTasks.contains(executionID)) {
- sender ! decorateMessage(true)
- } else {
- val set = waitForRemoval.getOrElse(executionID, Set())
- waitForRemoval += (executionID -> (set + sender))
- }
- }
-
- case TaskInFinalState(executionID) =>
- super.handleMessage(TaskInFinalState(executionID))
- waitForRemoval.remove(executionID) match {
- case Some(actors) => for(actor <- actors) actor ! decorateMessage(true)
- case None =>
- }
-
- unregisteredTasks += executionID
-
- case RequestBroadcastVariablesWithReferences =>
- sender ! decorateMessage(
- ResponseBroadcastVariablesWithReferences(
- bcVarManager.getNumberOfVariablesWithReferences)
- )
-
- case RequestNumActiveConnections =>
- val numActive = if (!network.isShutdown) {
- network.getConnectionManager.getNumberOfActiveConnections
- } else {
- 0
- }
- sender ! decorateMessage(ResponseNumActiveConnections(numActive))
-
- case NotifyWhenJobRemoved(jobID) =>
- if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
- context.system.scheduler.scheduleOnce(
- 200 milliseconds,
- self,
- decorateMessage(CheckIfJobRemoved(jobID)))(
- context.dispatcher,
- sender()
- )
- }else{
- sender ! decorateMessage(true)
- }
-
- case CheckIfJobRemoved(jobID) =>
- if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
- sender ! decorateMessage(true)
- } else {
- context.system.scheduler.scheduleOnce(
- 200 milliseconds,
- self,
- decorateMessage(CheckIfJobRemoved(jobID)))(
- context.dispatcher,
- sender()
- )
- }
-
- case NotifyWhenJobManagerTerminated(jobManager) =>
- val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
- waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender)
-
- case RegisterSubmitTaskListener(jobId) =>
- registeredSubmitTaskListeners.put(jobId, sender())
-
- case msg@SubmitTask(tdd) =>
- registeredSubmitTaskListeners.get(tdd.getJobID) match {
- case Some(listenerRef) =>
- listenerRef ! ResponseSubmitTaskListener(tdd)
- case None =>
- // Nothing to do
- }
-
- super.handleMessage(msg)
-
- /**
- * Message from task manager that accumulator values changed and need to be reported immediately
- * instead of lazily through the
- * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this
- * message to the job manager that it knows it should report to the listeners.
- */
- case msg: AccumulatorsChanged =>
- currentJobManager match {
- case Some(jobManager) =>
- jobManager.forward(msg)
- sendHeartbeatToJobManager()
- sender ! true
- case None =>
- }
-
- case msg@Terminated(jobManager) =>
- super.handleMessage(msg)
-
- waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
- _ foreach {
- _ ! decorateMessage(JobManagerTerminated(jobManager))
- }
- }
-
- case msg:Disconnect =>
- if (!disconnectDisabled) {
- super.handleMessage(msg)
-
- val jobManager = sender()
-
- waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
- _ foreach {
- _ ! decorateMessage(JobManagerTerminated(jobManager))
- }
- }
- }
-
- case DisableDisconnect =>
- disconnectDisabled = true
-
- case NotifyOfComponentShutdown =>
- waitForShutdown += sender()
-
- case msg @ UpdateTaskExecutionState(taskExecutionState) =>
- super.handleMessage(msg)
-
- if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
- waitForRunning.get(taskExecutionState.getID) foreach {
- _ foreach (_ ! decorateMessage(true))
- }
- }
-
- case RequestLeaderSessionID =>
- sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
-
- case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
- if(isConnected && jobManager == currentJobManager.get) {
- sender() ! true
- } else {
- val list = waitForRegisteredAtResourceManager.getOrElse(
- jobManager,
- Set[ActorRef]())
-
- waitForRegisteredAtResourceManager += jobManager -> (list + sender())
- }
-
- case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
- super.handleMessage(msg)
-
- val jm = sender()
-
- waitForRegisteredAtResourceManager.remove(jm).foreach {
- listeners => listeners.foreach{
- listener =>
- listener ! true
- }
- }
- }
-
- /**
- * No killing of the VM for testing.
- */
- override protected def shutdown(): Unit = {
- log.info("Shutting down TestingJobManager.")
- waitForShutdown.foreach(_ ! ComponentShutdown(self))
- waitForShutdown.clear()
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
deleted file mode 100644
index 974e4e8..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.taskmanager.Task
-
-/**
- * Additional messages that the [[TestingTaskManager]] understands.
- */
-object TestingTaskManagerMessages {
-
- case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
-
- case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID)
-
- case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
- import collection.JavaConverters._
- def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava
- }
-
- case class ResponseBroadcastVariablesWithReferences(number: Int)
-
- case object RequestNumActiveConnections
- case class ResponseNumActiveConnections(number: Int)
-
- case object RequestRunningTasks
-
- case object RequestBroadcastVariablesWithReferences
-
- case class NotifyWhenJobManagerTerminated(jobManager: ActorRef)
-
- case class JobManagerTerminated(jobManager: ActorRef)
-
- case class NotifyWhenRegisteredAtJobManager(resourceManager: ActorRef)
-
- /**
- * Message to give a hint to the task manager that accumulator values were updated in the task.
- * This message is forwarded to the job manager which knows that it needs to notify listeners
- * of accumulator updates.
- */
- case class AccumulatorsChanged(jobID: JobID)
-
- /**
- * Registers a listener for all [[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]]
- * messages of the given job.
- *
- * If a task is submitted with the given job ID the task deployment
- * descriptor is forwarded to the listener.
- *
- * @param jobId The job ID to listen for.
- */
- case class RegisterSubmitTaskListener(jobId: JobID)
-
- /**
- * A response to a listened job ID containing the submitted task deployment descriptor.
- *
- * @param tdd The submitted task deployment descriptor.
- */
- case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor)
-
- // --------------------------------------------------------------------------
- // Utility methods to allow simpler case object access from Java
- // --------------------------------------------------------------------------
-
- def getRequestRunningTasksMessage: AnyRef = {
- RequestRunningTasks
- }
-
- def getRequestBroadcastVariablesWithReferencesMessage: AnyRef = {
- RequestBroadcastVariablesWithReferences
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index e596166..c143fe2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import scala.Option;
@@ -86,7 +85,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
@Override
public int getNumberOfJobManagers() {
- return this.configuration().getInteger(
+ return this.originalConfiguration().getInteger(
ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
new file mode 100644
index 0000000..495cacd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import akka.actor.ActorRef;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * A testing resource manager which may alter the default standalone resource master's behavior.
+ */
+public class TestingResourceManager extends StandaloneResourceManager {
+
+ /** Set of Actors which want to be informed of a connection to the job manager */
+ private Set<ActorRef> waitForResourceManagerConnected = new HashSet<>();
+
+ /** Set of Actors which want to be informed of a shutdown */
+ private Set<ActorRef> waitForShutdown = new HashSet<>();
+
+ /** Flag to signal a connection to the JobManager */
+ private boolean isConnected = false;
+
+ public TestingResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) {
+ super(flinkConfig, leaderRetriever);
+ }
+
+ /**
+ * Overwrite messages here if desired
+ */
+ @Override
+ protected void handleMessage(Object message) {
+
+ if (message instanceof GetRegisteredResources) {
+ sender().tell(new GetRegisteredResourcesReply(getStartedTaskManagers()), self());
+ } else if (message instanceof FailResource) {
+ ResourceID resourceID = ((FailResource) message).resourceID;
+ notifyWorkerFailed(resourceID, "Failed for test case.");
+
+ } else if (message instanceof NotifyWhenResourceManagerConnected) {
+ if (isConnected) {
+ sender().tell(
+ Messages.getAcknowledge(),
+ self());
+ } else {
+ waitForResourceManagerConnected.add(sender());
+ }
+ } else if (message instanceof RegisterResourceManagerSuccessful) {
+ super.handleMessage(message);
+
+ isConnected = true;
+
+ for (ActorRef ref : waitForResourceManagerConnected) {
+ ref.tell(
+ Messages.getAcknowledge(),
+ self());
+ }
+ waitForResourceManagerConnected.clear();
+
+ } else if (message instanceof TestingMessages.NotifyOfComponentShutdown$) {
+ waitForShutdown.add(sender());
+ } else if (message instanceof TestingMessages.Alive$) {
+ sender().tell(Messages.getAcknowledge(), self());
+ } else {
+ super.handleMessage(message);
+ }
+ }
+
+ /**
+ * Testing messages
+ */
+ public static class GetRegisteredResources {}
+
+ public static class GetRegisteredResourcesReply {
+
+ public Collection<ResourceID> resources;
+
+ public GetRegisteredResourcesReply(Collection<ResourceID> resources) {
+ this.resources = resources;
+ }
+
+ }
+
+ /**
+ * Fails the single resource identified by the given resource ID
+ */
+ public static class FailResource {
+
+ public ResourceID resourceID;
+
+ public FailResource(ResourceID resourceID) {
+ this.resourceID = resourceID;
+ }
+ }
+
+ /**
+ * The sender of this message will be informed of a connection to the Job Manager
+ */
+ public static class NotifyWhenResourceManagerConnected {}
+
+ /**
+ * Inform registered listeners about a shutdown of the application.
+ */
+ @Override
+ protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+ for (ActorRef listener : waitForShutdown) {
+ listener.tell(new TestingMessages.ComponentShutdown(self()), self());
+ }
+ waitForShutdown.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index b4ba40b..c01a321 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -18,22 +18,32 @@
package org.apache.flink.runtime.testingUtils
-import java.util.concurrent.TimeoutException
+import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
import akka.pattern.ask
-import akka.actor.{ActorRef, Props, ActorSystem}
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.pattern.Patterns._
import akka.testkit.CallingThreadDispatcher
import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
-import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster
+import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.testutils.TestingResourceManager
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, Future}
/**
@@ -48,7 +58,7 @@ class TestingCluster(
userConfiguration: Configuration,
singleActorSystem: Boolean,
synchronousDispatcher: Boolean)
- extends FlinkMiniCluster(
+ extends LocalFlinkMiniCluster(
userConfiguration,
singleActorSystem) {
@@ -59,133 +69,54 @@ class TestingCluster(
// --------------------------------------------------------------------------
- override def generateConfiguration(userConfig: Configuration): Configuration = {
- val cfg = new Configuration()
- cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
- cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
- cfg.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, 0)
- cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)
- cfg.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
-
- setDefaultCiConfig(cfg)
-
- cfg.addAll(userConfig)
- cfg
- }
-
- override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
- val config = configuration.clone()
-
- val jobManagerName = if(singleActorSystem) {
- JobManager.JOB_MANAGER_NAME + "_" + (index + 1)
- } else {
- JobManager.JOB_MANAGER_NAME
- }
-
- val archiveName = if(singleActorSystem) {
- JobManager.ARCHIVE_NAME + "_" + (index + 1)
- } else {
- JobManager.ARCHIVE_NAME
- }
-
- val jobManagerPort = config.getInteger(
- ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
- if(jobManagerPort > 0) {
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
- }
-
- val (executionContext,
- instanceManager,
- scheduler,
- libraryCacheManager,
- restartStrategyFactory,
- timeout,
- archiveCount,
- leaderElectionService,
- submittedJobsGraphs,
- checkpointRecoveryFactory,
- savepointStore,
- jobRecoveryTimeout,
- metricRegistry) = JobManager.createJobManagerComponents(
- config,
- createLeaderElectionService())
-
- val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
- val archive = actorSystem.actorOf(testArchiveProps, archiveName)
-
- val jobManagerProps = Props(
- new TestingJobManager(
- configuration,
- executionContext,
- instanceManager,
- scheduler,
- libraryCacheManager,
- archive,
- restartStrategyFactory,
- timeout,
- leaderElectionService,
- submittedJobsGraphs,
- checkpointRecoveryFactory,
- savepointStore,
- jobRecoveryTimeout,
- metricRegistry))
-
- val dispatcherJobManagerProps = if (synchronousDispatcher) {
- // disable asynchronous futures (e.g. accumulator update in Heartbeat)
- jobManagerProps.withDispatcher(CallingThreadDispatcher.Id)
- } else {
- jobManagerProps
- }
-
- actorSystem.actorOf(dispatcherJobManagerProps, jobManagerName)
- }
-
- override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
- val config = configuration.clone()
-
- val resourceManagerName = if(singleActorSystem) {
- FlinkResourceManager.RESOURCE_MANAGER_NAME + "_" + (index + 1)
+ override val jobManagerClass: Class[_ <: JobManager] = classOf[TestingJobManager]
+
+ override val resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]] =
+ classOf[TestingResourceManager]
+
+ override val taskManagerClass: Class[_ <: TaskManager] = classOf[TestingTaskManager]
+
+ override val memoryArchivistClass: Class[_ <: MemoryArchivist] = classOf[TestingMemoryArchivist]
+
+ override def getJobManagerProps(
+ jobManagerClass: Class[_ <: JobManager],
+ configuration: Configuration,
+ executorService: ExecutorService,
+ instanceManager: InstanceManager,
+ scheduler: Scheduler,
+ libraryCacheManager: BlobLibraryCacheManager,
+ archive: ActorRef,
+ restartStrategyFactory: RestartStrategyFactory,
+ timeout: FiniteDuration,
+ leaderElectionService: LeaderElectionService,
+ submittedJobGraphStore: SubmittedJobGraphStore,
+ checkpointRecoveryFactory: CheckpointRecoveryFactory,
+ savepointStore: SavepointStore,
+ jobRecoveryTimeout: FiniteDuration,
+ metricsRegistry: Option[MetricRegistry]): Props = {
+
+ val props = super.getJobManagerProps(
+ jobManagerClass,
+ configuration,
+ executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ restartStrategyFactory,
+ timeout,
+ leaderElectionService,
+ submittedJobGraphStore,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout,
+ metricsRegistry)
+
+ if (synchronousDispatcher) {
+ props.withDispatcher(CallingThreadDispatcher.Id)
} else {
- FlinkResourceManager.RESOURCE_MANAGER_NAME
+ props
}
-
- val resourceManagerPort = config.getInteger(
- ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
-
- if(resourceManagerPort > 0) {
- config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
- }
-
- val testResourceManagerProps = Props(
- new TestingResourceManager(
- config,
- createLeaderRetrievalService()
- ))
-
- system.actorOf(testResourceManagerProps, resourceManagerName)
- }
-
- override def startTaskManager(index: Int, system: ActorSystem) = {
-
- val tmActorName = TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)
-
- TaskManager.startTaskManagerComponentsAndActor(
- configuration,
- ResourceID.generate(),
- system,
- hostname,
- Some(tmActorName),
- Some(createLeaderRetrievalService()),
- numTaskManagers == 1,
- classOf[TestingTaskManager])
- }
-
-
- def createLeaderElectionService(): Option[LeaderElectionService] = {
- None
}
@throws(classOf[TimeoutException])
@@ -228,4 +159,131 @@ class TestingCluster(
Await.ready(combinedFuture, timeout)
}
+
+ def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = {
+ val futures = taskManagerActors.map {
+ _.map {
+ tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout)
+ }
+ }.getOrElse(Seq())
+
+ try {
+ Await.ready(Future.sequence(futures), timeout)
+ } catch {
+ case t: TimeoutException =>
+ throw new Exception("Timeout while waiting for TaskManagers to register at " +
+ s"${jobManager.path}")
+ }
+
+ }
+
+ def restartLeadingJobManager(): Unit = {
+ this.synchronized {
+ (jobManagerActorSystems, jobManagerActors) match {
+ case (Some(jmActorSystems), Some(jmActors)) =>
+ val leader = getLeaderGateway(AkkaUtils.getTimeout(originalConfiguration))
+ val index = getLeaderIndex(AkkaUtils.getTimeout(originalConfiguration))
+
+ // restart the leading job manager with the same port
+ val port = getLeaderRPCPort
+ val oldPort = originalConfiguration.getInteger(
+ ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ 0)
+
+ // we have to set the old port in the configuration file because this is used for startup
+ originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port)
+
+ clearLeader()
+
+ val stopped = gracefulStop(leader.actor(), TestingCluster.MAX_RESTART_DURATION)
+ Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
+
+ if(!singleActorSystem) {
+ jmActorSystems(index).shutdown()
+ jmActorSystems(index).awaitTermination()
+ }
+
+ val newJobManagerActorSystem = if(!singleActorSystem) {
+ startJobManagerActorSystem(index)
+ } else {
+ jmActorSystems.head
+ }
+
+ // reset the original configuration
+ originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, oldPort)
+
+ val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
+
+ jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1))
+ jobManagerActorSystems = Some(jmActorSystems.patch(
+ index,
+ Seq(newJobManagerActorSystem),
+ 1))
+
+ val lrs = createLeaderRetrievalService()
+
+ jobManagerLeaderRetrievalService = Some(lrs)
+ lrs.start(this)
+
+ case _ => throw new Exception("The JobManager of the TestingCluster have not " +
+ "been started properly.")
+ }
+ }
+ }
+
+ def restartTaskManager(index: Int): Unit = {
+ (taskManagerActorSystems, taskManagerActors) match {
+ case (Some(tmActorSystems), Some(tmActors)) =>
+ val stopped = gracefulStop(tmActors(index), TestingCluster.MAX_RESTART_DURATION)
+ Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
+
+ if(!singleActorSystem) {
+ tmActorSystems(index).shutdown()
+ tmActorSystems(index).awaitTermination()
+ }
+
+ val taskManagerActorSystem = if(!singleActorSystem) {
+ startTaskManagerActorSystem(index)
+ } else {
+ tmActorSystems.head
+ }
+
+ val taskManagerActor = startTaskManager(index, taskManagerActorSystem)
+
+ taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1))
+ taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1))
+
+ case _ => throw new Exception("The TaskManager of the TestingCluster have not " +
+ "been started properly.")
+ }
+ }
+
+ def addTaskManager(): Unit = {
+ if (useSingleActorSystem) {
+ (jobManagerActorSystems, taskManagerActors) match {
+ case (Some(jmSystems), Some(tmActors)) =>
+ val index = numTaskManagers
+ taskManagerActors = Some(tmActors :+ startTaskManager(index, jmSystems(0)))
+ numTaskManagers += 1
+ case _ => throw new IllegalStateException("Cluster has not been started properly.")
+ }
+ } else {
+ (taskManagerActorSystems, taskManagerActors) match {
+ case (Some(tmSystems), Some(tmActors)) =>
+ val index = numTaskManagers
+ val newTmSystem = startTaskManagerActorSystem(index)
+ val newTmActor = startTaskManager(index, newTmSystem)
+
+ taskManagerActorSystems = Some(tmSystems :+ newTmSystem)
+ taskManagerActors = Some(tmActors :+ newTmActor)
+
+ numTaskManagers += 1
+ case _ => throw new IllegalStateException("Cluster has not been started properly.")
+ }
+ }
+ }
+}
+
+object TestingCluster {
+ val MAX_RESTART_DURATION = new FiniteDuration(2, TimeUnit.MINUTES)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
new file mode 100644
index 0000000..62349db
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import java.util.concurrent.ExecutorService
+
+import akka.actor.ActorRef
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.metrics.MetricRegistry
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/** JobManager implementation extended by testing messages
+ *
+ */
+class TestingJobManager(
+ flinkConfiguration: Configuration,
+ executorService: ExecutorService,
+ instanceManager: InstanceManager,
+ scheduler: Scheduler,
+ libraryCacheManager: BlobLibraryCacheManager,
+ archive: ActorRef,
+ restartStrategyFactory: RestartStrategyFactory,
+ timeout: FiniteDuration,
+ leaderElectionService: LeaderElectionService,
+ submittedJobGraphs : SubmittedJobGraphStore,
+ checkpointRecoveryFactory : CheckpointRecoveryFactory,
+ savepointStore : SavepointStore,
+ jobRecoveryTimeout : FiniteDuration,
+ metricRegistry : Option[MetricRegistry])
+ extends JobManager(
+ flinkConfiguration,
+ executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ restartStrategyFactory,
+ timeout,
+ leaderElectionService,
+ submittedJobGraphs,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout,
+ metricRegistry)
+ with TestingJobManagerLike {}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
new file mode 100644
index 0000000..5ba2790
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{ActorRef, Cancellable, Terminated}
+import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingMessages._
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/** This mixin can be used to decorate a JobManager with messages for testing purpose. */
+trait TestingJobManagerLike extends FlinkActor {
+ that: JobManager =>
+
+ import context._
+
+ import scala.collection.JavaConverters._
+
+ val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+ val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+
+ val waitForAllVerticesToBeRunningOrFinished =
+ scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+
+ var periodicCheck: Option[Cancellable] = None
+
+ val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
+ collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
+
+ val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
+
+ val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
+
+ val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
+ new Ordering[(Int, ActorRef)] {
+ override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
+ })
+
+ val waitForClient = scala.collection.mutable.HashSet[ActorRef]()
+
+ val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
+
+ var disconnectDisabled = false
+
+ var postStopEnabled = true
+
+ abstract override def postStop(): Unit = {
+ if (postStopEnabled) {
+ super.postStop()
+ } else {
+ // only stop leader election service to revoke the leadership of this JM so that a new JM
+ // can be elected leader
+ leaderElectionService.stop()
+ }
+ }
+
+ abstract override def handleMessage: Receive = {
+ handleTestingMessage orElse super.handleMessage
+ }
+
+ def handleTestingMessage: Receive = {
+ case Alive => sender() ! Acknowledge
+
+ case RequestExecutionGraph(jobID) =>
+ currentJobs.get(jobID) match {
+ case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
+ ExecutionGraphFound(
+ jobID,
+ executionGraph)
+ )
+
+ case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
+ }
+
+ case WaitForAllVerticesToBeRunning(jobID) =>
+ if(checkIfAllVerticesRunning(jobID)){
+ sender() ! decorateMessage(AllVerticesRunning(jobID))
+ }else{
+ val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
+ waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
+
+ if(periodicCheck.isEmpty){
+ periodicCheck =
+ Some(
+ context.system.scheduler.schedule(
+ 0 seconds,
+ 200 millis,
+ self,
+ decorateMessage(NotifyListeners)
+ )
+ )
+ }
+ }
+ case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
+ if(checkIfAllVerticesRunningOrFinished(jobID)){
+ sender() ! decorateMessage(AllVerticesRunning(jobID))
+ }else{
+ val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
+ waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender())
+
+ if(periodicCheck.isEmpty){
+ periodicCheck =
+ Some(
+ context.system.scheduler.schedule(
+ 0 seconds,
+ 200 millis,
+ self,
+ decorateMessage(NotifyListeners)
+ )
+ )
+ }
+ }
+
+ case NotifyListeners =>
+ for(jobID <- currentJobs.keySet){
+ notifyListeners(jobID)
+ }
+
+ if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
+ periodicCheck foreach { _.cancel() }
+ periodicCheck = None
+ }
+
+
+ case NotifyWhenJobRemoved(jobID) =>
+ val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
+
+ val responses = gateways.map{
+ gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
+ }
+
+ val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
+
+ val allFutures = responses ++ Seq(jobRemovedOnJobManager)
+
+ import context.dispatcher
+ Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender()
+
+ case CheckIfJobRemoved(jobID) =>
+ if(currentJobs.contains(jobID)) {
+ context.system.scheduler.scheduleOnce(
+ 200 milliseconds,
+ self,
+ decorateMessage(CheckIfJobRemoved(jobID))
+ )(context.dispatcher, sender())
+ } else {
+ sender() ! decorateMessage(true)
+ }
+
+ case NotifyWhenTaskManagerTerminated(taskManager) =>
+ val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
+ waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender)
+
+ case msg@Terminated(taskManager) =>
+ super.handleMessage(msg)
+
+ waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
+ _ foreach {
+ listener =>
+ listener ! decorateMessage(TaskManagerTerminated(taskManager))
+ }
+ }
+
+ // see shutdown method for reply
+ case NotifyOfComponentShutdown =>
+ waitForShutdown += sender()
+
+ case NotifyWhenAccumulatorChange(jobID) =>
+
+ val (updated, registered) = waitForAccumulatorUpdate.
+ getOrElse(jobID, (false, Set[ActorRef]()))
+ waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
+ sender ! true
+
+ /**
+ * Notification from the task manager that changed accumulators are transferred on the
+ * next Heartbeat. We need to keep this state to notify the listeners on that Heartbeat.
+ */
+ case AccumulatorsChanged(jobID: JobID) =>
+ waitForAccumulatorUpdate.get(jobID) match {
+ case Some((updated, registered)) =>
+ waitForAccumulatorUpdate.put(jobID, (true, registered))
+ case None =>
+ }
+
+ /**
+ * Disabled async processing of accumulator values and send accumulators to the listeners if
+ * we previously received an [[AccumulatorsChanged]] message.
+ */
+ case msg : Heartbeat =>
+ super.handleMessage(msg)
+
+ waitForAccumulatorUpdate foreach {
+ case (jobID, (updated, actors)) if updated =>
+ currentJobs.get(jobID) match {
+ case Some((graph, jobInfo)) =>
+ val flinkAccumulators = graph.getFlinkAccumulators
+ val userAccumulators = graph.aggregateUserAccumulators
+ actors foreach {
+ actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
+ }
+ case None =>
+ }
+ waitForAccumulatorUpdate.put(jobID, (false, actors))
+ case _ =>
+ }
+
+ case RequestWorkingTaskManager(jobID) =>
+ currentJobs.get(jobID) match {
+ case Some((eg, _)) =>
+ if(eg.getAllExecutionVertices.asScala.isEmpty){
+ sender ! decorateMessage(WorkingTaskManager(None))
+ } else {
+ val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
+
+ if(resource == null){
+ sender ! decorateMessage(WorkingTaskManager(None))
+ } else {
+ sender ! decorateMessage(
+ WorkingTaskManager(
+ Some(resource.getTaskManagerActorGateway())
+ )
+ )
+ }
+ }
+ case None => sender ! decorateMessage(WorkingTaskManager(None))
+ }
+
+ case NotifyWhenJobStatus(jobID, state) =>
+ val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
+ scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
+
+ val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
+
+ jobStatusListener += state -> (listener + sender)
+
+ case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
+ super.handleMessage(msg)
+
+ val cleanup = waitForJobStatus.get(jobID) match {
+ case Some(stateListener) =>
+ stateListener.remove(newJobStatus) match {
+ case Some(listeners) =>
+ listeners foreach {
+ _ ! decorateMessage(JobStatusIs(jobID, newJobStatus))
+ }
+ case _ =>
+ }
+ stateListener.isEmpty
+
+ case _ => false
+ }
+
+ if (cleanup) {
+ waitForJobStatus.remove(jobID)
+ }
+
+ case DisableDisconnect =>
+ disconnectDisabled = true
+
+ case DisablePostStop =>
+ postStopEnabled = false
+
+ case RequestSavepoint(savepointPath) =>
+ try {
+ val savepoint = savepointStore.loadSavepoint(savepointPath)
+ sender ! ResponseSavepoint(savepoint)
+ }
+ catch {
+ case e: Exception =>
+ sender ! ResponseSavepoint(null)
+ }
+
+ case msg: Disconnect =>
+ if (!disconnectDisabled) {
+ super.handleMessage(msg)
+
+ val taskManager = sender()
+
+ waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
+ _ foreach {
+ listener =>
+ listener ! decorateMessage(TaskManagerTerminated(taskManager))
+ }
+ }
+ }
+
+ case NotifyWhenLeader =>
+ if (leaderElectionService.hasLeadership) {
+ sender() ! true
+ } else {
+ waitForLeader += sender()
+ }
+
+ case msg: GrantLeadership =>
+ super.handleMessage(msg)
+
+ waitForLeader.foreach(_ ! true)
+
+ waitForLeader.clear()
+
+ case NotifyWhenClientConnects =>
+ waitForClient += sender()
+ sender() ! true
+
+ case msg: RegisterJobClient =>
+ super.handleMessage(msg)
+ waitForClient.foreach(_ ! ClientConnected)
+ case msg: RequestClassloadingProps =>
+ super.handleMessage(msg)
+ waitForClient.foreach(_ ! ClassLoadingPropsDelivered)
+
+ case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
+ if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {
+ // there are already at least numRegisteredTaskManager registered --> send Acknowledge
+ sender() ! Acknowledge
+ } else {
+ // wait until we see at least numRegisteredTaskManager being registered at the JobManager
+ waitForNumRegisteredTaskManagers += ((numRegisteredTaskManager, sender()))
+ }
+
+ // TaskManager may be registered on this message
+ case msg @ (_: RegisterTaskManager) =>
+ super.handleMessage(msg)
+
+ // dequeue all senders which wait for instanceManager.getNumberOfRegisteredTaskManagers or
+ // fewer registered TaskManagers
+ while (waitForNumRegisteredTaskManagers.nonEmpty &&
+ waitForNumRegisteredTaskManagers.head._1 <=
+ instanceManager.getNumberOfRegisteredTaskManagers) {
+ val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
+ receiver ! Acknowledge
+ }
+ }
+
+ def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
+ currentJobs.get(jobID) match {
+ case Some((eg, _)) =>
+ eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING)
+ case None => false
+ }
+ }
+
+ def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = {
+ currentJobs.get(jobID) match {
+ case Some((eg, _)) =>
+ eg.getAllExecutionVertices.asScala.forall {
+ case vertex =>
+ (vertex.getExecutionState == ExecutionState.RUNNING
+ || vertex.getExecutionState == ExecutionState.FINISHED)
+ }
+ case None => false
+ }
+ }
+
+ def notifyListeners(jobID: JobID): Unit = {
+ if(checkIfAllVerticesRunning(jobID)) {
+ waitForAllVerticesToBeRunning.remove(jobID) match {
+ case Some(listeners) =>
+ for (listener <- listeners) {
+ listener ! decorateMessage(AllVerticesRunning(jobID))
+ }
+ case _ =>
+ }
+ }
+
+ if(checkIfAllVerticesRunningOrFinished(jobID)) {
+ waitForAllVerticesToBeRunningOrFinished.remove(jobID) match {
+ case Some(listeners) =>
+ for (listener <- listeners) {
+ listener ! decorateMessage(AllVerticesRunning(jobID))
+ }
+ case _ =>
+ }
+ }
+ }
+
+ /**
+ * No killing of the VM for testing.
+ */
+ override protected def shutdown(): Unit = {
+ log.info("Shutting down TestingJobManager.")
+ waitForShutdown.foreach(_ ! ComponentShutdown(self))
+ waitForShutdown.clear()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
new file mode 100644
index 0000000..d07c48f
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import java.util.Map
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+import org.apache.flink.api.common.accumulators.Accumulator
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
+import org.apache.flink.runtime.instance.ActorGateway
+import org.apache.flink.runtime.jobgraph.JobStatus
+
+ /**
+ * Messages understood by the testing variants of the JobManager and its helper actors.
+ *
+ * The object only declares message types plus a few accessors that return the case objects
+ * as `AnyRef`.
+ */
+ object TestingJobManagerMessages {
+
+ /** Asks for the execution graph of the given job. */
+ case class RequestExecutionGraph(jobID: JobID)
+
+ /** Common type of the two possible answers to a [[RequestExecutionGraph]]. */
+ sealed trait ResponseExecutionGraph {
+ def jobID: JobID
+ }
+
+ /** Answer carrying the execution graph that was found for the job. */
+ case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends
+ ResponseExecutionGraph
+
+ /** Answer sent when no execution graph is known for the job. */
+ case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph
+
+ // Requests to be told about the vertex states of a job. AllVerticesRunning is the reply
+ // used for both kinds of request.
+ case class WaitForAllVerticesToBeRunning(jobID: JobID)
+ case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID)
+ case class AllVerticesRunning(jobID: JobID)
+
+ /** Asks to be notified once the given job has been removed. */
+ case class NotifyWhenJobRemoved(jobID: JobID)
+
+ // NOTE(review): presumably a request/response pair for a TaskManager gateway that works on
+ // the given job; the handler is not part of this file - confirm against TestingJobManagerLike.
+ case class RequestWorkingTaskManager(jobID: JobID)
+ case class WorkingTaskManager(gatewayOption: Option[ActorGateway])
+
+ // NOTE(review): presumably a request to be told when the job reaches `state`, answered with
+ // JobStatusIs; the handler is not part of this file - confirm.
+ case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus)
+ case class JobStatusIs(jobID: JobID, state: JobStatus)
+
+ case object NotifyListeners
+
+ // NOTE(review): presumably a request to be told when the given TaskManager actor terminates,
+ // answered with TaskManagerTerminated; the handler is not part of this file - confirm.
+ case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
+ case class TaskManagerTerminated(taskManager: ActorRef)
+
+ /**
+ * Registers a listener to receive a message when accumulators changed.
+ * The change must be explicitly triggered by the TestingTaskManager which can receive an
+ * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]
+ * message by a task that changed the accumulators. This message is then
+ * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]]
+ * message when the next Heartbeat occurs.
+ */
+ case class NotifyWhenAccumulatorChange(jobID: JobID)
+
+ /**
+ * Reports updated accumulators back to the listener.
+ *
+ * Both maps are `java.util.Map` instances (see the import at the top of the file).
+ */
+ case class UpdatedAccumulators(jobID: JobID,
+ flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]],
+ userAccumulators: Map[String, Accumulator[_,_]])
+
+ /**
+ * Notifies the sender when the [[TestingJobManager]] has been elected as the leader
+ */
+ case object NotifyWhenLeader
+
+ /**
+ * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs
+ */
+ case object NotifyWhenClientConnects
+ /**
+ * Notifies of client connect
+ */
+ case object ClientConnected
+ /**
+ * Notifies when the client has requested class loading information
+ */
+ case object ClassLoadingPropsDelivered
+
+ /**
+ * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]]
+ * message when at least numRegisteredTaskManager have registered at the JobManager.
+ *
+ * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified
+ */
+ case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int)
+
+ /** Disables the post stop method of the [[TestingJobManager]].
+ *
+ * Only the leaderElectionService is stopped in the postStop method call to revoke the leadership
+ */
+ case object DisablePostStop
+
+ /**
+ * Requests a savepoint from the job manager.
+ *
+ * @param savepointPath The path of the savepoint to request.
+ */
+ case class RequestSavepoint(savepointPath: String)
+
+ /**
+ * Response to a savepoint request.
+ *
+ * @param savepoint The requested savepoint or null if none available.
+ */
+ case class ResponseSavepoint(savepoint: Savepoint)
+
+ // Accessors that return the case objects above typed as AnyRef
+ // (presumably so that Java code can obtain them without the Scala MODULE$ syntax).
+ def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
+ def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects
+ def getDisablePostStop(): AnyRef = DisablePostStop
+
+ def getClientConnected(): AnyRef = ClientConnected
+ def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
+
+ }
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
new file mode 100644
index 0000000..48a1ddd
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import org.apache.flink.runtime.jobmanager.MemoryArchivist
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound, ExecutionGraphNotFound, RequestExecutionGraph}
+
+ /** Memory archivist that additionally answers testing messages.
+   *
+   * @param maxEntries maximum number of archived jobs
+   */
+ class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) {
+
+   /** Tries the testing messages first and falls back to the regular archivist handler. */
+   override def handleMessage: Receive = {
+     handleTestingMessage.orElse(super.handleMessage)
+   }
+
+   /** Answers a `RequestExecutionGraph` with `ExecutionGraphFound` if `graphs` contains the
+     * job, and with `ExecutionGraphNotFound` otherwise.
+     */
+   def handleTestingMessage: Receive = {
+     case RequestExecutionGraph(jobID) =>
+       val response = graphs.get(jobID) match {
+         case Some(graph) => ExecutionGraphFound(jobID, graph)
+         case None => ExecutionGraphNotFound(jobID)
+       }
+
+       sender ! decorateMessage(response)
+   }
+ }
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
new file mode 100644
index 0000000..91d169a
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+
+ /**
+ * Testing messages that are not specific to one kind of component.
+ *
+ * Besides the message declarations, the object offers accessors that return the case objects
+ * typed as `AnyRef`.
+ */
+ object TestingMessages {
+
+ /** Asks the receiver to check whether it still holds state for the given job. */
+ case class CheckIfJobRemoved(jobID: JobID)
+
+ /** Asks the receiver to stop reacting to disconnect messages. */
+ case object DisableDisconnect
+
+ /** Liveness probe. */
+ case object Alive
+
+ def getAlive: AnyRef = Alive
+
+ def getDisableDisconnect: AnyRef = DisableDisconnect
+
+ /** Registers the sender to be told when the receiving component shuts down. */
+ case object NotifyOfComponentShutdown
+ /** Notification that the component identified by `ref` has shut down. */
+ case class ComponentShutdown(ref: ActorRef)
+
+ def getNotifyOfComponentShutdown(): AnyRef = NotifyOfComponentShutdown
+ }
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
new file mode 100644
index 0000000..9b5a147
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration}
+
+import scala.language.postfixOps
+
+ /** Subclass of the [[TaskManager]] to support testing messages
+ *
+ * All constructor arguments are handed unchanged to the [[TaskManager]] constructor; the
+ * testing behaviour itself comes from the mixed-in [[TestingTaskManagerLike]].
+ */
+ class TestingTaskManager(
+ config: TaskManagerConfiguration,
+ resourceID: ResourceID,
+ connectionInfo: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ network: NetworkEnvironment,
+ numberOfSlots: Int,
+ leaderRetrievalService: LeaderRetrievalService)
+ extends TaskManager(
+ config,
+ resourceID,
+ connectionInfo,
+ memoryManager,
+ ioManager,
+ network,
+ numberOfSlots,
+ leaderRetrievalService)
+ with TestingTaskManagerLike {
+
+ /** Auxiliary constructor without an explicit resource ID; a fresh one is created with
+ * `ResourceID.generate()` and passed to the primary constructor.
+ */
+ def this(
+ config: TaskManagerConfiguration,
+ connectionInfo: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ network: NetworkEnvironment,
+ numberOfSlots: Int,
+ leaderRetrievalService: LeaderRetrievalService) {
+ this(
+ config,
+ ResourceID.generate(),
+ connectionInfo,
+ memoryManager,
+ ioManager,
+ network,
+ numberOfSlots,
+ leaderRetrievalService)
+ }
+ }
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
new file mode 100644
index 0000000..2498dbe
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{ActorRef, Terminated}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.messages.JobManagerMessages.{RequestLeaderSessionID, ResponseLeaderSessionID}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered}
+import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, TaskInFinalState, UpdateTaskExecutionState}
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.TestingMessages._
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+ /** This mixin can be used to decorate a TaskManager with messages for testing purposes.
+   *
+   * Testing messages are tried first by `handleTestingMessage`; every other message falls
+   * through to the `handleMessage` of the TaskManager this trait is stacked on. Several
+   * handlers first delegate the message to the TaskManager and then notify test listeners.
+   */
+ trait TestingTaskManagerLike extends FlinkActor {
+   that: TaskManager =>
+
+   import scala.collection.JavaConverters._
+
+   /** Listeners per execution attempt, answered with `true` when TaskInFinalState arrives. */
+   val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+
+   /** Listeners keyed by the JobManager's actor path name, answered with
+     * JobManagerTerminated on Terminated or on a (not disabled) Disconnect.
+     */
+   val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+
+   /** Listeners keyed by JobManager actor (despite the name), answered with `true` when that
+     * actor sends AcknowledgeRegistration or AlreadyRegistered.
+     */
+   val waitForRegisteredAtResourceManager =
+     scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
+
+   /** Listeners per execution attempt, answered with `true` once the task reports RUNNING. */
+   val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+
+   /** Execution attempts for which a TaskInFinalState message has been handled. */
+   val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
+
+   /** Map of registered task submit listeners */
+   val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, ActorRef]()
+
+   /** Actors that receive a ComponentShutdown when `shutdown()` is called. */
+   val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
+
+   /** When set, Disconnect messages are dropped instead of being handled. */
+   var disconnectDisabled = false
+
+   /**
+     * Handler for testing related messages
+     */
+   abstract override def handleMessage: Receive = {
+     handleTestingMessage orElse super.handleMessage
+   }
+
+   /** Partial function with the testing-specific message handling. */
+   def handleTestingMessage: Receive = {
+     case Alive => sender() ! Acknowledge
+
+     case NotifyWhenTaskIsRunning(executionID) =>
+       Option(runningTasks.get(executionID)) match {
+         case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
+           sender ! decorateMessage(true)
+
+         case _ =>
+           // not (yet) running: remember the sender until the RUNNING update arrives
+           val listeners = waitForRunning.getOrElse(executionID, Set())
+           waitForRunning += (executionID -> (listeners + sender))
+       }
+
+     case RequestRunningTasks =>
+       sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
+
+     case NotifyWhenTaskRemoved(executionID) =>
+       val taskGone = Option(runningTasks.get(executionID)).isEmpty
+
+       if (taskGone && unregisteredTasks.contains(executionID)) {
+         // the task already went through TaskInFinalState: answer right away
+         sender ! decorateMessage(true)
+       } else {
+         // still running, or not seen yet: wait for the TaskInFinalState message
+         val listeners = waitForRemoval.getOrElse(executionID, Set())
+         waitForRemoval += (executionID -> (listeners + sender))
+       }
+
+     case TaskInFinalState(executionID) =>
+       super.handleMessage(TaskInFinalState(executionID))
+       waitForRemoval.remove(executionID) match {
+         case Some(actors) => for(actor <- actors) actor ! decorateMessage(true)
+         case None =>
+       }
+
+       unregisteredTasks += executionID
+
+     case NotifyWhenJobRemoved(jobID) =>
+       if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
+         // tasks of the job are still around: poll again in 200 ms on behalf of the sender
+         context.system.scheduler.scheduleOnce(
+           200 milliseconds,
+           self,
+           decorateMessage(CheckIfJobRemoved(jobID)))(
+           context.dispatcher,
+           sender()
+         )
+       }else{
+         sender ! decorateMessage(true)
+       }
+
+     case CheckIfJobRemoved(jobID) =>
+       if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
+         sender ! decorateMessage(true)
+       } else {
+         // keep polling; the original requester is preserved as the message sender
+         context.system.scheduler.scheduleOnce(
+           200 milliseconds,
+           self,
+           decorateMessage(CheckIfJobRemoved(jobID)))(
+           context.dispatcher,
+           sender()
+         )
+       }
+
+     case NotifyWhenJobManagerTerminated(jobManager) =>
+       val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
+       waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender)
+
+     case RegisterSubmitTaskListener(jobId) =>
+       // a later registration for the same job replaces the earlier listener
+       registeredSubmitTaskListeners.put(jobId, sender())
+
+     case msg@SubmitTask(tdd) =>
+       registeredSubmitTaskListeners.get(tdd.getJobID) match {
+         case Some(listenerRef) =>
+           listenerRef ! ResponseSubmitTaskListener(tdd)
+         case None =>
+         // Nothing to do
+       }
+
+       super.handleMessage(msg)
+
+     /**
+       * Message from task manager that accumulator values changed and need to be reported
+       * immediately instead of lazily through the
+       * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward
+       * this message to the job manager that it knows it should report to the listeners.
+       */
+     case msg: AccumulatorsChanged =>
+       currentJobManager match {
+         case Some(jobManager) =>
+           jobManager.forward(msg)
+           sendHeartbeatToJobManager()
+           sender ! true
+         case None =>
+       }
+
+     case msg@Terminated(jobManager) =>
+       super.handleMessage(msg)
+
+       waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
+         _ foreach {
+           _ ! decorateMessage(JobManagerTerminated(jobManager))
+         }
+       }
+
+     case msg:Disconnect =>
+       // when disconnects are disabled the message is swallowed here on purpose
+       if (!disconnectDisabled) {
+         super.handleMessage(msg)
+
+         val jobManager = sender()
+
+         waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
+           _ foreach {
+             _ ! decorateMessage(JobManagerTerminated(jobManager))
+           }
+         }
+       }
+
+     case DisableDisconnect =>
+       disconnectDisabled = true
+
+     case NotifyOfComponentShutdown =>
+       waitForShutdown += sender()
+
+     case msg @ UpdateTaskExecutionState(taskExecutionState) =>
+       super.handleMessage(msg)
+
+       if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
+         // remove the entry so that listeners are notified once and do not pile up in the map
+         waitForRunning.remove(taskExecutionState.getID) foreach {
+           _ foreach (_ ! decorateMessage(true))
+         }
+       }
+
+     case RequestLeaderSessionID =>
+       sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
+
+     case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
+       if(isConnected && currentJobManager.exists(_ == jobManager)) {
+         sender() ! true
+       } else {
+         val list = waitForRegisteredAtResourceManager.getOrElse(
+           jobManager,
+           Set[ActorRef]())
+
+         waitForRegisteredAtResourceManager += jobManager -> (list + sender())
+       }
+
+     case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
+       super.handleMessage(msg)
+
+       // the sender of the acknowledgement is the JobManager the listeners were waiting for
+       val jm = sender()
+
+       waitForRegisteredAtResourceManager.remove(jm).foreach {
+         listeners => listeners.foreach{
+           listener =>
+             listener ! true
+         }
+       }
+   }
+
+   /**
+     * No killing of the VM for testing.
+     *
+     * Logs the shutdown, sends a ComponentShutdown carrying this actor's reference to every
+     * actor in `waitForShutdown` and then clears that set.
+     */
+   override protected def shutdown(): Unit = {
+     log.info("Shutting down TestingTaskManager.")
+     waitForShutdown.foreach(_ ! ComponentShutdown(self))
+     waitForShutdown.clear()
+   }
+ }
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
new file mode 100644
index 0000000..32c3c55
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.taskmanager.Task
+
+/**
+ * Additional messages that the [[TestingTaskManager]] understands.
+ */
+object TestingTaskManagerMessages {
+
+ case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
+
+ case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID)
+
+ case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
+ import collection.JavaConverters._
+ def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava
+ }
+
+ case object RequestRunningTasks
+
+ case class NotifyWhenJobManagerTerminated(jobManager: ActorRef)
+
+ case class JobManagerTerminated(jobManager: ActorRef)
+
+ case class NotifyWhenRegisteredAtJobManager(resourceManager: ActorRef)
+
+ /**
+ * Message to give a hint to the task manager that accumulator values were updated in the task.
+ * This message is forwarded to the job manager which knows that it needs to notify listeners
+ * of accumulator updates.
+ */
+ case class AccumulatorsChanged(jobID: JobID)
+
+ /**
+ * Registers a listener for all [[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]]
+ * messages of the given job.
+ *
+ * If a task is submitted with the given job ID the task deployment
+ * descriptor is forwarded to the listener.
+ *
+ * @param jobId The job ID to listen for.
+ */
+ case class RegisterSubmitTaskListener(jobId: JobID)
+
+ /**
+ * A response to a listened job ID containing the submitted task deployment descriptor.
+ *
+ * @param tdd The submitted task deployment descriptor.
+ */
+ case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor)
+
+ // --------------------------------------------------------------------------
+ // Utility methods to allow simpler case object access from Java
+ // --------------------------------------------------------------------------
+
+ def getRequestRunningTasksMessage: AnyRef = {
+ RequestRunningTasks
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index ee1b264..00410cc 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -22,9 +22,10 @@ import java.io._
import java.util.concurrent.TimeUnit
import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.util.TestLogger
-import org.junit.{AfterClass, BeforeClass, Test, Assert}
+import org.junit.{AfterClass, Assert, BeforeClass, Test}
import scala.concurrent.duration.FiniteDuration
import scala.tools.nsc.Settings
@@ -334,7 +335,7 @@ class ScalaShellITCase extends TestLogger {
}
object ScalaShellITCase {
- var cluster: Option[ForkableFlinkMiniCluster] = None
+ var cluster: Option[LocalFlinkMiniCluster] = None
val parallelism = 4
@BeforeClass
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 8bb440c..f94ff68 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -40,6 +40,7 @@ import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -49,7 +50,6 @@ import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestEnvironment;
import org.junit.After;
import org.junit.AfterClass;
@@ -134,7 +134,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
}
}
- private static ForkableFlinkMiniCluster flinkCluster;
+ private static LocalFlinkMiniCluster flinkCluster;
// ------------------------------------------------------------------------
// Cluster Setup (Cassandra & Flink)
@@ -205,7 +205,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
- flinkCluster = new ForkableFlinkMiniCluster(config);
+ flinkCluster = new LocalFlinkMiniCluster(config);
flinkCluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 9e3c33b..c4949ff 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -30,7 +31,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.InstantiationUtil;
import org.junit.AfterClass;
@@ -58,7 +58,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
private static KafkaTestEnvironment kafkaServer;
private static Properties standardProps;
- private static ForkableFlinkMiniCluster flink;
+ private static LocalFlinkMiniCluster flink;
@BeforeClass
public static void prepare() throws IOException, ClassNotFoundException {
@@ -88,7 +88,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
- flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+ flink = new LocalFlinkMiniCluster(flinkConfig, false);
flink.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index eddb57c..771db17 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -22,8 +22,8 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
@@ -65,7 +65,7 @@ public abstract class KafkaTestBase extends TestLogger {
protected static Properties standardProps;
- protected static ForkableFlinkMiniCluster flink;
+ protected static LocalFlinkMiniCluster flink;
protected static int flinkPort;
@@ -105,7 +105,7 @@ public abstract class KafkaTestBase extends TestLogger {
flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
- flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+ flink = new LocalFlinkMiniCluster(flinkConfig, false);
flink.start();
flinkPort = flink.getLeaderRPCPort();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 3705943..2e452c1 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -21,11 +21,11 @@ import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,7 +80,7 @@ public class ManualExactlyOnceTest {
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
- ForkableFlinkMiniCluster flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+ LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
flink.start();
final int flinkPort = flink.getLeaderRPCPort();
[2/4] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster
by LocalFlinkMiniCluster
Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 934a795..6abea2a 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -27,11 +27,11 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,7 +92,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
- ForkableFlinkMiniCluster flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+ LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
flink.start();
final int flinkPort = flink.getLeaderRPCPort();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
index ee415d1..29b3a3e 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
@@ -18,8 +18,9 @@
package org.apache.flink.streaming.api.scala
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.streaming.util.TestStreamEnvironment
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitSuiteLike
@@ -29,7 +30,7 @@ trait ScalaStreamingMultipleProgramsTestBase
with BeforeAndAfterAll {
val parallelism = 4
- var cluster: Option[ForkableFlinkMiniCluster] = None
+ var cluster: Option[LocalFlinkMiniCluster] = None
override protected def beforeAll(): Unit = {
val cluster = Some(
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 2ab52b5..18ecfde 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -79,153 +79,4 @@ under the License.
</dependency>
</dependencies>
-
- <build>
- <plugins>
- <!-- Scala Compiler -->
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.1.4</version>
- <executions>
- <!-- Run scala compiler in the process-resources phase, so that dependencies
- on scala classes can be resolved later in the (Java) compile phase -->
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
-
- <!-- Run scala compiler in the process-test-resources phase, so that
- dependencies on scala classes can be resolved later in the (Java) test-compile
- phase -->
- <execution>
- <id>scala-test-compile</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <jvmArgs>
- <jvmArg>-Xms128m</jvmArg>
- <jvmArg>-Xmx512m</jvmArg>
- </jvmArgs>
- </configuration>
- </plugin>
-
- <!-- Eclipse Integration -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.8</version>
- <configuration>
- <downloadSources>true</downloadSources>
- <projectnatures>
- <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
- <projectnature>org.eclipse.jdt.core.javanature</projectnature>
- </projectnatures>
- <buildcommands>
- <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
- </buildcommands>
- <classpathContainers>
- <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
- <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
- </classpathContainers>
- <excludes>
- <exclude>org.scala-lang:scala-library</exclude>
- <exclude>org.scala-lang:scala-compiler</exclude>
- </excludes>
- <sourceIncludes>
- <sourceInclude>**/*.scala</sourceInclude>
- <sourceInclude>**/*.java</sourceInclude>
- </sourceIncludes>
- </configuration>
- </plugin>
-
- <!-- Adding scala source directories to build path -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <!-- Add src/main/scala to eclipse build path -->
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/scala</source>
- </sources>
- </configuration>
- </execution>
- <!-- Add src/test/scala to eclipse build path -->
- <execution>
- <id>add-test-source</id>
- <phase>generate-test-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/test/scala</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- Scala Code Style, most of the configuration done via plugin management -->
- <plugin>
- <groupId>org.scalastyle</groupId>
- <artifactId>scalastyle-maven-plugin</artifactId>
- <configuration>
- <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
- </configuration>
- </plugin>
-
- </plugins>
- <pluginManagement>
- <plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>
- net.alchim31.maven
- </groupId>
- <artifactId>
- scala-maven-plugin
- </artifactId>
- <versionRange>
- [3.1.4,)
- </versionRange>
- <goals>
- <goal>compile</goal>
- <goal>testCompile</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index c5fbaf0..a478908 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -19,8 +19,8 @@
package org.apache.flink.streaming.util;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestBaseUtils;
import org.junit.AfterClass;
@@ -61,7 +61,7 @@ public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
protected static final int DEFAULT_PARALLELISM = 4;
- protected static ForkableFlinkMiniCluster cluster;
+ protected static LocalFlinkMiniCluster cluster;
public StreamingMultipleProgramsTestBase() {
super(new Configuration());
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index c700102..64c68dc 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -20,10 +20,10 @@ package org.apache.flink.streaming.util;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Preconditions;
/**
@@ -32,10 +32,10 @@ import org.apache.flink.util.Preconditions;
public class TestStreamEnvironment extends StreamExecutionEnvironment {
/** The mini cluster in which this environment executes its jobs */
- private ForkableFlinkMiniCluster executor;
+ private LocalFlinkMiniCluster executor;
- public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
+ public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
this.executor = Preconditions.checkNotNull(executor);
setParallelism(parallelism);
}
@@ -57,7 +57,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
* @param cluster The test cluster to run the test program on.
* @param parallelism The default parallelism for the test programs.
*/
- public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) {
+ public static void setAsContext(final LocalFlinkMiniCluster cluster, final int parallelism) {
StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c2da691..316fd21 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
@@ -48,7 +49,7 @@ public abstract class AbstractTestBase extends TestBaseUtils {
protected int numTaskManagers = 1;
/** The mini cluster that runs the test programs */
- protected ForkableFlinkMiniCluster executor;
+ protected LocalFlinkMiniCluster executor;
public AbstractTestBase(Configuration config) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index d7f09bd..4e83245 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.test.util;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.runners.Parameterized;
@@ -72,7 +73,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
protected static boolean startWebServer = false;
- protected static ForkableFlinkMiniCluster cluster = null;
+ protected static LocalFlinkMiniCluster cluster = null;
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 4014b80..b774f97 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -32,7 +32,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.fs.FileSystem;
@@ -104,7 +105,7 @@ public class TestBaseUtils extends TestLogger {
}
- public static ForkableFlinkMiniCluster startCluster(
+ public static LocalFlinkMiniCluster startCluster(
int numTaskManagers,
int taskManagerNumSlots,
boolean startWebserver,
@@ -126,7 +127,7 @@ public class TestBaseUtils extends TestLogger {
return startCluster(config, singleActorSystem);
}
- public static ForkableFlinkMiniCluster startCluster(
+ public static LocalFlinkMiniCluster startCluster(
Configuration config,
boolean singleActorSystem) throws Exception {
@@ -147,7 +148,7 @@ public class TestBaseUtils extends TestLogger {
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
- ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, singleActorSystem);
+ LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, singleActorSystem);
cluster.start();
@@ -155,7 +156,7 @@ public class TestBaseUtils extends TestLogger {
}
- public static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
+ public static void stopCluster(LocalFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
if (logDir != null) {
FileUtils.deleteDirectory(logDir);
}
@@ -169,11 +170,15 @@ public class TestBaseUtils extends TestLogger {
List<Future<Object>> numActiveConnectionsResponseFutures = new ArrayList<>();
for (ActorRef tm : tms) {
- bcVariableManagerResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
- .RequestBroadcastVariablesWithReferences$.MODULE$, new Timeout(timeout)));
-
- numActiveConnectionsResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
- .RequestNumActiveConnections$.MODULE$, new Timeout(timeout)));
+ bcVariableManagerResponseFutures.add(Patterns.ask(
+ tm,
+ TaskManagerMessages.getRequestBroadcastVariablesWithReferences(),
+ new Timeout(timeout)));
+
+ numActiveConnectionsResponseFutures.add(Patterns.ask(
+ tm,
+ TaskManagerMessages.getRequestNumActiveConnections(),
+ new Timeout(timeout)));
}
Future<Iterable<Object>> bcVariableManagerFutureResponses = Futures.sequence(
@@ -182,8 +187,7 @@ public class TestBaseUtils extends TestLogger {
Iterable<Object> responses = Await.result(bcVariableManagerFutureResponses, timeout);
for (Object response : responses) {
- numUnreleasedBCVars += ((TestingTaskManagerMessages
- .ResponseBroadcastVariablesWithReferences) response).number();
+ numUnreleasedBCVars += ((TaskManagerMessages.ResponseBroadcastVariablesWithReferences) response).number();
}
Future<Iterable<Object>> numActiveConnectionsFutureResponses = Futures.sequence(
@@ -192,8 +196,7 @@ public class TestBaseUtils extends TestLogger {
responses = Await.result(numActiveConnectionsFutureResponses, timeout);
for (Object response : responses) {
- numActiveConnections += ((TestingTaskManagerMessages
- .ResponseNumActiveConnections) response).number();
+ numActiveConnections += ((TaskManagerMessages.ResponseNumActiveConnections) response).number();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 7cb88be..aea8152 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -29,10 +29,11 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
public class TestEnvironment extends ExecutionEnvironment {
- private final ForkableFlinkMiniCluster executor;
+ private final LocalFlinkMiniCluster executor;
private TestEnvironment lastEnv = null;
@@ -46,7 +47,7 @@ public class TestEnvironment extends ExecutionEnvironment {
}
}
- public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
+ public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
this.executor = executor;
setParallelism(parallelism);
@@ -54,7 +55,7 @@ public class TestEnvironment extends ExecutionEnvironment {
getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
}
- public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
+ public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
this(executor, parallelism);
if (isObjectReuseEnabled) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
deleted file mode 100644
index fa3135a..0000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util
-
-import java.util.concurrent.TimeoutException
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.Patterns._
-import akka.pattern.ask
-
-import org.apache.curator.test.TestingCluster
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{JobManager, HighAvailabilityMode}
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingMemoryArchivist, TestingTaskManager}
-import org.apache.flink.runtime.testutils.TestingResourceManager
-
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
-
-/**
- * A forkable mini cluster is a special case of the mini cluster, used for parallel test execution
- * on build servers. If multiple tests run in parallel, the cluster picks up the fork number and
- * uses it to avoid port conflicts.
- *
- * @param userConfiguration Configuration object with the user provided configuration values
- * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the
- * same [[ActorSystem]], otherwise false.
- */
-class ForkableFlinkMiniCluster(
- userConfiguration: Configuration,
- singleActorSystem: Boolean)
- extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
-
- def this(userConfiguration: Configuration) = this(userConfiguration, true)
-
- // --------------------------------------------------------------------------
-
- var zookeeperCluster: Option[TestingCluster] = None
-
- override def generateConfiguration(userConfiguration: Configuration): Configuration = {
- val forkNumberString = System.getProperty("forkNumber")
-
- val forkNumber = try {
- Integer.parseInt(forkNumberString)
- }
- catch {
- case e: NumberFormatException => -1
- }
-
- val config = userConfiguration.clone()
-
- if (forkNumber != -1) {
- val jobManagerRPC = 1024 + forkNumber*400
- val taskManagerRPC = 1024 + forkNumber*400 + 100
- val taskManagerData = 1024 + forkNumber*400 + 200
- val resourceManagerRPC = 1024 + forkNumber*400 + 300
-
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRPC)
- config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, taskManagerRPC)
- config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
- config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerRPC)
- }
-
- super.generateConfiguration(config)
- }
-
- override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
- val config = configuration.clone()
-
- val jobManagerName = getJobManagerName(index)
- val archiveName = getArchiveName(index)
-
- val jobManagerPort = config.getInteger(
- ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
- if (jobManagerPort > 0) {
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
- }
-
- val (jobManager, _) = JobManager.startJobManagerActors(
- config,
- actorSystem,
- Some(jobManagerName),
- Some(archiveName),
- classOf[TestingJobManager],
- classOf[TestingMemoryArchivist])
-
- jobManager
- }
-
- override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
- val config = configuration.clone()
-
- val resourceManagerName = getResourceManagerName(index)
-
- val resourceManagerPort = config.getInteger(
- ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
-
- if (resourceManagerPort > 0) {
- config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
- }
-
- val resourceManager = FlinkResourceManager.startResourceManagerActors(
- config,
- system,
- createLeaderRetrievalService(),
- classOf[TestingResourceManager],
- resourceManagerName)
-
- resourceManager
- }
-
- override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
- val config = configuration.clone()
-
- val rpcPort = config.getInteger(
- ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
-
- val dataPort = config.getInteger(
- ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
-
- if (rpcPort > 0) {
- config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
- }
- if (dataPort > 0) {
- config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
- }
-
- val localExecution = numTaskManagers == 1
-
- TaskManager.startTaskManagerComponentsAndActor(
- config,
- ResourceID.generate(),
- system,
- hostname,
- Some(TaskManager.TASK_MANAGER_NAME + index),
- Some(createLeaderRetrievalService()),
- localExecution,
- classOf[TestingTaskManager])
- }
-
- def addTaskManager(): Unit = {
- if (useSingleActorSystem) {
- (jobManagerActorSystems, taskManagerActors) match {
- case (Some(jmSystems), Some(tmActors)) =>
- val index = numTaskManagers
- taskManagerActors = Some(tmActors :+ startTaskManager(index, jmSystems(0)))
- numTaskManagers += 1
- case _ => throw new IllegalStateException("Cluster has not been started properly.")
- }
- } else {
- (taskManagerActorSystems, taskManagerActors) match {
- case (Some(tmSystems), Some(tmActors)) =>
- val index = numTaskManagers
- val newTmSystem = startTaskManagerActorSystem(index)
- val newTmActor = startTaskManager(index, newTmSystem)
-
- taskManagerActorSystems = Some(tmSystems :+ newTmSystem)
- taskManagerActors = Some(tmActors :+ newTmActor)
-
- numTaskManagers += 1
- case _ => throw new IllegalStateException("Cluster has not been started properly.")
- }
- }
- }
-
- def restartLeadingJobManager(): Unit = {
- this.synchronized {
- (jobManagerActorSystems, jobManagerActors) match {
- case (Some(jmActorSystems), Some(jmActors)) =>
- val leader = getLeaderGateway(AkkaUtils.getTimeout(configuration))
- val index = getLeaderIndex(AkkaUtils.getTimeout(configuration))
-
- clearLeader()
-
- val stopped = gracefulStop(leader.actor(), ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
- Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-
- if(!singleActorSystem) {
- jmActorSystems(index).shutdown()
- jmActorSystems(index).awaitTermination()
- }
-
- val newJobManagerActorSystem = if(!singleActorSystem) {
- startJobManagerActorSystem(index)
- } else {
- jmActorSystems.head
- }
-
- val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
-
- jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1))
- jobManagerActorSystems = Some(jmActorSystems.patch(
- index,
- Seq(newJobManagerActorSystem),
- 1))
-
- val lrs = createLeaderRetrievalService()
-
- jobManagerLeaderRetrievalService = Some(lrs)
- lrs.start(this)
-
- case _ => throw new Exception("The JobManager of the ForkableFlinkMiniCluster have not " +
- "been started properly.")
- }
- }
- }
-
-
- def restartTaskManager(index: Int): Unit = {
- (taskManagerActorSystems, taskManagerActors) match {
- case (Some(tmActorSystems), Some(tmActors)) =>
- val stopped = gracefulStop(tmActors(index), ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
- Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-
- if(!singleActorSystem) {
- tmActorSystems(index).shutdown()
- tmActorSystems(index).awaitTermination()
- }
-
- val taskManagerActorSystem = if(!singleActorSystem) {
- startTaskManagerActorSystem(index)
- } else {
- tmActorSystems.head
- }
-
- val taskManagerActor = startTaskManager(index, taskManagerActorSystem)
-
- taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1))
- taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1))
-
- case _ => throw new Exception("The TaskManager of the ForkableFlinkMiniCluster have not " +
- "been started properly.")
- }
- }
-
- override def start(): Unit = {
- val zookeeperURL = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "")
-
- zookeeperCluster = if (haMode == HighAvailabilityMode.ZOOKEEPER &&
- zookeeperURL.equals("")) {
- LOG.info("Starting ZooKeeper cluster.")
-
- val testingCluster = new TestingCluster(1)
-
- configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
- testingCluster.getConnectString)
-
- testingCluster.start()
-
- Some(testingCluster)
- } else {
- None
- }
-
- super.start()
- }
-
- override def stop(): Unit = {
- super.stop()
-
- zookeeperCluster.foreach{
- LOG.info("Stopping ZooKeeper cluster.")
- _.close()
- }
- }
-
- def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = {
- val futures = taskManagerActors.map {
- _.map {
- tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout)
- }
- }.getOrElse(Seq())
-
- try {
- Await.ready(Future.sequence(futures), timeout)
- } catch {
- case t: TimeoutException =>
- throw new Exception("Timeout while waiting for TaskManagers to register at " +
- s"${jobManager.path}")
- }
-
- }
-}
-
-object ForkableFlinkMiniCluster {
-
- val MAX_RESTART_DURATION = 2 minute
-
- val DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT = "200 s"
-
- def startCluster(
- numSlots: Int,
- numTaskManagers: Int,
- timeout: String = DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT)
- : ForkableFlinkMiniCluster = {
-
- val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
-
- val cluster = new ForkableFlinkMiniCluster(config)
-
- cluster.start()
-
- cluster
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index cac8451..cc70fee 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -44,7 +44,7 @@ import static org.junit.Assert.fail;
*/
public class AccumulatorErrorITCase {
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void startCluster() {
@@ -53,7 +53,7 @@ public class AccumulatorErrorITCase {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 49e18e0..624bfff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -234,7 +234,6 @@ public class AccumulatorLiveITCase {
fail("Wrong accumulator results when map task begins execution.");
}
-
int expectedAccVal = 0;
/* for mapper task */
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 9671fce..8a08f15 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -22,8 +22,8 @@ package org.apache.flink.test.cancelling;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import static org.apache.flink.runtime.taskmanager.TaskCancelTest.awaitRunning;
import static org.apache.flink.runtime.taskmanager.TaskCancelTest.cancelJob;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.util.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.junit.After;
@@ -65,7 +64,7 @@ public abstract class CancelingTestBase extends TestLogger {
// --------------------------------------------------------------------------------------------
- protected ForkableFlinkMiniCluster executor;
+ protected LocalFlinkMiniCluster executor;
protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
@@ -88,7 +87,7 @@ public abstract class CancelingTestBase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048);
- this.executor = new ForkableFlinkMiniCluster(config, false);
+ this.executor = new LocalFlinkMiniCluster(config, false);
this.executor.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 163fb42..94ff66f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -35,7 +36,6 @@ import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -60,7 +60,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
private static final int PARALLELISM = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
@@ -71,7 +71,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, "60 s");
config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s");
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index fa5339d..0aee128 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -41,7 +42,6 @@ import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -76,7 +76,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
private static final int PARALLELISM = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -95,7 +95,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 8915bff..7f1d7f3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -43,7 +44,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -73,7 +73,7 @@ public class RescalingITCase extends TestLogger {
private static int slotsPerTaskManager = 2;
private static int numSlots = numTaskManagers * slotsPerTaskManager;
- private static ForkableFlinkMiniCluster cluster;
+ private static TestingCluster cluster;
@ClassRule
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -92,7 +92,7 @@ public class RescalingITCase extends TestLogger {
config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
- cluster = new ForkableFlinkMiniCluster(config);
+ cluster = new TestingCluster(config);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 550ba75..7409fe7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointListener;
@@ -51,6 +50,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestSavepoint;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint;
@@ -62,8 +62,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.testutils.junit.RetryOnFailure;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
@@ -76,7 +74,6 @@ import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -137,7 +134,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Created temporary directory: " + tmpDir + ".");
- ForkableFlinkMiniCluster flink = null;
+ TestingCluster flink = null;
try {
// Create a test actor system
@@ -168,7 +165,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Flink configuration: " + config + ".");
// Start Flink
- flink = new ForkableFlinkMiniCluster(config);
+ flink = new TestingCluster(config);
LOG.info("Starting Flink cluster.");
flink.start();
@@ -261,7 +258,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("JobManager: " + jobManager + ".");
final Throwable[] error = new Throwable[1];
- final ForkableFlinkMiniCluster finalFlink = flink;
+ final TestingCluster finalFlink = flink;
final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
new JavaTestKit(testActorSystem) {{
@@ -422,7 +419,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Created temporary directory: " + tmpDir + ".");
- ForkableFlinkMiniCluster flink = null;
+ TestingCluster flink = null;
List<File> checkpointFiles = new ArrayList<>();
try {
@@ -447,7 +444,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Flink configuration: " + config + ".");
// Start Flink
- flink = new ForkableFlinkMiniCluster(config);
+ flink = new TestingCluster(config);
LOG.info("Starting Flink cluster.");
flink.start();
@@ -559,7 +556,7 @@ public class SavepointITCase extends TestLogger {
// Test deadline
final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
- ForkableFlinkMiniCluster flink = null;
+ TestingCluster flink = null;
try {
// Flink configuration
@@ -570,7 +567,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Flink configuration: " + config + ".");
// Start Flink
- flink = new ForkableFlinkMiniCluster(config);
+ flink = new TestingCluster(config);
LOG.info("Starting Flink cluster.");
flink.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index cf15052..6bf511f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -34,7 +35,6 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -80,7 +80,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
private static final int NUM_TASK_SLOTS = 3;
private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void startCluster() {
@@ -91,7 +91,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 ms");
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 67c05e5..5f6cd4a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -20,8 +20,8 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.TestLogger;
@@ -43,7 +43,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
protected static final int NUM_TASK_SLOTS = 4;
protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void startCluster() {
@@ -53,7 +53,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 2e6ce78..e424a8d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -34,7 +35,6 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -71,7 +71,7 @@ public class WindowCheckpointingITCase extends TestLogger {
private static final int PARALLELISM = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
@@ -81,7 +81,7 @@ public class WindowCheckpointingITCase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 8b56d3d..7afafe4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.classloading;
-import akka.pattern.AskTimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
@@ -37,9 +36,9 @@ import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -82,7 +81,7 @@ public class ClassLoaderITCase extends TestLogger {
public static final TemporaryFolder FOLDER = new TemporaryFolder();
- private static ForkableFlinkMiniCluster testCluster;
+ private static TestingCluster testCluster;
private static int parallelism;
@@ -105,7 +104,7 @@ public class ClassLoaderITCase extends TestLogger {
config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY,
FOLDER.newFolder().getAbsoluteFile().toURI().toString());
- testCluster = new ForkableFlinkMiniCluster(config, false);
+ testCluster = new TestingCluster(config, false);
testCluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
index c9059f1..a74ed34 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -42,7 +42,6 @@ import java.util.concurrent.Semaphore;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
-
/**
* Tests retrieval of a job from a running Flink cluster
*/
@@ -54,7 +53,7 @@ public class JobRetrievalITCase extends TestLogger {
@BeforeClass
public static void before() {
- cluster = new ForkableFlinkMiniCluster(new Configuration(), false);
+ cluster = new TestingCluster(new Configuration(), false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 28c2e58..178656d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -26,8 +26,8 @@ import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -47,7 +47,7 @@ public class JobSubmissionFailsITCase {
private static final int NUM_SLOTS = 20;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
private static JobGraph workingJobGraph;
@BeforeClass
@@ -58,7 +58,7 @@ public class JobSubmissionFailsITCase {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
- cluster = new ForkableFlinkMiniCluster(config);
+ cluster = new LocalFlinkMiniCluster(config);
cluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index ca2c156..133ebd0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.Collector;
@@ -52,7 +52,7 @@ public class CustomDistributionITCase extends TestLogger {
// The mini cluster that is shared across tests
// ------------------------------------------------------------------------
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void setup() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index 34a7eed..e18e82a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -23,11 +23,10 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -54,7 +53,7 @@ public class RemoteEnvironmentITCase {
private static final String VALID_STARTUP_TIMEOUT = "100 s";
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void setupCluster() {
@@ -62,7 +61,7 @@ public class RemoteEnvironmentITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 09b5e7e..a67e6ef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -49,14 +49,14 @@ public class AutoParallelismITCase {
private static final int SLOTS_PER_TM = 7;
private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void setupCluster() {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index f30f61f..51f3534 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.types.Value;
import org.junit.AfterClass;
@@ -43,7 +43,7 @@ public class CustomSerializationITCase {
private static final int PARLLELISM = 5;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void startCluster() {
@@ -51,7 +51,7 @@ public class CustomSerializationITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 30);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 42419fb..06b93ea 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.Collector;
@@ -52,7 +52,7 @@ import static org.junit.Assert.*;
@SuppressWarnings("serial")
public class MiscellaneousIssuesITCase {
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void startCluster() {
@@ -61,7 +61,7 @@ public class MiscellaneousIssuesITCase {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 12b7a68..a43bab6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -32,8 +32,8 @@ import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.util.KMeansData;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -43,7 +43,7 @@ public class SuccessAfterNetworkBuffersFailureITCase {
@Test
public void testSuccessfulProgramAfterFailure() {
- ForkableFlinkMiniCluster cluster = null;
+ LocalFlinkMiniCluster cluster = null;
try {
Configuration config = new Configuration();
@@ -52,7 +52,7 @@ public class SuccessAfterNetworkBuffersFailureITCase {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 840);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 40732df..b99858a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -55,6 +55,7 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseRunningTasks;
@@ -62,7 +63,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.QueryableStateStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -108,7 +108,7 @@ public class QueryableStateITCase extends TestLogger {
* Shared between all the test. Make sure to have at least NUM_SLOTS
* available after your test finishes, e.g. cancel the job you submitted.
*/
- private static ForkableFlinkMiniCluster cluster;
+ private static TestingCluster cluster;
@BeforeClass
public static void setup() {
@@ -120,7 +120,7 @@ public class QueryableStateITCase extends TestLogger {
config.setInteger(ConfigConstants.QUERYABLE_STATE_CLIENT_NETWORK_THREADS, 1);
config.setInteger(ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS, 1);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new TestingCluster(config, false);
cluster.start(true);
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
index 8a45d62..8a43ee4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -49,7 +49,7 @@ public class FastFailuresITCase extends TestLogger {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
- ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false);
+ LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
index 0c5d14b..a0d6b58 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.test.recovery;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.BeforeClass;
public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase {
@@ -34,8 +34,8 @@ public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCas
config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, "1 second");
config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, "100 ms");
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
index 6355a8f..f09efc5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.test.recovery;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.BeforeClass;
public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {
@@ -33,8 +33,8 @@ public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecover
config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 ms");
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
index 004340c..bf7c524 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.Test;
@@ -42,7 +42,7 @@ import static org.junit.Assert.*;
@SuppressWarnings("serial")
public abstract class SimpleRecoveryITCaseBase {
- protected static ForkableFlinkMiniCluster cluster;
+ protected static LocalFlinkMiniCluster cluster;
@AfterClass
public static void teardownCluster() {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 6c621ac..5d29905 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
@@ -65,7 +65,7 @@ public class TaskManagerFailureRecoveryITCase {
final int PARALLELISM = 4;
- ForkableFlinkMiniCluster cluster = null;
+ LocalFlinkMiniCluster cluster = null;
ActorSystem additionalSystem = null;
try {
@@ -78,7 +78,7 @@ public class TaskManagerFailureRecoveryITCase {
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");
config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 7710f06..0b008eb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -26,8 +26,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
@@ -63,7 +63,7 @@ public class IPv6HostnamesITCase extends TestLogger {
- ForkableFlinkMiniCluster flink = null;
+ LocalFlinkMiniCluster flink = null;
try {
final String addressString = ipv6address.getHostAddress();
log.info("Test will use IPv6 address " + addressString + " for connection tests");
@@ -75,7 +75,7 @@ public class IPv6HostnamesITCase extends TestLogger {
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
- flink = new ForkableFlinkMiniCluster(conf, false);
+ flink = new LocalFlinkMiniCluster(conf, false);
flink.start();
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(addressString, flink.getLeaderRPCPort());
[4/4] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster
by LocalFlinkMiniCluster
Posted by tr...@apache.org.
[FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster
Rename _configuration to originalConfiguration
Remove testing classes from main scope in flink-runtime
Previously, the ForkableFlinkMiniCluster, which resided in flink-test-utils, required
these testing classes to be in the main scope of flink-runtime. With the removal of the
ForkableFlinkMiniCluster, these classes are no longer needed in the main scope and can
be moved back to the test scope.
This closes #2450.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02b852e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02b852e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02b852e3
Branch: refs/heads/master
Commit: 02b852e3571e46f25fdfc79f43ceb726ddff9ba7
Parents: 920cda4
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 31 17:58:09 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 8 17:17:28 2016 +0200
----------------------------------------------------------------------
.../api/avro/AvroExternalJarProgramITCase.java | 7 +-
.../flink/contrib/streaming/CollectITCase.java | 4 +-
.../operations/DegreesWithExceptionITCase.java | 6 +-
.../ReduceOnEdgesWithExceptionITCase.java | 6 +-
.../ReduceOnNeighborsWithExceptionITCase.java | 6 +-
.../apache/flink/ml/util/FlinkTestBase.scala | 11 +-
.../clusterframework/FlinkResourceManager.java | 13 +-
.../testutils/TestingResourceManager.java | 137 ------
.../flink/runtime/jobmanager/JobManager.scala | 45 +-
.../runtime/messages/TaskManagerMessages.scala | 26 ++
.../runtime/minicluster/FlinkMiniCluster.scala | 73 +++-
.../minicluster/LocalFlinkMiniCluster.scala | 235 ++++++++---
.../flink/runtime/taskmanager/TaskManager.scala | 130 ++++--
.../testingUtils/TestingJobManager.scala | 72 ----
.../testingUtils/TestingJobManagerLike.scala | 417 -------------------
.../TestingJobManagerMessages.scala | 133 ------
.../testingUtils/TestingMemoryArchivist.scala | 43 --
.../runtime/testingUtils/TestingMessages.scala | 40 --
.../testingUtils/TestingTaskManager.scala | 70 ----
.../testingUtils/TestingTaskManagerLike.scala | 248 -----------
.../TestingTaskManagerMessages.scala | 94 -----
.../LeaderElectionRetrievalTestingCluster.java | 3 +-
.../testutils/TestingResourceManager.java | 137 ++++++
.../runtime/testingUtils/TestingCluster.scala | 322 ++++++++------
.../testingUtils/TestingJobManager.scala | 71 ++++
.../testingUtils/TestingJobManagerLike.scala | 417 +++++++++++++++++++
.../TestingJobManagerMessages.scala | 132 ++++++
.../testingUtils/TestingMemoryArchivist.scala | 43 ++
.../runtime/testingUtils/TestingMessages.scala | 40 ++
.../testingUtils/TestingTaskManager.scala | 70 ++++
.../testingUtils/TestingTaskManagerLike.scala | 234 +++++++++++
.../TestingTaskManagerMessages.scala | 82 ++++
.../flink/api/scala/ScalaShellITCase.scala | 7 +-
.../cassandra/CassandraConnectorITCase.java | 6 +-
.../kafka/KafkaShortRetentionTestBase.java | 6 +-
.../connectors/kafka/KafkaTestBase.java | 6 +-
.../manualtests/ManualExactlyOnceTest.java | 4 +-
...nualExactlyOnceWithStreamReshardingTest.java | 4 +-
...ScalaStreamingMultipleProgramsTestBase.scala | 5 +-
.../flink-test-utils/pom.xml | 149 -------
.../util/StreamingMultipleProgramsTestBase.java | 4 +-
.../streaming/util/TestStreamEnvironment.java | 8 +-
.../flink/test/util/AbstractTestBase.java | 3 +-
.../test/util/MultipleProgramsTestBase.java | 3 +-
.../apache/flink/test/util/TestBaseUtils.java | 31 +-
.../apache/flink/test/util/TestEnvironment.java | 7 +-
.../test/util/ForkableFlinkMiniCluster.scala | 335 ---------------
.../accumulators/AccumulatorErrorITCase.java | 6 +-
.../accumulators/AccumulatorLiveITCase.java | 1 -
.../test/cancelling/CancelingTestBase.java | 7 +-
.../EventTimeAllWindowCheckpointingITCase.java | 6 +-
.../EventTimeWindowCheckpointingITCase.java | 6 +-
.../test/checkpointing/RescalingITCase.java | 6 +-
.../test/checkpointing/SavepointITCase.java | 19 +-
.../StreamCheckpointNotifierITCase.java | 6 +-
.../StreamFaultToleranceTestBase.java | 6 +-
.../WindowCheckpointingITCase.java | 6 +-
.../test/classloading/ClassLoaderITCase.java | 7 +-
.../clients/examples/JobRetrievalITCase.java | 5 +-
.../JobSubmissionFailsITCase.java | 6 +-
.../CustomDistributionITCase.java | 4 +-
.../RemoteEnvironmentITCase.java | 7 +-
.../flink/test/misc/AutoParallelismITCase.java | 6 +-
.../test/misc/CustomSerializationITCase.java | 6 +-
.../test/misc/MiscellaneousIssuesITCase.java | 6 +-
...SuccessAfterNetworkBuffersFailureITCase.java | 6 +-
.../flink/test/query/QueryableStateITCase.java | 6 +-
.../flink/test/recovery/FastFailuresITCase.java | 4 +-
...SimpleRecoveryFailureRateStrategyITBase.java | 6 +-
...RecoveryFixedDelayRestartStrategyITBase.java | 6 +-
.../test/recovery/SimpleRecoveryITCaseBase.java | 4 +-
.../TaskManagerFailureRecoveryITCase.java | 6 +-
.../flink/test/runtime/IPv6HostnamesITCase.java | 6 +-
.../ZooKeeperLeaderElectionITCase.java | 56 +--
.../test/streaming/runtime/TimestampITCase.java | 6 +-
.../flink/test/web/WebFrontendITCase.java | 6 +-
.../jobmanager/JobManagerFailsITCase.scala | 8 +-
.../taskmanager/TaskManagerFailsITCase.scala | 12 +-
flink-yarn-tests/pom.xml | 8 +
.../org/apache/flink/yarn/YarnTestBase.java | 1 -
tools/maven/scalastyle-config.xml | 2 +-
81 files changed, 2037 insertions(+), 2167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 29a7e58..1030ff8 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -25,12 +25,11 @@ import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.Assert;
import org.junit.Test;
-
public class AvroExternalJarProgramITCase {
private static final String JAR_FILE = "maven-test-jar.jar";
@@ -40,12 +39,12 @@ public class AvroExternalJarProgramITCase {
@Test
public void testExternalProgram() {
- ForkableFlinkMiniCluster testMiniCluster = null;
+ LocalFlinkMiniCluster testMiniCluster = null;
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
- testMiniCluster = new ForkableFlinkMiniCluster(config, false);
+ testMiniCluster = new LocalFlinkMiniCluster(config, false);
testMiniCluster.start();
String jarFile = JAR_FILE;
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
index 10ea85c..d691621 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
@@ -19,9 +19,9 @@
package org.apache.flink.contrib.streaming;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Test;
@@ -36,7 +36,7 @@ public class CollectITCase {
@Test
public void testCollect() throws Exception {
- final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(new Configuration(), false);
+ final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
try {
cluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index 551a97b..02eea07 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.types.LongValue;
import org.junit.AfterClass;
@@ -39,7 +39,7 @@ public class DegreesWithExceptionITCase {
private static final int PARALLELISM = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
@@ -47,7 +47,7 @@ public class DegreesWithExceptionITCase {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
index 56a0a59..666f7ef 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.EdgesFunctionWithVertexValue;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -42,7 +42,7 @@ public class ReduceOnEdgesWithExceptionITCase {
private static final int PARALLELISM = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
@@ -50,7 +50,7 @@ public class ReduceOnEdgesWithExceptionITCase {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
index 7458e08..0bbdc84 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
@@ -31,7 +31,7 @@ import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
import org.apache.flink.graph.ReduceNeighborsFunction;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -43,7 +43,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
private static final int PARALLELISM = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
@@ -51,7 +51,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
index fb98f24..6353d6a 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
@@ -18,14 +18,15 @@
package org.apache.flink.ml.util
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.test.util.{TestBaseUtils, TestEnvironment}
import org.scalatest.{BeforeAndAfter, Suite}
-/** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala based tests.
+/** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests.
* Additionally a TestEnvironment with the started cluster is created and set as the default
* [[org.apache.flink.api.java.ExecutionEnvironment]].
*
- * This mixin starts a ForkableFlinkMiniCluster with one TaskManager and a number of slots given
+ * This mixin starts a LocalFlinkMiniCluster with one TaskManager and a number of slots given
* by parallelism. This value can be overridden in a sub class in order to start the cluster
* with a different number of slots.
*
@@ -37,7 +38,7 @@ import org.scalatest.{BeforeAndAfter, Suite}
* @example
* {{{
* def testSomething: Unit = {
- * // Obtain TestEnvironment with started ForkableFlinkMiniCluster
+ * // Obtain TestEnvironment with started LocalFlinkMiniCluster
* val env = ExecutionEnvironment.getExecutionEnvironment
*
* env.fromCollection(...)
@@ -50,7 +51,7 @@ import org.scalatest.{BeforeAndAfter, Suite}
trait FlinkTestBase extends BeforeAndAfter {
that: Suite =>
- var cluster: Option[ForkableFlinkMiniCluster] = None
+ var cluster: Option[LocalFlinkMiniCluster] = None
val parallelism = 4
before {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 95be084..7ea286d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -767,8 +767,19 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
Class<? extends FlinkResourceManager<?>> resourceManagerClass,
String resourceManagerActorName) {
- Props resourceMasterProps = Props.create(resourceManagerClass, configuration, leaderRetriever);
+ Props resourceMasterProps = getResourceManagerProps(
+ resourceManagerClass,
+ configuration,
+ leaderRetriever);
return actorSystem.actorOf(resourceMasterProps, resourceManagerActorName);
}
+
+ public static Props getResourceManagerProps(
+ Class<? extends FlinkResourceManager> resourceManagerClass,
+ Configuration configuration,
+ LeaderRetrievalService leaderRetrievalService) {
+
+ return Props.create(resourceManagerClass, configuration, leaderRetrievalService);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
deleted file mode 100644
index 495cacd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testutils;
-
-import akka.actor.ActorRef;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.testingUtils.TestingMessages;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-
-/**
- * A testing resource manager which may alter the default standalone resource master's behavior.
- */
-public class TestingResourceManager extends StandaloneResourceManager {
-
- /** Set of Actors which want to be informed of a connection to the job manager */
- private Set<ActorRef> waitForResourceManagerConnected = new HashSet<>();
-
- /** Set of Actors which want to be informed of a shutdown */
- private Set<ActorRef> waitForShutdown = new HashSet<>();
-
- /** Flag to signal a connection to the JobManager */
- private boolean isConnected = false;
-
- public TestingResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) {
- super(flinkConfig, leaderRetriever);
- }
-
- /**
- * Overwrite messages here if desired
- */
- @Override
- protected void handleMessage(Object message) {
-
- if (message instanceof GetRegisteredResources) {
- sender().tell(new GetRegisteredResourcesReply(getStartedTaskManagers()), self());
- } else if (message instanceof FailResource) {
- ResourceID resourceID = ((FailResource) message).resourceID;
- notifyWorkerFailed(resourceID, "Failed for test case.");
-
- } else if (message instanceof NotifyWhenResourceManagerConnected) {
- if (isConnected) {
- sender().tell(
- Messages.getAcknowledge(),
- self());
- } else {
- waitForResourceManagerConnected.add(sender());
- }
- } else if (message instanceof RegisterResourceManagerSuccessful) {
- super.handleMessage(message);
-
- isConnected = true;
-
- for (ActorRef ref : waitForResourceManagerConnected) {
- ref.tell(
- Messages.getAcknowledge(),
- self());
- }
- waitForResourceManagerConnected.clear();
-
- } else if (message instanceof TestingMessages.NotifyOfComponentShutdown$) {
- waitForShutdown.add(sender());
- } else if (message instanceof TestingMessages.Alive$) {
- sender().tell(Messages.getAcknowledge(), self());
- } else {
- super.handleMessage(message);
- }
- }
-
- /**
- * Testing messages
- */
- public static class GetRegisteredResources {}
-
- public static class GetRegisteredResourcesReply {
-
- public Collection<ResourceID> resources;
-
- public GetRegisteredResourcesReply(Collection<ResourceID> resources) {
- this.resources = resources;
- }
-
- }
-
- /**
- * Fails all resources that the resource manager has registered
- */
- public static class FailResource {
-
- public ResourceID resourceID;
-
- public FailResource(ResourceID resourceID) {
- this.resourceID = resourceID;
- }
- }
-
- /**
- * The sender of this message will be informed of a connection to the Job Manager
- */
- public static class NotifyWhenResourceManagerConnected {}
-
- /**
- * Inform registered listeners about a shutdown of the application.
- */
- @Override
- protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
- for (ActorRef listener : waitForShutdown) {
- listener.tell(new TestingMessages.ComponentShutdown(self()), self());
- }
- waitForShutdown.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 88af604..f67be0e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.executiongraph.{StatusListenerMessenger, ExecutionGraph, ExecutionJobVertex}
+import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex, StatusListenerMessenger}
import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
@@ -2721,7 +2721,7 @@ object JobManager {
configuration,
None)
- val archiveProps = Props(archiveClass, archiveCount)
+ val archiveProps = getArchiveProps(archiveClass, archiveCount)
// start the archiver with the given name, or without (avoid name conflicts)
val archive: ActorRef = archiveActorName match {
@@ -2729,7 +2729,7 @@ object JobManager {
case None => actorSystem.actorOf(archiveProps)
}
- val jobManagerProps = Props(
+ val jobManagerProps = getJobManagerProps(
jobManagerClass,
configuration,
executorService,
@@ -2754,6 +2754,45 @@ object JobManager {
(jobManager, archive)
}
+ def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: Int): Props = {
+ Props(archiveClass, archiveCount)
+ }
+
+ def getJobManagerProps(
+ jobManagerClass: Class[_ <: JobManager],
+ configuration: Configuration,
+ executorService: ExecutorService,
+ instanceManager: InstanceManager,
+ scheduler: FlinkScheduler,
+ libraryCacheManager: BlobLibraryCacheManager,
+ archive: ActorRef,
+ restartStrategyFactory: RestartStrategyFactory,
+ timeout: FiniteDuration,
+ leaderElectionService: LeaderElectionService,
+ submittedJobGraphStore: SubmittedJobGraphStore,
+ checkpointRecoveryFactory: CheckpointRecoveryFactory,
+ savepointStore: SavepointStore,
+ jobRecoveryTimeout: FiniteDuration,
+ metricsRegistry: Option[FlinkMetricRegistry]): Props = {
+
+ Props(
+ jobManagerClass,
+ configuration,
+ executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ restartStrategyFactory,
+ timeout,
+ leaderElectionService,
+ submittedJobGraphStore,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout,
+ metricsRegistry)
+ }
+
// --------------------------------------------------------------------------
// Resolving the JobManager endpoint
// --------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index 2d99245..b433015 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -130,6 +130,16 @@ object TaskManagerMessages {
*/
case class RequestTaskManagerLog(requestType : LogTypeRequest)
+ /** Requests the number of active connections at the ConnectionManager */
+ case object RequestNumActiveConnections
+
+ case class ResponseNumActiveConnections(number: Int)
+
+ /** Requests the number of broadcast variables with references */
+ case object RequestBroadcastVariablesWithReferences
+
+ case class ResponseBroadcastVariablesWithReferences(number: Int)
+
// --------------------------------------------------------------------------
// Utility getters for case objects to simplify access from Java
@@ -166,4 +176,20 @@ object TaskManagerMessages {
def getRequestTaskManagerStdout(): AnyRef = {
RequestTaskManagerLog(StdOutFileRequest)
}
+
+ /**
+ * Accessor for the case object instance, to simplify Java interoperability.
+ * @return The RequestBroadcastVariablesWithReferences case object instance.
+ */
+ def getRequestBroadcastVariablesWithReferences(): RequestBroadcastVariablesWithReferences.type = {
+ RequestBroadcastVariablesWithReferences
+ }
+
+ /**
+ * Accessor for the case object instance, to simplify Java interoperability.
+ * @return The RequestNumActiveConnections case object instance.
+ */
+ def getRequestNumActiveConnections(): RequestNumActiveConnections.type = {
+ RequestNumActiveConnections
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index a547d25..0178bd3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -69,7 +69,7 @@ abstract class FlinkMiniCluster(
ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
InetAddress.getByName("localhost").getHostAddress())
- val configuration = generateConfiguration(userConfiguration)
+ protected val originalConfiguration = generateConfiguration(userConfiguration)
/** Future to the [[ActorGateway]] of the current leader */
var leaderGateway: Promise[ActorGateway] = Promise()
@@ -79,16 +79,16 @@ abstract class FlinkMiniCluster(
/** Future lock */
val futureLock = new Object()
-
+
implicit val executionContext = ExecutionContext.global
- implicit val timeout = AkkaUtils.getTimeout(configuration)
+ implicit val timeout = AkkaUtils.getTimeout(originalConfiguration)
- val haMode = HighAvailabilityMode.fromConfig(configuration)
+ val haMode = HighAvailabilityMode.fromConfig(originalConfiguration)
val numJobManagers = getNumberOfJobManagers
- var numTaskManagers = configuration.getInteger(
+ var numTaskManagers = originalConfiguration.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
@@ -105,6 +105,22 @@ abstract class FlinkMiniCluster(
private var isRunning = false
+ def configuration: Configuration = {
+ if (originalConfiguration.getInteger(
+ ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) == 0) {
+ val leaderConfiguration = new Configuration(originalConfiguration)
+
+ val leaderPort = getLeaderRPCPort
+
+ leaderConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, leaderPort)
+
+ leaderConfiguration
+ } else {
+ originalConfiguration
+ }
+ }
+
// --------------------------------------------------------------------------
// Abstract Methods
// --------------------------------------------------------------------------
@@ -125,7 +141,7 @@ abstract class FlinkMiniCluster(
if(haMode == HighAvailabilityMode.NONE) {
1
} else {
- configuration.getInteger(
+ originalConfiguration.getInteger(
ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER
)
@@ -136,7 +152,7 @@ abstract class FlinkMiniCluster(
if(haMode == HighAvailabilityMode.NONE) {
1
} else {
- configuration.getInteger(
+ originalConfiguration.getInteger(
ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER
)
@@ -177,40 +193,55 @@ abstract class FlinkMiniCluster(
Await.result(indexFuture, timeout)
}
+ def getLeaderRPCPort: Int = {
+ val index = getLeaderIndex(timeout)
+
+ jobManagerActorSystems match {
+ case Some(jmActorSystems) =>
+ AkkaUtils.getAddress(jmActorSystems(index)).port match {
+ case Some(p) => p
+ case None => -1
+ }
+
+ case None => throw new Exception("The JobManager of the LocalFlinkMiniCluster has not been " +
+ "started properly.")
+ }
+ }
+
def getResourceManagerAkkaConfig(index: Int): Config = {
if (useSingleActorSystem) {
- AkkaUtils.getAkkaConfig(configuration, None)
+ AkkaUtils.getAkkaConfig(originalConfiguration, None)
} else {
- val port = configuration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+ val port = originalConfiguration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
val resolvedPort = if(port != 0) port + index else port
- AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+ AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
}
}
def getJobManagerAkkaConfig(index: Int): Config = {
if (useSingleActorSystem) {
- AkkaUtils.getAkkaConfig(configuration, None)
+ AkkaUtils.getAkkaConfig(originalConfiguration, None)
}
else {
- val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+ val port = originalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
val resolvedPort = if(port != 0) port + index else port
- AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+ AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
}
}
def getTaskManagerAkkaConfig(index: Int): Config = {
- val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+ val port = originalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
val resolvedPort = if(port != 0) port + index else port
- AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+ AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
}
/**
@@ -257,7 +288,7 @@ abstract class FlinkMiniCluster(
"The FlinkMiniCluster has not been started yet.")
}
} else {
- JobClient.startJobClientActorSystem(configuration)
+ JobClient.startJobClientActorSystem(originalConfiguration)
}
}
@@ -320,7 +351,7 @@ abstract class FlinkMiniCluster(
val jobManagerAkkaURL = AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0))
- webMonitor = startWebServer(configuration, jmActorSystems(0), jobManagerAkkaURL)
+ webMonitor = startWebServer(originalConfiguration, jmActorSystems(0), jobManagerAkkaURL)
if(waitForTaskManagerRegistration) {
waitForTaskManagersToBeRegistered()
@@ -528,7 +559,7 @@ abstract class FlinkMiniCluster(
new StandaloneLeaderRetrievalService(
AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0)))
} else {
- ZooKeeperUtils.createLeaderRetrievalService(configuration)
+ ZooKeeperUtils.createLeaderRetrievalService(originalConfiguration)
}
case _ => throw new Exception("The FlinkMiniCluster has not been started properly.")
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index d30c047..cac5d91 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,23 +18,36 @@
package org.apache.flink.runtime.minicluster
-import akka.actor.{ActorRef, ActorSystem}
-import org.apache.flink.api.common.JobID
+import java.util.concurrent.ExecutorService
+import akka.actor.{ActorRef, ActorSystem, Props}
+import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
-import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable}
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.io.network.netty.NettyConfig
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.messages.JobManagerMessages
-import org.apache.flink.runtime.messages.JobManagerMessages.{CancellationFailure, CancellationResponse, StoppingFailure, StoppingResponse, RunningJobsStatus, RunningJobs}
-import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse}
+import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
import org.apache.flink.runtime.util.EnvironmentInformation
import scala.concurrent.Await
+import scala.concurrent.duration.FiniteDuration
/**
* Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same
@@ -65,8 +78,25 @@ class LocalFlinkMiniCluster(
config
}
+ //------------------------------------------------------------------------------------------------
+ // Actor classes
+ //------------------------------------------------------------------------------------------------
+
+ val jobManagerClass: Class[_ <: JobManager] = classOf[JobManager]
+
+ val taskManagerClass: Class[_ <: TaskManager] = classOf[TaskManager]
+
+ val memoryArchivistClass: Class[_ <: MemoryArchivist] = classOf[MemoryArchivist]
+
+ val resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]] =
+ classOf[StandaloneResourceManager]
+
+ //------------------------------------------------------------------------------------------------
+ // Start methods for the distributed components
+ //------------------------------------------------------------------------------------------------
+
override def startJobManager(index: Int, system: ActorSystem): ActorRef = {
- val config = configuration.clone()
+ val config = originalConfiguration.clone()
val jobManagerName = getJobManagerName(index)
val archiveName = getArchiveName(index)
@@ -79,19 +109,48 @@ class LocalFlinkMiniCluster(
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
}
- val (jobManager, _) = JobManager.startJobManagerActors(
- config,
- system,
- Some(jobManagerName),
- Some(archiveName),
- classOf[JobManager],
- classOf[MemoryArchivist])
-
- jobManager
+ val (executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ restartStrategyFactory,
+ timeout,
+ archiveCount,
+ leaderElectionService,
+ submittedJobGraphStore,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout,
+ metricsRegistry) = JobManager.createJobManagerComponents(config, createLeaderElectionService())
+
+ val archive = system.actorOf(
+ getArchiveProps(
+ memoryArchivistClass,
+ archiveCount),
+ archiveName)
+
+ system.actorOf(
+ getJobManagerProps(
+ jobManagerClass,
+ config,
+ executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ restartStrategyFactory,
+ timeout,
+ leaderElectionService,
+ submittedJobGraphStore,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout,
+ metricsRegistry),
+ jobManagerName)
}
override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
- val config = configuration.clone()
+ val config = originalConfiguration.clone()
val resourceManagerName = getResourceManagerName(index)
@@ -103,18 +162,16 @@ class LocalFlinkMiniCluster(
config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
}
- val resourceManager = FlinkResourceManager.startResourceManagerActors(
+ val resourceManagerProps = getResourceManagerProps(
+ resourceManagerClass,
config,
- system,
- createLeaderRetrievalService(),
- classOf[StandaloneResourceManager],
- resourceManagerName)
+ createLeaderRetrievalService())
- resourceManager
+ system.actorOf(resourceManagerProps, resourceManagerName)
}
override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
- val config = configuration.clone()
+ val config = originalConfiguration.clone()
val rpcPort = config.getInteger(
ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
@@ -138,32 +195,115 @@ class LocalFlinkMiniCluster(
} else {
TaskManager.TASK_MANAGER_NAME
}
-
- TaskManager.startTaskManagerComponentsAndActor(
+
+ val resourceID = ResourceID.generate() // generate random resource id
+
+ val (taskManagerConfig,
+ taskManagerLocation,
+ memoryManager,
+ ioManager,
+ network,
+ leaderRetrievalService) = TaskManager.createTaskManagerComponents(
config,
- ResourceID.generate(), // generate random resource id
- system,
+ resourceID,
hostname, // network interface to bind to
- Some(taskManagerActorName), // actor name
- Some(createLeaderRetrievalService()), // job manager leader retrieval service
localExecution, // start network stack?
- classOf[TaskManager])
+ Some(createLeaderRetrievalService()))
+
+ val props = getTaskManagerProps(
+ taskManagerClass,
+ taskManagerConfig,
+ resourceID,
+ taskManagerLocation,
+ memoryManager,
+ ioManager,
+ network,
+ leaderRetrievalService)
+
+ system.actorOf(props, taskManagerActorName)
}
- def getLeaderRPCPort: Int = {
- val index = getLeaderIndex(timeout)
+ //------------------------------------------------------------------------------------------------
+ // Props for the distributed components
+ //------------------------------------------------------------------------------------------------
- jobManagerActorSystems match {
- case Some(jmActorSystems) =>
- AkkaUtils.getAddress(jmActorSystems(index)).port match {
- case Some(p) => p
- case None => -1
- }
+ def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: Int): Props = {
+ JobManager.getArchiveProps(archiveClass, archiveCount)
+ }
- case None => throw new Exception("The JobManager of the LocalFlinkMiniCluster has not been " +
- "started properly.")
- }
+ def getJobManagerProps(
+ jobManagerClass: Class[_ <: JobManager],
+ configuration: Configuration,
+ executorService: ExecutorService,
+ instanceManager: InstanceManager,
+ scheduler: Scheduler,
+ libraryCacheManager: BlobLibraryCacheManager,
+ archive: ActorRef,
+ restartStrategyFactory: RestartStrategyFactory,
+ timeout: FiniteDuration,
+ leaderElectionService: LeaderElectionService,
+ submittedJobGraphStore: SubmittedJobGraphStore,
+ checkpointRecoveryFactory: CheckpointRecoveryFactory,
+ savepointStore: SavepointStore,
+ jobRecoveryTimeout: FiniteDuration,
+ metricsRegistry: Option[MetricRegistry]): Props = {
+
+ JobManager.getJobManagerProps(
+ jobManagerClass,
+ configuration,
+ executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ restartStrategyFactory,
+ timeout,
+ leaderElectionService,
+ submittedJobGraphStore,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout,
+ metricsRegistry)
+ }
+
+ def getTaskManagerProps(
+ taskManagerClass: Class[_ <: TaskManager],
+ taskManagerConfig: TaskManagerConfiguration,
+ resourceID: ResourceID,
+ taskManagerLocation: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ networkEnvironment: NetworkEnvironment,
+ leaderRetrievalService: LeaderRetrievalService): Props = {
+
+ TaskManager.getTaskManagerProps(
+ taskManagerClass,
+ taskManagerConfig,
+ resourceID,
+ taskManagerLocation,
+ memoryManager,
+ ioManager,
+ networkEnvironment,
+ leaderRetrievalService)
+ }
+
+ def getResourceManagerProps(
+ resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]],
+ configuration: Configuration,
+ leaderRetrievalService: LeaderRetrievalService): Props = {
+
+ FlinkResourceManager.getResourceManagerProps(
+ resourceManagerClass,
+ configuration,
+ leaderRetrievalService)
+ }
+
+ //------------------------------------------------------------------------------------------------
+ // Helper methods
+ //------------------------------------------------------------------------------------------------
+ def createLeaderElectionService(): Option[LeaderElectionService] = {
+ None
}
def initializeIOFormatClasses(configuration: Configuration): Unit = {
@@ -186,7 +326,7 @@ class LocalFlinkMiniCluster(
val bufferSize: Int = config.getInteger(
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
-
+
val bufferMem: Long = config.getLong(
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong
@@ -218,6 +358,7 @@ class LocalFlinkMiniCluster(
val config: Configuration = new Configuration()
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname)
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
@@ -252,11 +393,11 @@ class LocalFlinkMiniCluster(
JobManager.ARCHIVE_NAME
}
}
-
+
// --------------------------------------------------------------------------
// Actions on running jobs
// --------------------------------------------------------------------------
-
+
def currentlyRunningJobs: Iterable[JobID] = {
val leader = getLeaderGateway(timeout)
val future = leader.ask(JobManagerMessages.RequestRunningJobsStatus, timeout)
@@ -269,7 +410,7 @@ class LocalFlinkMiniCluster(
currentlyRunningJobs.foreach(list.add)
list
}
-
+
def stopJob(id: JobID) : Unit = {
val leader = getLeaderGateway(timeout)
val response = leader.ask(new JobManagerMessages.StopJob(id), timeout)
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 84750a3..de85f30 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -354,6 +354,21 @@ class TaskManager(
case None =>
sender() ! new IOException("BlobService not available. Cannot upload TaskManager logs.")
}
+
+ case RequestBroadcastVariablesWithReferences =>
+ sender ! decorateMessage(
+ ResponseBroadcastVariablesWithReferences(
+ bcVarManager.getNumberOfVariablesWithReferences)
+ )
+
+ case RequestNumActiveConnections =>
+ val numActive = if (!network.isShutdown) {
+ network.getConnectionManager.getNumberOfActiveConnections
+ } else {
+ 0
+ }
+
+ sender ! decorateMessage(ResponseNumActiveConnections(numActive))
}
/**
@@ -1781,6 +1796,7 @@ object TaskManager {
}
/**
+ * Starts the task manager actor.
*
* @param configuration The configuration for the TaskManager.
* @param resourceID The id of the resource which the task manager will run on.
@@ -1817,11 +1833,75 @@ object TaskManager {
taskManagerClass: Class[_ <: TaskManager])
: ActorRef = {
- val (taskManagerConfig : TaskManagerConfiguration,
- netConfig: NetworkEnvironmentConfiguration,
- taskManagerAddress: InetSocketAddress,
- memType: MemoryType
- ) = parseTaskManagerConfiguration(
+ val (taskManagerConfig,
+ connectionInfo,
+ memoryManager,
+ ioManager,
+ network,
+ leaderRetrievalService) = createTaskManagerComponents(
+ configuration,
+ resourceID,
+ taskManagerHostname,
+ localTaskManagerCommunication,
+ leaderRetrievalServiceOption)
+
+ // create the actor properties (which define the actor constructor parameters)
+ val tmProps = getTaskManagerProps(
+ taskManagerClass,
+ taskManagerConfig,
+ resourceID,
+ connectionInfo,
+ memoryManager,
+ ioManager,
+ network,
+ leaderRetrievalService)
+
+ taskManagerActorName match {
+ case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
+ case None => actorSystem.actorOf(tmProps)
+ }
+ }
+
+ def getTaskManagerProps(
+ taskManagerClass: Class[_ <: TaskManager],
+ taskManagerConfig: TaskManagerConfiguration,
+ resourceID: ResourceID,
+ taskManagerLocation: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ networkEnvironment: NetworkEnvironment,
+ leaderRetrievalService: LeaderRetrievalService
+ ): Props = {
+ Props(
+ taskManagerClass,
+ taskManagerConfig,
+ resourceID,
+ taskManagerLocation,
+ memoryManager,
+ ioManager,
+ networkEnvironment,
+ taskManagerConfig.numberOfSlots,
+ leaderRetrievalService)
+ }
+
+ def createTaskManagerComponents(
+ configuration: Configuration,
+ resourceID: ResourceID,
+ taskManagerHostname: String,
+ localTaskManagerCommunication: Boolean,
+ leaderRetrievalServiceOption: Option[LeaderRetrievalService]):
+ (TaskManagerConfiguration,
+ TaskManagerLocation,
+ MemoryManager,
+ IOManager,
+ NetworkEnvironment,
+ LeaderRetrievalService) = {
+
+ val (taskManagerConfig : TaskManagerConfiguration,
+ netConfig: NetworkEnvironmentConfiguration,
+ taskManagerAddress: InetSocketAddress,
+ memType: MemoryType
+ ) = parseTaskManagerConfiguration(
configuration,
taskManagerHostname,
localTaskManagerCommunication)
@@ -1895,10 +1975,10 @@ object TaskManager {
// check if a value has been configured
val configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
- ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
- "MemoryManager needs at least one MB of memory. " +
- "If you leave this config parameter empty, the system automatically " +
- "pick a fraction of the available memory.")
+ ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+ "MemoryManager needs at least one MB of memory. " +
+ "If you leave this config parameter empty, the system automatically " +
+ "pick a fraction of the available memory.")
val preAllocateMemory = configuration.getBoolean(
@@ -1910,7 +1990,7 @@ object TaskManager {
LOG.info(s"Using $configuredMemory MB for managed memory.")
} else {
LOG.info(s"Limiting managed memory to $configuredMemory MB, " +
- s"memory will be allocated lazily.")
+ s"memory will be allocated lazily.")
}
configuredMemory << 20 // megabytes to bytes
}
@@ -1928,10 +2008,10 @@ object TaskManager {
if (preAllocateMemory) {
LOG.info(s"Using $fraction of the currently free heap space for managed " +
- s"heap memory (${relativeMemSize >> 20} MB).")
+ s"heap memory (${relativeMemSize >> 20} MB).")
} else {
LOG.info(s"Limiting managed memory to $fraction of the currently free heap space " +
- s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.")
+ s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.")
}
relativeMemSize
@@ -1944,10 +2024,10 @@ object TaskManager {
if (preAllocateMemory) {
LOG.info(s"Using $fraction of the maximum memory size for " +
- s"managed off-heap memory (${directMemorySize >> 20} MB).")
+ s"managed off-heap memory (${directMemorySize >> 20} MB).")
} else {
LOG.info(s"Limiting managed memory to $fraction of the maximum memory size " +
- s"(${directMemorySize >> 20} MB), memory will be allocated lazily.")
+ s"(${directMemorySize >> 20} MB), memory will be allocated lazily.")
}
directMemorySize
@@ -1971,12 +2051,12 @@ object TaskManager {
memType match {
case MemoryType.HEAP =>
throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
- s" while allocating the TaskManager heap memory ($memorySize bytes).", e)
+ s" while allocating the TaskManager heap memory ($memorySize bytes).", e)
case MemoryType.OFF_HEAP =>
throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
- s" while allocating the TaskManager off-heap memory ($memorySize bytes). " +
- s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
+ s" while allocating the TaskManager off-heap memory ($memorySize bytes). " +
+ s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
case _ => throw e
}
@@ -1990,22 +2070,12 @@ object TaskManager {
case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
}
- // create the actor properties (which define the actor constructor parameters)
- val tmProps = Props(
- taskManagerClass,
- taskManagerConfig,
- resourceID,
+ (taskManagerConfig,
taskManagerLocation,
memoryManager,
ioManager,
network,
- taskManagerConfig.numberOfSlots,
leaderRetrievalService)
-
- taskManagerActorName match {
- case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
- case None => actorSystem.actorOf(tmProps)
- }
}
@@ -2055,8 +2125,8 @@ object TaskManager {
* @param taskManagerHostname The host name under which the TaskManager communicates.
* @param localTaskManagerCommunication True, to skip initializing the network stack.
* Use only in cases where only one task manager runs.
- * @return A tuple (TaskManagerConfiguration, network configuration,
- * InstanceConnectionInfo, JobManager actor Akka URL).
+ * @return A tuple (TaskManagerConfiguration, network configuration, inet socket address,
+ * memory type).
*/
@throws(classOf[IllegalArgumentException])
def parseTaskManagerConfiguration(
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
deleted file mode 100644
index 16331ac..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.MetricRegistry
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import java.util.concurrent.ExecutorService
-
-/** JobManager implementation extended by testing messages
- *
- */
-class TestingJobManager(
- flinkConfiguration: Configuration,
- executorService: ExecutorService,
- instanceManager: InstanceManager,
- scheduler: Scheduler,
- libraryCacheManager: BlobLibraryCacheManager,
- archive: ActorRef,
- restartStrategyFactory: RestartStrategyFactory,
- timeout: FiniteDuration,
- leaderElectionService: LeaderElectionService,
- submittedJobGraphs : SubmittedJobGraphStore,
- checkpointRecoveryFactory : CheckpointRecoveryFactory,
- savepointStore : SavepointStore,
- jobRecoveryTimeout : FiniteDuration,
- metricRegistry : Option[MetricRegistry])
- extends JobManager(
- flinkConfiguration,
- executorService,
- instanceManager,
- scheduler,
- libraryCacheManager,
- archive,
- restartStrategyFactory,
- timeout,
- leaderElectionService,
- submittedJobGraphs,
- checkpointRecoveryFactory,
- savepointStore,
- jobRecoveryTimeout,
- metricRegistry)
- with TestingJobManagerLike {}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
deleted file mode 100644
index 3947b17..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.{ActorRef, Cancellable, Terminated}
-import akka.pattern.{ask, pipe}
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps}
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
-import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingMessages._
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
-
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-/** This mixin can be used to decorate a JobManager with messages for testing purpose. */
-trait TestingJobManagerLike extends FlinkActor {
- that: JobManager =>
-
- import context._
-
- import scala.collection.JavaConverters._
-
- val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
- val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
-
- val waitForAllVerticesToBeRunningOrFinished =
- scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
-
- var periodicCheck: Option[Cancellable] = None
-
- val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
- collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
-
- val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
-
- val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
-
- val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
- new Ordering[(Int, ActorRef)] {
- override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
- })
-
- val waitForClient = scala.collection.mutable.HashSet[ActorRef]()
-
- val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
-
- var disconnectDisabled = false
-
- var postStopEnabled = true
-
- abstract override def postStop(): Unit = {
- if (postStopEnabled) {
- super.postStop()
- } else {
- // only stop leader election service to revoke the leadership of this JM so that a new JM
- // can be elected leader
- leaderElectionService.stop()
- }
- }
-
- abstract override def handleMessage: Receive = {
- handleTestingMessage orElse super.handleMessage
- }
-
- def handleTestingMessage: Receive = {
- case Alive => sender() ! Acknowledge
-
- case RequestExecutionGraph(jobID) =>
- currentJobs.get(jobID) match {
- case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
- ExecutionGraphFound(
- jobID,
- executionGraph)
- )
-
- case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
- }
-
- case WaitForAllVerticesToBeRunning(jobID) =>
- if(checkIfAllVerticesRunning(jobID)){
- sender() ! decorateMessage(AllVerticesRunning(jobID))
- }else{
- val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
- waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
-
- if(periodicCheck.isEmpty){
- periodicCheck =
- Some(
- context.system.scheduler.schedule(
- 0 seconds,
- 200 millis,
- self,
- decorateMessage(NotifyListeners)
- )
- )
- }
- }
- case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
- if(checkIfAllVerticesRunningOrFinished(jobID)){
- sender() ! decorateMessage(AllVerticesRunning(jobID))
- }else{
- val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
- waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender())
-
- if(periodicCheck.isEmpty){
- periodicCheck =
- Some(
- context.system.scheduler.schedule(
- 0 seconds,
- 200 millis,
- self,
- decorateMessage(NotifyListeners)
- )
- )
- }
- }
-
- case NotifyListeners =>
- for(jobID <- currentJobs.keySet){
- notifyListeners(jobID)
- }
-
- if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
- periodicCheck foreach { _.cancel() }
- periodicCheck = None
- }
-
-
- case NotifyWhenJobRemoved(jobID) =>
- val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
-
- val responses = gateways.map{
- gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
- }
-
- val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
-
- val allFutures = responses ++ Seq(jobRemovedOnJobManager)
-
- import context.dispatcher
- Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender()
-
- case CheckIfJobRemoved(jobID) =>
- if(currentJobs.contains(jobID)) {
- context.system.scheduler.scheduleOnce(
- 200 milliseconds,
- self,
- decorateMessage(CheckIfJobRemoved(jobID))
- )(context.dispatcher, sender())
- } else {
- sender() ! decorateMessage(true)
- }
-
- case NotifyWhenTaskManagerTerminated(taskManager) =>
- val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
- waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender)
-
- case msg@Terminated(taskManager) =>
- super.handleMessage(msg)
-
- waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
- _ foreach {
- listener =>
- listener ! decorateMessage(TaskManagerTerminated(taskManager))
- }
- }
-
- // see shutdown method for reply
- case NotifyOfComponentShutdown =>
- waitForShutdown += sender()
-
- case NotifyWhenAccumulatorChange(jobID) =>
-
- val (updated, registered) = waitForAccumulatorUpdate.
- getOrElse(jobID, (false, Set[ActorRef]()))
- waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
- sender ! true
-
- /**
- * Notification from the task manager that changed accumulator are transferred on next
- * Hearbeat. We need to keep this state to notify the listeners on next Heartbeat report.
- */
- case AccumulatorsChanged(jobID: JobID) =>
- waitForAccumulatorUpdate.get(jobID) match {
- case Some((updated, registered)) =>
- waitForAccumulatorUpdate.put(jobID, (true, registered))
- case None =>
- }
-
- /**
- * Disabled async processing of accumulator values and send accumulators to the listeners if
- * we previously received an [[AccumulatorsChanged]] message.
- */
- case msg : Heartbeat =>
- super.handleMessage(msg)
-
- waitForAccumulatorUpdate foreach {
- case (jobID, (updated, actors)) if updated =>
- currentJobs.get(jobID) match {
- case Some((graph, jobInfo)) =>
- val flinkAccumulators = graph.getFlinkAccumulators
- val userAccumulators = graph.aggregateUserAccumulators
- actors foreach {
- actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
- }
- case None =>
- }
- waitForAccumulatorUpdate.put(jobID, (false, actors))
- case _ =>
- }
-
- case RequestWorkingTaskManager(jobID) =>
- currentJobs.get(jobID) match {
- case Some((eg, _)) =>
- if(eg.getAllExecutionVertices.asScala.isEmpty){
- sender ! decorateMessage(WorkingTaskManager(None))
- } else {
- val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
-
- if(resource == null){
- sender ! decorateMessage(WorkingTaskManager(None))
- } else {
- sender ! decorateMessage(
- WorkingTaskManager(
- Some(resource.getTaskManagerActorGateway())
- )
- )
- }
- }
- case None => sender ! decorateMessage(WorkingTaskManager(None))
- }
-
- case NotifyWhenJobStatus(jobID, state) =>
- val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
- scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
-
- val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
-
- jobStatusListener += state -> (listener + sender)
-
- case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
- super.handleMessage(msg)
-
- val cleanup = waitForJobStatus.get(jobID) match {
- case Some(stateListener) =>
- stateListener.remove(newJobStatus) match {
- case Some(listeners) =>
- listeners foreach {
- _ ! decorateMessage(JobStatusIs(jobID, newJobStatus))
- }
- case _ =>
- }
- stateListener.isEmpty
-
- case _ => false
- }
-
- if (cleanup) {
- waitForJobStatus.remove(jobID)
- }
-
- case DisableDisconnect =>
- disconnectDisabled = true
-
- case DisablePostStop =>
- postStopEnabled = false
-
- case RequestSavepoint(savepointPath) =>
- try {
- val savepoint = savepointStore.loadSavepoint(savepointPath)
- sender ! ResponseSavepoint(savepoint)
- }
- catch {
- case e: Exception =>
- sender ! ResponseSavepoint(null)
- }
-
- case msg: Disconnect =>
- if (!disconnectDisabled) {
- super.handleMessage(msg)
-
- val taskManager = sender()
-
- waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
- _ foreach {
- listener =>
- listener ! decorateMessage(TaskManagerTerminated(taskManager))
- }
- }
- }
-
- case NotifyWhenLeader =>
- if (leaderElectionService.hasLeadership) {
- sender() ! true
- } else {
- waitForLeader += sender()
- }
-
- case msg: GrantLeadership =>
- super.handleMessage(msg)
-
- waitForLeader.foreach(_ ! true)
-
- waitForLeader.clear()
-
- case NotifyWhenClientConnects =>
- waitForClient += sender()
- sender() ! true
-
- case msg: RegisterJobClient =>
- super.handleMessage(msg)
- waitForClient.foreach(_ ! ClientConnected)
- case msg: RequestClassloadingProps =>
- super.handleMessage(msg)
- waitForClient.foreach(_ ! ClassLoadingPropsDelivered)
-
- case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
- if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {
- // there are already at least numRegisteredTaskManager registered --> send Acknowledge
- sender() ! Acknowledge
- } else {
- // wait until we see at least numRegisteredTaskManager being registered at the JobManager
- waitForNumRegisteredTaskManagers += ((numRegisteredTaskManager, sender()))
- }
-
- // TaskManager may be registered on these two messages
- case msg @ (_: RegisterTaskManager) =>
- super.handleMessage(msg)
-
- // dequeue all senders which wait for instanceManager.getNumberOfStartedTaskManagers or
- // fewer registered TaskManagers
- while (waitForNumRegisteredTaskManagers.nonEmpty &&
- waitForNumRegisteredTaskManagers.head._1 <=
- instanceManager.getNumberOfRegisteredTaskManagers) {
- val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
- receiver ! Acknowledge
- }
- }
-
- def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
- currentJobs.get(jobID) match {
- case Some((eg, _)) =>
- eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING)
- case None => false
- }
- }
-
- def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = {
- currentJobs.get(jobID) match {
- case Some((eg, _)) =>
- eg.getAllExecutionVertices.asScala.forall {
- case vertex =>
- (vertex.getExecutionState == ExecutionState.RUNNING
- || vertex.getExecutionState == ExecutionState.FINISHED)
- }
- case None => false
- }
- }
-
- def notifyListeners(jobID: JobID): Unit = {
- if(checkIfAllVerticesRunning(jobID)) {
- waitForAllVerticesToBeRunning.remove(jobID) match {
- case Some(listeners) =>
- for (listener <- listeners) {
- listener ! decorateMessage(AllVerticesRunning(jobID))
- }
- case _ =>
- }
- }
-
- if(checkIfAllVerticesRunningOrFinished(jobID)) {
- waitForAllVerticesToBeRunningOrFinished.remove(jobID) match {
- case Some(listeners) =>
- for (listener <- listeners) {
- listener ! decorateMessage(AllVerticesRunning(jobID))
- }
- case _ =>
- }
- }
- }
-
- /**
- * No killing of the VM for testing.
- */
- override protected def shutdown(): Unit = {
- log.info("Shutting down TestingJobManager.")
- waitForShutdown.foreach(_ ! ComponentShutdown(self))
- waitForShutdown.clear()
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
deleted file mode 100644
index f121305..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import java.util.Map
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-import org.apache.flink.api.common.accumulators.Accumulator
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
-import org.apache.flink.runtime.instance.ActorGateway
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
-
-object TestingJobManagerMessages {
-
- case class RequestExecutionGraph(jobID: JobID)
-
- sealed trait ResponseExecutionGraph {
- def jobID: JobID
- }
-
- case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends
- ResponseExecutionGraph
-
- case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph
-
- case class WaitForAllVerticesToBeRunning(jobID: JobID)
- case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID)
- case class AllVerticesRunning(jobID: JobID)
-
- case class NotifyWhenJobRemoved(jobID: JobID)
-
- case class RequestWorkingTaskManager(jobID: JobID)
- case class WorkingTaskManager(gatewayOption: Option[ActorGateway])
-
- case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus)
- case class JobStatusIs(jobID: JobID, state: JobStatus)
-
- case object NotifyListeners
-
- case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
- case class TaskManagerTerminated(taskManager: ActorRef)
-
- /**
- * Registers a listener to receive a message when accumulators changed.
- * The change must be explicitly triggered by the TestingTaskManager which can receive an
- * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]
- * message by a task that changed the accumulators. This message is then
- * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]]
- * message when the next Heartbeat occurs.
- */
- case class NotifyWhenAccumulatorChange(jobID: JobID)
-
- /**
- * Reports updated accumulators back to the listener.
- */
- case class UpdatedAccumulators(jobID: JobID,
- flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]],
- userAccumulators: Map[String, Accumulator[_,_]])
-
- /** Notifies the sender when the [[TestingJobManager]] has been elected as the leader
- *
- */
- case object NotifyWhenLeader
-
- /**
- * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs
- */
- case object NotifyWhenClientConnects
- /**
- * Notifes of client connect
- */
- case object ClientConnected
- /**
- * Notifies when the client has requested class loading information
- */
- case object ClassLoadingPropsDelivered
-
- /**
- * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]]
- * message when at least numRegisteredTaskManager have registered at the JobManager.
- *
- * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified
- */
- case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int)
-
- /** Disables the post stop method of the [[TestingJobManager]].
- *
- * Only the leaderElectionService is stopped in the postStop method call to revoke the leadership
- */
- case object DisablePostStop
-
- /**
- * Requests a savepoint from the job manager.
- *
- * @param savepointPath The path of the savepoint to request.
- */
- case class RequestSavepoint(savepointPath: String)
-
- /**
- * Response to a savepoint request.
- *
- * @param savepoint The requested savepoint or null if none available.
- */
- case class ResponseSavepoint(savepoint: Savepoint)
-
- def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
- def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects
- def getDisablePostStop(): AnyRef = DisablePostStop
-
- def getClientConnected(): AnyRef = ClientConnected
- def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
deleted file mode 100644
index 48a1ddd..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import org.apache.flink.runtime.jobmanager.MemoryArchivist
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound, ExecutionGraphNotFound, RequestExecutionGraph}
-
-/** Memory archivist extended by testing messages
- *
- * @param maxEntries number of maximum number of archived jobs
- */
-class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) {
-
- override def handleMessage: Receive = {
- handleTestingMessage orElse super.handleMessage
- }
-
- def handleTestingMessage: Receive = {
- case RequestExecutionGraph(jobID) =>
- val executionGraph = graphs.get(jobID)
-
- executionGraph match {
- case Some(graph) => sender ! decorateMessage(ExecutionGraphFound(jobID, graph))
- case None => sender ! decorateMessage(ExecutionGraphNotFound(jobID))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
deleted file mode 100644
index 91d169a..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-
-object TestingMessages {
-
- case class CheckIfJobRemoved(jobID: JobID)
-
- case object DisableDisconnect
-
- case object Alive
-
- def getAlive: AnyRef = Alive
-
- def getDisableDisconnect: AnyRef = DisableDisconnect
-
- case object NotifyOfComponentShutdown
- case class ComponentShutdown(ref: ActorRef)
-
- def getNotifyOfComponentShutdown(): AnyRef = NotifyOfComponentShutdown
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
deleted file mode 100644
index 9b5a147..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testingUtils
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.io.disk.iomanager.IOManager
-import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
-import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration}
-
-import scala.language.postfixOps
-
-/** Subclass of the [[TaskManager]] to support testing messages
- */
-class TestingTaskManager(
- config: TaskManagerConfiguration,
- resourceID: ResourceID,
- connectionInfo: TaskManagerLocation,
- memoryManager: MemoryManager,
- ioManager: IOManager,
- network: NetworkEnvironment,
- numberOfSlots: Int,
- leaderRetrievalService: LeaderRetrievalService)
- extends TaskManager(
- config,
- resourceID,
- connectionInfo,
- memoryManager,
- ioManager,
- network,
- numberOfSlots,
- leaderRetrievalService)
- with TestingTaskManagerLike {
-
- def this(
- config: TaskManagerConfiguration,
- connectionInfo: TaskManagerLocation,
- memoryManager: MemoryManager,
- ioManager: IOManager,
- network: NetworkEnvironment,
- numberOfSlots: Int,
- leaderRetrievalService: LeaderRetrievalService) {
- this(
- config,
- ResourceID.generate(),
- connectionInfo,
- memoryManager,
- ioManager,
- network,
- numberOfSlots,
- leaderRetrievalService)
- }
-}