You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:07 UTC
[02/16] flink git commit: [FLINK-6136] Separate EmbeddedHaServices
and StandaloneHaServices
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index b22f1a6..05749c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -56,31 +56,20 @@ public class LeaderRetrievalUtils {
* Creates a {@link LeaderRetrievalService} based on the provided {@link Configuration} object.
*
* @param configuration Configuration containing the settings for the {@link LeaderRetrievalService}
- * @return The {@link LeaderRetrievalService} specified in the configuration object
- * @throws Exception
- */
- public static LeaderRetrievalService createLeaderRetrievalService(Configuration configuration)
- throws Exception {
- return createLeaderRetrievalService(configuration, false);
- }
-
- /**
- * Creates a {@link LeaderRetrievalService} based on the provided {@link Configuration} object.
- *
- * @param configuration Configuration containing the settings for the {@link LeaderRetrievalService}
* @param resolveInitialHostName If true, resolves the initial hostname
* @return The {@link LeaderRetrievalService} specified in the configuration object
* @throws Exception
*/
public static LeaderRetrievalService createLeaderRetrievalService(
- Configuration configuration, boolean resolveInitialHostName)
+ Configuration configuration,
+ boolean resolveInitialHostName)
throws Exception {
HighAvailabilityMode highAvailabilityMode = getRecoveryMode(configuration);
switch (highAvailabilityMode) {
case NONE:
- return StandaloneUtils.createLeaderRetrievalService(configuration, resolveInitialHostName);
+ return StandaloneUtils.createLeaderRetrievalService(configuration, resolveInitialHostName, null);
case ZOOKEEPER:
return ZooKeeperUtils.createLeaderRetrievalService(configuration);
default:
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
index 8436ced..1719b38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
@@ -18,15 +18,15 @@
package org.apache.flink.runtime.util;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
+import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.util.NetUtils;
-import scala.Option;
-import scala.Tuple3;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.util.ConfigurationException;
-import java.net.InetAddress;
import java.net.UnknownHostException;
/**
@@ -40,27 +40,15 @@ public final class StandaloneUtils {
*
* @param configuration Configuration instance containing the host and port information
* @return StandaloneLeaderRetrievalService
+ * @throws ConfigurationException
* @throws UnknownHostException
*/
- public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
- Configuration configuration)
- throws UnknownHostException {
- return createLeaderRetrievalService(configuration, false);
- }
-
- /**
- * Creates a {@link StandaloneLeaderRetrievalService} from the given configuration. The
- * host and port for the remote Akka URL are retrieved from the provided configuration.
- *
- * @param configuration Configuration instance containing the host and port information
- * @param resolveInitialHostName If true, resolves the hostname of the StandaloneLeaderRetrievalService
- * @return StandaloneLeaderRetrievalService
- * @throws UnknownHostException
- */
- public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
- Configuration configuration, boolean resolveInitialHostName)
- throws UnknownHostException {
- return createLeaderRetrievalService(configuration, resolveInitialHostName, null);
+ public static StandaloneLeaderRetrievalService createLeaderRetrievalService(Configuration configuration)
+ throws ConfigurationException, UnknownHostException {
+ return createLeaderRetrievalService(
+ configuration,
+ false,
+ null);
}
/**
@@ -73,38 +61,22 @@ public final class StandaloneUtils {
* @param resolveInitialHostName If true, resolves the hostname of the StandaloneLeaderRetrievalService
* @param jobManagerName Name of the JobManager actor
* @return StandaloneLeaderRetrievalService
- * @throws UnknownHostException if the host name cannot be resolved into an {@link InetAddress}
+ * @throws ConfigurationException if the job manager address cannot be retrieved from the configuration
+ * @throws UnknownHostException if the job manager address cannot be resolved
*/
public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
Configuration configuration,
boolean resolveInitialHostName,
String jobManagerName)
- throws UnknownHostException {
-
- Tuple3<String, String, Object> stringIntPair = TaskManager.getAndCheckJobManagerAddress(configuration);
-
- String protocol = stringIntPair._1();
- String jobManagerHostname = stringIntPair._2();
- int jobManagerPort = (Integer) stringIntPair._3();
-
- // Do not try to resolve a hostname to prevent resolving to the wrong IP address
- String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(jobManagerHostname, jobManagerPort);
-
- if (resolveInitialHostName) {
- try {
- //noinspection ResultOfMethodCallIgnored
- InetAddress.getByName(jobManagerHostname);
- }
- catch (UnknownHostException e) {
- throw new UnknownHostException("Cannot resolve the JobManager hostname '" + jobManagerHostname
- + "' specified in the configuration");
- }
- }
+ throws ConfigurationException, UnknownHostException {
+ Tuple2<String, Integer> hostnamePort = HighAvailabilityServicesUtils.getJobManagerAddress(configuration);
- String jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(
- protocol,
- hostPort,
- Option.apply(jobManagerName));
+ String jobManagerAkkaUrl = AkkaRpcServiceUtils.getRpcUrl(
+ hostnamePort.f0,
+ hostnamePort.f1,
+ jobManagerName != null ? jobManagerName : JobMaster.JOB_MANAGER_NAME,
+ resolveInitialHostName ? AddressResolution.TRY_ADDRESS_RESOLUTION : AddressResolution.NO_ADDRESS_RESOLUTION,
+ configuration);
return new StandaloneLeaderRetrievalService(jobManagerAkkaUrl);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index d94eb7a..62fa73d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -28,7 +28,7 @@ import com.typesafe.config.{Config, ConfigFactory}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
import org.apache.flink.runtime.net.SSLUtils
-import org.apache.flink.util.NetUtils
+import org.apache.flink.util.{ConfigurationException, NetUtils, Preconditions}
import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
import org.slf4j.LoggerFactory
@@ -707,18 +707,15 @@ object AkkaUtils {
"(d|day)|(h|hour)|(min|minute)|s|sec|second)|(ms|milli|millisecond)|" +
"(µs|micro|microsecond)|(ns|nano|nanosecond)"
}
-
- /** Returns the protocol field for the URL of the remote actor system given the user configuration
+
+ /**
+ * Returns the local akka url for the given actor name.
*
- * @param config instance containing the user provided configuration values
- * @return the remote url's protocol field
+ * @param actorName Actor name identifying the actor
+ * @return Local Akka URL for the given actor
*/
- def getAkkaProtocol(config: Configuration): String = {
- val sslEnabled = config.getBoolean(ConfigConstants.AKKA_SSL_ENABLED,
- ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
- SSLUtils.getSSLEnabled(config)
- if (sslEnabled) "akka.ssl.tcp" else "akka.tcp"
+ def getLocalAkkaURL(actorName: String): String = {
+ "akka://flink/user/" + actorName
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 479ec51..68f43da 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -49,11 +49,15 @@ import org.apache.flink.runtime.execution.SuppressRestartsException
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
import org.apache.flink.runtime.executiongraph._
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager}
import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
+import org.apache.flink.runtime.jobmaster.JobMaster
+import org.apache.flink.runtime.jobmaster.JobMaster.{ARCHIVE_NAME, JOB_MANAGER_NAME}
import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService}
import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
@@ -72,8 +76,11 @@ import org.apache.flink.runtime.metrics.util.MetricUtils
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
+import org.apache.flink.runtime.taskexecutor.TaskExecutor
+import org.apache.flink.runtime.taskexecutor.TaskExecutor.TASK_MANAGER_NAME
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util._
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -1923,12 +1930,6 @@ object JobManager {
val STARTUP_FAILURE_RETURN_CODE = 1
val RUNTIME_FAILURE_RETURN_CODE = 2
- /** Name of the JobManager actor */
- val JOB_MANAGER_NAME = "jobmanager"
-
- /** Name of the archive actor */
- val ARCHIVE_NAME = "archive"
-
/**
* Entry point (main method) to run the JobManager in a standalone fashion.
@@ -2237,7 +2238,7 @@ object JobManager {
if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
LOG.info("Starting JobManager web frontend")
val leaderRetrievalService = LeaderRetrievalUtils
- .createLeaderRetrievalService(configuration)
+ .createLeaderRetrievalService(configuration, false)
// start the web frontend. we need to load this dynamically
// because it is not in the same project/dependencies
@@ -2287,7 +2288,7 @@ object JobManager {
ResourceID.generate(),
jobManagerSystem,
externalHostname,
- Some(TaskManager.TASK_MANAGER_NAME),
+ Some(TaskExecutor.TASK_MANAGER_NAME),
None,
localTaskManagerCommunication = true,
classOf[TaskManager])
@@ -2305,7 +2306,13 @@ object JobManager {
// start web monitor
webMonitor.foreach {
monitor =>
- val jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(configuration)
+ val hostnamePort = HighAvailabilityServicesUtils.getJobManagerAddress(configuration)
+ val jobManagerAkkaUrl = AkkaRpcServiceUtils.getRpcUrl(
+ hostnamePort.f0,
+ hostnamePort.f1,
+ JobMaster.JOB_MANAGER_NAME,
+ AddressResolution.NO_ADDRESS_RESOLUTION,
+ configuration)
monitor.start(jobManagerAkkaUrl)
}
@@ -2317,7 +2324,7 @@ object JobManager {
FlinkResourceManager.startResourceManagerActors(
configuration,
jobManagerSystem,
- LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
+ LeaderRetrievalUtils.createLeaderRetrievalService(configuration, false),
rmClass))
case None =>
LOG.info("Resource Manager class not provided. No resource manager will be started.")
@@ -2631,8 +2638,8 @@ object JobManager {
actorSystem,
futureExecutor,
ioExecutor,
- Some(JOB_MANAGER_NAME),
- Some(ARCHIVE_NAME),
+ Some(JobMaster.JOB_MANAGER_NAME),
+ Some(JobMaster.ARCHIVE_NAME),
jobManagerClass,
archiveClass)
}
@@ -2761,121 +2768,4 @@ object JobManager {
jobRecoveryTimeout,
metricsRegistry)
}
-
- // --------------------------------------------------------------------------
- // Resolving the JobManager endpoint
- // --------------------------------------------------------------------------
-
- /**
- * Builds the akka actor path for the JobManager actor, given the socket address
- * where the JobManager's actor system runs.
- *
- * @param protocol The protocol to be used to connect to the remote JobManager's actor system.
- * @param hostPort The external address of the JobManager's actor system in format host:port
- * @return The akka URL of the JobManager actor.
- */
- def getRemoteJobManagerAkkaURL(
- protocol: String,
- hostPort: String,
- name: Option[String] = None)
- : String = {
-
- require(protocol == "akka.tcp" || protocol == "akka.ssl.tcp",
- "protocol field should be either akka.tcp or akka.ssl.tcp")
-
- getJobManagerAkkaURLHelper(s"$protocol://flink@$hostPort", name)
- }
-
- /**
- * Returns the JobManager actor's remote Akka URL, given the configured hostname and port.
- *
- * @param config The configuration to parse
- * @return JobManager actor remote Akka URL
- */
- def getRemoteJobManagerAkkaURL(config: Configuration) : String = {
- val (protocol, hostname, port) = TaskManager.getAndCheckJobManagerAddress(config)
-
- val hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port)
-
- JobManager.getRemoteJobManagerAkkaURL(protocol, hostPort, Option.empty)
- }
-
- /**
- * Builds the akka actor path for the JobManager actor to address the actor within
- * its own actor system.
- *
- * @return The local akka URL of the JobManager actor.
- */
- def getLocalJobManagerAkkaURL(name: Option[String] = None): String = {
- getJobManagerAkkaURLHelper("akka://flink", name)
- }
-
- def getJobManagerAkkaURL(system: ActorSystem, name: Option[String] = None): String = {
- getJobManagerAkkaURLHelper(AkkaUtils.getAddress(system).toString, name)
- }
-
- private def getJobManagerAkkaURLHelper(address: String, name: Option[String]): String = {
- address + "/user/" + name.getOrElse(JOB_MANAGER_NAME)
- }
-
- /**
- * Resolves the JobManager actor reference in a blocking fashion.
- *
- * @param jobManagerUrl The akka URL of the JobManager.
- * @param system The local actor system that should perform the lookup.
- * @param timeout The maximum time to wait until the lookup fails.
- * @throws java.io.IOException Thrown, if the lookup fails.
- * @return The ActorRef to the JobManager
- */
- @throws(classOf[IOException])
- def getJobManagerActorRef(
- jobManagerUrl: String,
- system: ActorSystem,
- timeout: FiniteDuration)
- : ActorRef = {
- AkkaUtils.getActorRef(jobManagerUrl, system, timeout)
- }
-
- /**
- * Resolves the JobManager actor reference in a blocking fashion.
- *
- * @param protocol The protocol to be used to connect to the remote JobManager's actor system.
- * @param hostPort The external address of the JobManager's actor system in format host:port.
- * @param system The local actor system that should perform the lookup.
- * @param timeout The maximum time to wait until the lookup fails.
- * @throws java.io.IOException Thrown, if the lookup fails.
- * @return The ActorRef to the JobManager
- */
- @throws(classOf[IOException])
- def getJobManagerActorRef(
- protocol: String,
- hostPort: String,
- system: ActorSystem,
- timeout: FiniteDuration)
- : ActorRef = {
-
- val jmAddress = getRemoteJobManagerAkkaURL(protocol, hostPort)
- getJobManagerActorRef(jmAddress, system, timeout)
- }
-
- /**
- * Resolves the JobManager actor reference in a blocking fashion.
- *
- * @param hostPort The address of the JobManager's actor system in format host:port.
- * @param system The local actor system that should perform the lookup.
- * @param config The config describing the maximum time to wait until the lookup fails.
- * @throws java.io.IOException Thrown, if the lookup fails.
- * @return The ActorRef to the JobManager
- */
- @throws(classOf[IOException])
- def getJobManagerActorRef(
- hostPort: String,
- system: ActorSystem,
- config: Configuration)
- : ActorRef = {
-
- val timeout = AkkaUtils.getLookupTimeout(config)
- val protocol = AkkaUtils.getAkkaProtocol(config)
- getJobManagerActorRef(protocol, hostPort, system, timeout)
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 3d43da5..2f83548 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -38,13 +38,14 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
+import org.apache.flink.runtime.jobmaster.JobMaster
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse}
import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
+import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
import org.apache.flink.runtime.util.EnvironmentInformation
@@ -207,9 +208,9 @@ class LocalFlinkMiniCluster(
val localExecution = numTaskManagers == 1
val taskManagerActorName = if (singleActorSystem) {
- TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)
+ TaskExecutor.TASK_MANAGER_NAME + "_" + (index + 1)
} else {
- TaskManager.TASK_MANAGER_NAME
+ TaskExecutor.TASK_MANAGER_NAME
}
val resourceID = ResourceID.generate() // generate random resource id
@@ -397,9 +398,9 @@ class LocalFlinkMiniCluster(
protected def getJobManagerName(index: Int): String = {
if(singleActorSystem) {
- JobManager.JOB_MANAGER_NAME + "_" + (index + 1)
+ JobMaster.JOB_MANAGER_NAME + "_" + (index + 1)
} else {
- JobManager.JOB_MANAGER_NAME
+ JobMaster.JOB_MANAGER_NAME
}
}
@@ -413,9 +414,9 @@ class LocalFlinkMiniCluster(
protected def getArchiveName(index: Int): String = {
if(singleActorSystem) {
- JobManager.ARCHIVE_NAME + "_" + (index + 1)
+ JobMaster.ARCHIVE_NAME + "_" + (index + 1)
} else {
- JobManager.ARCHIVE_NAME
+ JobMaster.ARCHIVE_NAME
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 97e55f0..4065660 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -64,7 +64,7 @@ import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
-import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
+import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
import org.apache.flink.runtime.util._
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
@@ -1496,9 +1496,6 @@ object TaskManager {
/** Return code for critical errors during the runtime */
val RUNTIME_FAILURE_RETURN_CODE = 2
- /** The name of the TaskManager actor */
- val TASK_MANAGER_NAME = "taskmanager"
-
/** Maximum time (milli seconds) that the TaskManager will spend searching for a
* suitable network interface to use for communication */
val MAX_STARTUP_CONNECT_TIME = 120000L
@@ -1664,7 +1661,9 @@ object TaskManager {
LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname)
}
else {
- val leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
+ val leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(
+ configuration,
+ true)
val lookupTimeout = AkkaUtils.getLookupTimeout(configuration)
val taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
@@ -1780,7 +1779,7 @@ object TaskManager {
resourceID,
taskManagerSystem,
taskManagerHostname,
- Some(TASK_MANAGER_NAME),
+ Some(TaskExecutor.TASK_MANAGER_NAME),
None,
localTaskManagerCommunication = false,
taskManagerClass)
@@ -1969,38 +1968,4 @@ object TaskManager {
throw new IOException("Could not connect to TaskManager at " + taskManagerUrl, e)
}
}
-
- // --------------------------------------------------------------------------
- // Parsing and checking the TaskManager Configuration
- // --------------------------------------------------------------------------
-
- /**
- * Gets the protocol, hostname and port of the JobManager from the configuration. Also checks that
- * the hostname is not null and the port non-negative.
- *
- * @param configuration The configuration to read the config values from.
- * @return A 3-tuple (protocol, hostname, port).
- */
- def getAndCheckJobManagerAddress(configuration: Configuration) : (String, String, Int) = {
-
- val protocol = AkkaUtils.getAkkaProtocol(configuration)
-
- val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-
- val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
- if (hostname == null) {
- throw new Exception("Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
- "' is missing (hostname/address of JobManager to connect to).")
- }
-
- if (port <= 0 || port >= 65536) {
- throw new Exception("Invalid value for '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
- "' (port of the JobManager actor system) : " + port +
- ". it must be great than 0 and less than 65536.")
- }
-
- (protocol, hostname, port)
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 37a86c7..830dbf9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.highavailability.nonha.NonHaRegistry;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -147,7 +147,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
@Override
public RunningJobsRegistry getRunningJobsRegistry() {
- return new NonHaRegistry();
+ return new StandaloneRunningJobsRegistry();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
deleted file mode 100644
index b1881e5..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.highavailability;
-
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class ZooKeeperRegistryTest extends TestLogger {
-
- private TestingServer testingServer;
-
- @Before
- public void before() throws Exception {
- testingServer = new TestingServer();
- }
-
- @After
- public void after() throws Exception {
- testingServer.stop();
- testingServer = null;
- }
-
- /**
- * Tests that the function of ZookeeperRegistry, setJobRunning(), setJobFinished(), isJobRunning()
- */
- @Test
- public void testZooKeeperRegistry() throws Exception {
- Configuration configuration = new Configuration();
- configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
- configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
- final HighAvailabilityServices zkHaService = new ZookeeperHaServices(
- ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration);
-
- final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
-
- try {
- JobID jobID = JobID.generate();
- assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
-
- zkRegistry.setJobRunning(jobID);
- assertEquals(JobSchedulingStatus.RUNNING, zkRegistry.getJobSchedulingStatus(jobID));
-
- zkRegistry.setJobFinished(jobID);
- assertEquals(JobSchedulingStatus.DONE, zkRegistry.getJobSchedulingStatus(jobID));
-
- zkRegistry.clearJob(jobID);
- assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
- } finally {
- zkHaService.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
deleted file mode 100644
index a9805a1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.highavailability.leaderelection;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.util.StringUtils;
-
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.Executor;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
-
-/**
- * Tests for the {@link SingleLeaderElectionService}.
- */
-public class SingleLeaderElectionServiceTest {
-
- private static final Random RND = new Random();
-
- private final Executor executor = Executors.directExecutor();
-
- // ------------------------------------------------------------------------
-
- @Test
- public void testStartStopAssignLeadership() throws Exception {
- final UUID uuid = UUID.randomUUID();
- final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
-
- final LeaderContender contender = mockContender(service);
- final LeaderContender otherContender = mockContender(service);
-
- service.start(contender);
- verify(contender, times(1)).grantLeadership(uuid);
-
- service.stop();
- verify(contender, times(1)).revokeLeadership();
-
- // start with a new contender - the old contender must not gain another leadership
- service.start(otherContender);
- verify(otherContender, times(1)).grantLeadership(uuid);
-
- verify(contender, times(1)).grantLeadership(uuid);
- verify(contender, times(1)).revokeLeadership();
- }
-
- @Test
- public void testStopBeforeConfirmingLeadership() throws Exception {
- final UUID uuid = UUID.randomUUID();
- final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
-
- final LeaderContender contender = mock(LeaderContender.class);
-
- service.start(contender);
- verify(contender, times(1)).grantLeadership(uuid);
-
- service.stop();
-
- // because the leadership was never confirmed, there is no "revoke" call
- verifyNoMoreInteractions(contender);
- }
-
- @Test
- public void testStartOnlyOnce() throws Exception {
- final UUID uuid = UUID.randomUUID();
- final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
-
- final LeaderContender contender = mock(LeaderContender.class);
- final LeaderContender otherContender = mock(LeaderContender.class);
-
- service.start(contender);
- verify(contender, times(1)).grantLeadership(uuid);
-
- // should not be possible to start again this with another contender
- try {
- service.start(otherContender);
- fail("should fail with an exception");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // should not be possible to start this again with the same contender
- try {
- service.start(contender);
- fail("should fail with an exception");
- } catch (IllegalStateException e) {
- // expected
- }
- }
-
- @Test
- public void testShutdown() throws Exception {
- final UUID uuid = UUID.randomUUID();
- final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
-
- // create a leader contender and let it grab leadership
- final LeaderContender contender = mockContender(service);
- service.start(contender);
- verify(contender, times(1)).grantLeadership(uuid);
-
- // some leader listeners
- final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
- final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
-
- LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
- LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
-
- listenerService1.start(listener1);
- listenerService2.start(listener2);
-
- // one listener stops
- listenerService1.stop();
-
- // shut down the service
- service.shutdown();
-
- // the leader contender and running listener should get error notifications
- verify(contender, times(1)).handleError(any(Exception.class));
- verify(listener2, times(1)).handleError(any(Exception.class));
-
- // the stopped listener gets no notification
- verify(listener1, times(0)).handleError(any(Exception.class));
-
- // should not be possible to start again after shutdown
- try {
- service.start(contender);
- fail("should fail with an exception");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // no additional leadership grant
- verify(contender, times(1)).grantLeadership(any(UUID.class));
- }
-
- @Test
- public void testImmediateShutdown() throws Exception {
- final UUID uuid = UUID.randomUUID();
- final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
- service.shutdown();
-
- final LeaderContender contender = mock(LeaderContender.class);
-
- // should not be possible to start
- try {
- service.start(contender);
- fail("should fail with an exception");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // no additional leadership grant
- verify(contender, times(0)).grantLeadership(any(UUID.class));
- }
-
-// @Test
-// public void testNotifyListenersWhenLeaderElected() throws Exception {
-// final UUID uuid = UUID.randomUUID();
-// final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
-//
-// final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
-// final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
-//
-// LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
-// LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
-//
-// listenerService1.start(listener1);
-// listenerService1.start(listener2);
-//
-// final LeaderContender contender = mockContender(service);
-// service.start(contender);
-//
-// veri
-// }
-
- // ------------------------------------------------------------------------
- // utilities
- // ------------------------------------------------------------------------
-
- private static LeaderContender mockContender(final LeaderElectionService service) {
- String address = StringUtils.getRandomString(RND, 5, 10, 'a', 'z');
- return mockContender(service, address);
- }
-
- private static LeaderContender mockContender(final LeaderElectionService service, final String address) {
- LeaderContender mockContender = mock(LeaderContender.class);
-
- when(mockContender.getAddress()).thenReturn(address);
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- final UUID uuid = (UUID) invocation.getArguments()[0];
- service.confirmLeaderSessionID(uuid);
- return null;
- }
- }).when(mockContender).grantLeadership(any(UUID.class));
-
- return mockContender;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
new file mode 100644
index 0000000..7bf9364
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.UUID;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link EmbeddedHaServices}.
+ */
+public class EmbeddedHaServicesTest extends TestLogger {
+
+ private EmbeddedHaServices embeddedHaServices;
+
+ @Before
+ public void setupTest() {
+ embeddedHaServices = new EmbeddedHaServices(Executors.directExecutor());
+ }
+
+ @After
+ public void teardownTest() throws Exception {
+ if (embeddedHaServices != null) {
+ embeddedHaServices.closeAndCleanupAllData();
+ embeddedHaServices = null;
+ }
+ }
+
+ /**
+ * Tests that exactly one JobManager is elected as the leader for a given job id.
+ */
+ @Test
+ public void testJobManagerLeaderElection() throws Exception {
+ JobID jobId1 = new JobID();
+ JobID jobId2 = new JobID();
+
+ LeaderContender leaderContender1 = mock(LeaderContender.class);
+ LeaderContender leaderContender2 = mock(LeaderContender.class);
+ LeaderContender leaderContenderDifferentJobId = mock(LeaderContender.class);
+
+ LeaderElectionService leaderElectionService1 = embeddedHaServices.getJobManagerLeaderElectionService(jobId1);
+ LeaderElectionService leaderElectionService2 = embeddedHaServices.getJobManagerLeaderElectionService(jobId1);
+ LeaderElectionService leaderElectionServiceDifferentJobId = embeddedHaServices.getJobManagerLeaderElectionService(jobId2);
+
+ leaderElectionService1.start(leaderContender1);
+ leaderElectionService2.start(leaderContender2);
+ leaderElectionServiceDifferentJobId.start(leaderContenderDifferentJobId);
+
+ ArgumentCaptor<UUID> leaderIdArgumentCaptor1 = ArgumentCaptor.forClass(UUID.class);
+ ArgumentCaptor<UUID> leaderIdArgumentCaptor2 = ArgumentCaptor.forClass(UUID.class);
+ verify(leaderContender1, atLeast(0)).grantLeadership(leaderIdArgumentCaptor1.capture());
+ verify(leaderContender2, atLeast(0)).grantLeadership(leaderIdArgumentCaptor2.capture());
+
+ assertTrue(leaderIdArgumentCaptor1.getAllValues().isEmpty() ^ leaderIdArgumentCaptor2.getAllValues().isEmpty());
+
+ verify(leaderContenderDifferentJobId).grantLeadership(any(UUID.class));
+ }
+
+ /**
+ * Tests that exactly one ResourceManager is elected as the leader.
+ */
+ @Test
+ public void testResourceManagerLeaderElection() throws Exception {
+ LeaderContender leaderContender1 = mock(LeaderContender.class);
+ LeaderContender leaderContender2 = mock(LeaderContender.class);
+
+ LeaderElectionService leaderElectionService1 = embeddedHaServices.getResourceManagerLeaderElectionService();
+ LeaderElectionService leaderElectionService2 = embeddedHaServices.getResourceManagerLeaderElectionService();
+
+ leaderElectionService1.start(leaderContender1);
+ leaderElectionService2.start(leaderContender2);
+
+ ArgumentCaptor<UUID> leaderIdArgumentCaptor1 = ArgumentCaptor.forClass(UUID.class);
+ ArgumentCaptor<UUID> leaderIdArgumentCaptor2 = ArgumentCaptor.forClass(UUID.class);
+ verify(leaderContender1, atLeast(0)).grantLeadership(leaderIdArgumentCaptor1.capture());
+ verify(leaderContender2, atLeast(0)).grantLeadership(leaderIdArgumentCaptor2.capture());
+
+ assertTrue(leaderIdArgumentCaptor1.getAllValues().isEmpty() ^ leaderIdArgumentCaptor2.getAllValues().isEmpty());
+ }
+
+ /**
+ * Tests the JobManager leader retrieval for a given job.
+ */
+ @Test
+ public void testJobManagerLeaderRetrieval() throws Exception {
+ final String address = "foobar";
+ JobID jobId = new JobID();
+ LeaderRetrievalListener leaderRetrievalListener = mock(LeaderRetrievalListener.class);
+ LeaderContender leaderContender = mock(LeaderContender.class);
+ when(leaderContender.getAddress()).thenReturn(address);
+
+ LeaderElectionService leaderElectionService = embeddedHaServices.getJobManagerLeaderElectionService(jobId);
+ LeaderRetrievalService leaderRetrievalService = embeddedHaServices.getJobManagerLeaderRetriever(jobId);
+
+ leaderRetrievalService.start(leaderRetrievalListener);
+ leaderElectionService.start(leaderContender);
+
+ ArgumentCaptor<UUID> leaderIdArgumentCaptor = ArgumentCaptor.forClass(UUID.class);
+ verify(leaderContender).grantLeadership(leaderIdArgumentCaptor.capture());
+
+ final UUID leaderId = leaderIdArgumentCaptor.getValue();
+
+ leaderElectionService.confirmLeaderSessionID(leaderId);
+
+ verify(leaderRetrievalListener).notifyLeaderAddress(eq(address), eq(leaderId));
+ }
+
+ /**
+ * Tests the ResourceManager leader retrieval.
+ */
+ @Test
+ public void testResourceManagerLeaderRetrieval() throws Exception {
+ final String address = "foobar";
+ LeaderRetrievalListener leaderRetrievalListener = mock(LeaderRetrievalListener.class);
+ LeaderContender leaderContender = mock(LeaderContender.class);
+ when(leaderContender.getAddress()).thenReturn(address);
+
+ LeaderElectionService leaderElectionService = embeddedHaServices.getResourceManagerLeaderElectionService();
+ LeaderRetrievalService leaderRetrievalService = embeddedHaServices.getResourceManagerLeaderRetriever();
+
+ leaderRetrievalService.start(leaderRetrievalListener);
+ leaderElectionService.start(leaderContender);
+
+ ArgumentCaptor<UUID> leaderIdArgumentCaptor = ArgumentCaptor.forClass(UUID.class);
+ verify(leaderContender).grantLeadership(leaderIdArgumentCaptor.capture());
+
+ final UUID leaderId = leaderIdArgumentCaptor.getValue();
+
+ leaderElectionService.confirmLeaderSessionID(leaderId);
+
+ verify(leaderRetrievalListener).notifyLeaderAddress(eq(address), eq(leaderId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionServiceTest.java
new file mode 100644
index 0000000..3875c4f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionServiceTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.leaderelection;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link SingleLeaderElectionService}.
+ */
+public class SingleLeaderElectionServiceTest {
+
+ private static final Random RND = new Random();
+
+ private final Executor executor = Executors.directExecutor();
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testStartStopAssignLeadership() throws Exception {
+ final UUID uuid = UUID.randomUUID();
+ final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+ final LeaderContender contender = mockContender(service);
+ final LeaderContender otherContender = mockContender(service);
+
+ service.start(contender);
+ verify(contender, times(1)).grantLeadership(uuid);
+
+ service.stop();
+ verify(contender, times(1)).revokeLeadership();
+
+ // start with a new contender - the old contender must not gain another leadership
+ service.start(otherContender);
+ verify(otherContender, times(1)).grantLeadership(uuid);
+
+ verify(contender, times(1)).grantLeadership(uuid);
+ verify(contender, times(1)).revokeLeadership();
+ }
+
+ @Test
+ public void testStopBeforeConfirmingLeadership() throws Exception {
+ final UUID uuid = UUID.randomUUID();
+ final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+ final LeaderContender contender = mock(LeaderContender.class);
+
+ service.start(contender);
+ verify(contender, times(1)).grantLeadership(uuid);
+
+ service.stop();
+
+ // because the leadership was never confirmed, there is no "revoke" call
+ verifyNoMoreInteractions(contender);
+ }
+
+ @Test
+ public void testStartOnlyOnce() throws Exception {
+ final UUID uuid = UUID.randomUUID();
+ final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+ final LeaderContender contender = mock(LeaderContender.class);
+ final LeaderContender otherContender = mock(LeaderContender.class);
+
+ service.start(contender);
+ verify(contender, times(1)).grantLeadership(uuid);
+
+ // should not be possible to start this again with another contender
+ try {
+ service.start(otherContender);
+ fail("should fail with an exception");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ // should not be possible to start this again with the same contender
+ try {
+ service.start(contender);
+ fail("should fail with an exception");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testShutdown() throws Exception {
+ final UUID uuid = UUID.randomUUID();
+ final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+ // create a leader contender and let it grab leadership
+ final LeaderContender contender = mockContender(service);
+ service.start(contender);
+ verify(contender, times(1)).grantLeadership(uuid);
+
+ // some leader listeners
+ final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
+ final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
+
+ LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
+ LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
+
+ listenerService1.start(listener1);
+ listenerService2.start(listener2);
+
+ // one listener stops
+ listenerService1.stop();
+
+ // shut down the service
+ service.shutdown();
+
+ // the leader contender and running listener should get error notifications
+ verify(contender, times(1)).handleError(any(Exception.class));
+ verify(listener2, times(1)).handleError(any(Exception.class));
+
+ // the stopped listener gets no notification
+ verify(listener1, times(0)).handleError(any(Exception.class));
+
+ // should not be possible to start again after shutdown
+ try {
+ service.start(contender);
+ fail("should fail with an exception");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ // no additional leadership grant
+ verify(contender, times(1)).grantLeadership(any(UUID.class));
+ }
+
+ @Test
+ public void testImmediateShutdown() throws Exception {
+ final UUID uuid = UUID.randomUUID();
+ final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+ service.shutdown();
+
+ final LeaderContender contender = mock(LeaderContender.class);
+
+ // should not be possible to start
+ try {
+ service.start(contender);
+ fail("should fail with an exception");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ // no additional leadership grant
+ verify(contender, times(0)).grantLeadership(any(UUID.class));
+ }
+
+// @Test
+// public void testNotifyListenersWhenLeaderElected() throws Exception {
+// final UUID uuid = UUID.randomUUID();
+// final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+//
+// final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
+// final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
+//
+// LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
+// LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
+//
+// listenerService1.start(listener1);
+// listenerService1.start(listener2);
+//
+// final LeaderContender contender = mockContender(service);
+// service.start(contender);
+//
+// veri
+// }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ private static LeaderContender mockContender(final LeaderElectionService service) {
+ String address = StringUtils.getRandomString(RND, 5, 10, 'a', 'z');
+ return mockContender(service, address);
+ }
+
+ private static LeaderContender mockContender(final LeaderElectionService service, final String address) {
+ LeaderContender mockContender = mock(LeaderContender.class);
+
+ when(mockContender.getAddress()).thenReturn(address);
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ final UUID uuid = (UUID) invocation.getArguments()[0];
+ service.confirmLeaderSessionID(uuid);
+ return null;
+ }
+ }).when(mockContender).grantLeadership(any(UUID.class));
+
+ return mockContender;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
new file mode 100644
index 0000000..537ed20
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.standalone;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the {@link StandaloneHaServices}.
+ */
+public class StandaloneHaServicesTest extends TestLogger {
+
+ private final String jobManagerAddress = "jobManager";
+ private final String resourceManagerAddress = "resourceManager";
+
+ private StandaloneHaServices standaloneHaServices;
+
+ @Before
+ public void setupTest() {
+
+ standaloneHaServices = new StandaloneHaServices(resourceManagerAddress, jobManagerAddress);
+ }
+
+ @After
+ public void teardownTest() throws Exception {
+ if (standaloneHaServices != null) {
+ standaloneHaServices.closeAndCleanupAllData();
+ standaloneHaServices = null;
+ }
+ }
+
+ /**
+ * Tests that the standalone leader election services grant leadership with the fixed default
+ * leader session id.
+ */
+ @Test
+ public void testLeaderElection() throws Exception {
+ JobID jobId = new JobID();
+ LeaderContender jmLeaderContender = mock(LeaderContender.class);
+ LeaderContender rmLeaderContender = mock(LeaderContender.class);
+
+ LeaderElectionService jmLeaderElectionService = standaloneHaServices.getJobManagerLeaderElectionService(jobId);
+ LeaderElectionService rmLeaderElectionService = standaloneHaServices.getResourceManagerLeaderElectionService();
+
+ jmLeaderElectionService.start(jmLeaderContender);
+ rmLeaderElectionService.start(rmLeaderContender);
+
+ verify(jmLeaderContender).grantLeadership(eq(HighAvailabilityServices.DEFAULT_LEADER_ID));
+ verify(rmLeaderContender).grantLeadership(eq(HighAvailabilityServices.DEFAULT_LEADER_ID));
+ }
+
+ /**
+ * Tests that the standalone leader retrieval services return the specified address and the
+ * fixed leader session id.
+ */
+ @Test
+ public void testJobManagerLeaderRetrieval() throws Exception {
+ JobID jobId1 = new JobID();
+ JobID jobId2 = new JobID();
+ LeaderRetrievalListener jmListener1 = mock(LeaderRetrievalListener.class);
+ LeaderRetrievalListener jmListener2 = mock(LeaderRetrievalListener.class);
+ LeaderRetrievalListener rmListener = mock(LeaderRetrievalListener.class);
+
+ LeaderRetrievalService jmLeaderRetrievalService1 = standaloneHaServices.getJobManagerLeaderRetriever(jobId1);
+ LeaderRetrievalService jmLeaderRetrievalService2 = standaloneHaServices.getJobManagerLeaderRetriever(jobId2);
+ LeaderRetrievalService rmLeaderRetrievalService = standaloneHaServices.getResourceManagerLeaderRetriever();
+
+ jmLeaderRetrievalService1.start(jmListener1);
+ jmLeaderRetrievalService2.start(jmListener2);
+ rmLeaderRetrievalService.start(rmListener);
+
+ verify(jmListener1).notifyLeaderAddress(eq(jobManagerAddress), eq(HighAvailabilityServices.DEFAULT_LEADER_ID));
+ verify(jmListener2).notifyLeaderAddress(eq(jobManagerAddress), eq(HighAvailabilityServices.DEFAULT_LEADER_ID));
+ verify(rmListener).notifyLeaderAddress(eq(resourceManagerAddress), eq(HighAvailabilityServices.DEFAULT_LEADER_ID));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
new file mode 100644
index 0000000..06ffe3c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.zookeeper;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
+import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class ZooKeeperRegistryTest extends TestLogger {
+
+ private TestingServer testingServer;
+
+ @Before
+ public void before() throws Exception {
+ testingServer = new TestingServer();
+ }
+
+ @After
+ public void after() throws Exception {
+ testingServer.stop();
+ testingServer = null;
+ }
+
+ /**
+ * Tests the ZooKeeper-backed RunningJobsRegistry: setJobRunning(), setJobFinished(), clearJob() and getJobSchedulingStatus().
+ */
+ @Test
+ public void testZooKeeperRegistry() throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+ configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+ final HighAvailabilityServices zkHaService = new ZooKeeperHaServices(
+ ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration);
+
+ final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
+
+ try {
+ JobID jobID = JobID.generate();
+ assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
+
+ zkRegistry.setJobRunning(jobID);
+ assertEquals(JobSchedulingStatus.RUNNING, zkRegistry.getJobSchedulingStatus(jobID));
+
+ zkRegistry.setJobFinished(jobID);
+ assertEquals(JobSchedulingStatus.DONE, zkRegistry.getJobSchedulingStatus(jobID));
+
+ zkRegistry.clearJob(jobID);
+ assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
+ } finally {
+ zkHaService.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index 49c28e6..2ac3ea7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -28,8 +28,10 @@ import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.NetUtils;
import org.junit.Test;
import org.apache.flink.configuration.Configuration;
@@ -117,10 +119,14 @@ public class JobManagerProcessReapingTest {
if (jobManagerPort != -1) {
try {
- jobManagerRef = JobManager.getJobManagerActorRef(
- "akka.tcp",
- NetUtils.unresolvedHostAndPortToNormalizedString("localhost", jobManagerPort),
- localSystem, new FiniteDuration(25, TimeUnit.SECONDS));
+ final String jobManagerAkkaUrl = AkkaRpcServiceUtils.getRpcUrl(
+ "localhost",
+ jobManagerPort,
+ JobMaster.JOB_MANAGER_NAME,
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
+ AkkaRpcServiceUtils.AkkaProtocol.TCP);
+
+ jobManagerRef = AkkaUtils.getActorRef(jobManagerAkkaUrl, localSystem, new FiniteDuration(25L, TimeUnit.SECONDS));
} catch (Throwable t) {
// job manager probably not ready yet
lastError = t;
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index bdff401..fcca173 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -91,7 +91,7 @@ public class JobSubmitTest {
MemoryArchivist.class)._1();
try {
- LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(jmConfig);
+ LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(jmConfig, false);
jmGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
lrs,
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 1f1eb62..bc8c0b60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -140,7 +140,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
- int num = 20;
+ int num = 10;
ZooKeeperLeaderElectionService[] leaderElectionService = new ZooKeeperLeaderElectionService[num];
TestingContender[] contenders = new TestingContender[num];
@@ -194,7 +194,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
}
}
- assertFalse(deadline.isOverdue());
+ assertFalse("Did not complete the leader reelection in time.", deadline.isOverdue());
assertEquals(num, numberSeenLeaders);
} finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 6a8ff17..b79093f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -22,10 +22,11 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.NetUtils;
@@ -35,7 +36,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import scala.Option;
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
@@ -104,10 +104,12 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
client[0] = ZooKeeperUtils.startCuratorFramework(config);
client[1] = ZooKeeperUtils.startCuratorFramework(config);
- String wrongHostPort = NetUtils.unresolvedHostAndPortToNormalizedString("1.1.1.1", 1234);
-
- String wrongAddress = JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(config),
- wrongHostPort, Option.<String>empty());
+ String wrongAddress = AkkaRpcServiceUtils.getRpcUrl(
+ "1.1.1.1",
+ 1234,
+ "foobar",
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
+ config);
try {
localHost = InetAddress.getLocalHost();
@@ -126,8 +128,12 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
InetSocketAddress correctInetSocketAddress = new InetSocketAddress(localHost, serverSocket.getLocalPort());
String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(localHost.getHostName(), correctInetSocketAddress.getPort());
- String correctAddress = JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(config),
- hostPort, Option.<String>empty());
+ String correctAddress = AkkaRpcServiceUtils.getRpcUrl(
+ localHost.getHostName(),
+ correctInetSocketAddress.getPort(),
+ JobMaster.JOB_MANAGER_NAME,
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
+ config);
faultyLeaderElectionService = ZooKeeperUtils.createLeaderElectionService(client[0], config);
TestingContender wrongLeaderAddressContender = new TestingContender(wrongAddress, faultyLeaderElectionService);
@@ -192,7 +198,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
- LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+ LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
InetAddress result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
assertEquals(InetAddress.getLocalHost(), result);
@@ -214,7 +220,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
@Override
public void run() {
try {
- LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+ LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
} catch (Exception e) {
exception = e;
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index d3d4d43..cc79c5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -44,7 +44,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -317,6 +317,7 @@ public class TaskExecutorTest extends TestLogger {
final ResourceID resourceID = ResourceID.generate();
final String resourceManagerAddress = "/resource/manager/address/one";
final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
+ final String jobManagerAddress = "localhost";
final TestingSerialRpcService rpc = new TestingSerialRpcService();
try {
@@ -335,7 +336,9 @@ public class TaskExecutorTest extends TestLogger {
TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
- NonHaServices haServices = new NonHaServices(resourceManagerAddress);
+ StandaloneHaServices haServices = new StandaloneHaServices(
+ resourceManagerAddress,
+ jobManagerAddress);
final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
final SlotReport slotReport = new SlotReport();
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 2fafe5b..d904004 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.NetUtils;
@@ -138,7 +139,7 @@ public abstract class TaskManagerProcessReapingTestBase {
// is started and the TaskManager is up
String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s",
"localhost:" + taskManagerPort,
- TaskManager.TASK_MANAGER_NAME());
+ TaskExecutor.TASK_MANAGER_NAME);
ActorRef taskManagerRef = null;
Throwable lastError = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index f3b1d4a..5f35229 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
@@ -182,7 +183,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
// available. we give it the regular JobManager akka URL
taskManager = createTaskManager(
actorSystem,
- JobManager.getLocalJobManagerAkkaURL(Option.<String>empty()),
+ AkkaUtils.getLocalAkkaURL(JobMaster.JOB_MANAGER_NAME),
new Configuration(),
true,
false);
@@ -248,7 +249,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
// start the taskManager actor
taskManager = createTaskManager(
actorSystem,
- JobManager.getLocalJobManagerAkkaURL(Option.<String>empty()),
+ AkkaUtils.getLocalAkkaURL(JobMaster.JOB_MANAGER_NAME),
tmConfig,
true,
false);
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
index 48c65c0..6e1fb74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -23,15 +23,17 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.JobManagerMode;
-import org.apache.flink.util.NetUtils;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -143,13 +145,15 @@ public class JobManagerProcess extends TestJvmProcess {
/**
* Returns the Akka URL of this JobManager.
*/
- public String getJobManagerAkkaURL(FiniteDuration timeout) throws InterruptedException {
+ public String getJobManagerAkkaURL(FiniteDuration timeout) throws InterruptedException, UnknownHostException {
int port = getJobManagerPort(timeout);
- return JobManager.getRemoteJobManagerAkkaURL(
- AkkaUtils.getAkkaProtocol(config),
- NetUtils.unresolvedHostAndPortToNormalizedString("localhost", port),
- Option.<String>empty());
+ return AkkaRpcServiceUtils.getRpcUrl(
+ "localhost",
+ port,
+ JobMaster.JOB_MANAGER_NAME,
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
+ config);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index 0daac2e..2404fb9 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -20,7 +20,10 @@ package org.apache.flink.runtime.akka
import java.net.InetSocketAddress
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.AkkaProtocol
import org.apache.flink.util.NetUtils
import org.junit.runner.RunWith
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
@@ -38,19 +41,21 @@ class AkkaUtilsTest
val address = new InetSocketAddress(host, port)
- val remoteAkkaURL = JobManager.getRemoteJobManagerAkkaURL(
- "akka.tcp",
- NetUtils.unresolvedHostAndPortToNormalizedString(host, port),
- Some("actor"))
+ val remoteAkkaUrl = AkkaRpcServiceUtils.getRpcUrl(
+ host,
+ port,
+ "actor",
+ AddressResolution.NO_ADDRESS_RESOLUTION,
+ AkkaProtocol.TCP)
- val result = AkkaUtils.getInetSockeAddressFromAkkaURL(remoteAkkaURL)
+ val result = AkkaUtils.getInetSockeAddressFromAkkaURL(remoteAkkaUrl)
result should equal(address)
}
test("getHostFromAkkaURL should throw an exception if the InetSocketAddress cannot be " +
"retrieved") {
- val localAkkaURL = JobManager.getLocalJobManagerAkkaURL(Some("actor"))
+ val localAkkaURL = AkkaUtils.getLocalAkkaURL("actor")
intercept[Exception] {
AkkaUtils.getInetSockeAddressFromAkkaURL(localAkkaURL)
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
index 1489fb2..97a001d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
@@ -59,7 +59,10 @@ class JobManagerConnectionTest {
mustReturnWithinTimeout(Duration(5*timeout, TimeUnit.MILLISECONDS)) {
() => {
try {
- JobManager.getJobManagerActorRef(endpoint, actorSystem, config)
+ AkkaUtils.getActorRef(
+ endpoint,
+ actorSystem,
+ AkkaUtils.getLookupTimeout(config))
fail("Should fail since the JobManager is not reachable")
}
catch {
@@ -95,7 +98,10 @@ class JobManagerConnectionTest {
mustReturnWithinTimeout(Duration(5*timeout, TimeUnit.MILLISECONDS)) {
() => {
try {
- JobManager.getJobManagerActorRef(endpoint, actorSystem, config)
+ AkkaUtils.getActorRef(
+ endpoint,
+ actorSystem,
+ AkkaUtils.getLookupTimeout(config))
fail("Should fail since the JobManager is not reachable")
}
catch {
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 876e26b..d139a3f 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -35,10 +35,12 @@ import org.apache.flink.runtime.client.JobClient
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.concurrent.{ScheduledExecutor, ScheduledExecutorServiceAdapter}
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices
+import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
import org.apache.flink.runtime.jobgraph.JobGraph
import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
+import org.apache.flink.runtime.jobmaster.JobMaster
+import org.apache.flink.runtime.jobmaster.JobMaster.{ARCHIVE_NAME, JOB_MANAGER_NAME}
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
import org.apache.flink.runtime.taskmanager.TaskManager
@@ -516,8 +518,8 @@ object TestingUtils {
actorSystem,
futureExecutor,
ioExecutor,
- Some(prefix + JobManager.JOB_MANAGER_NAME),
- Some(prefix + JobManager.ARCHIVE_NAME),
+ Some(prefix + JobMaster.JOB_MANAGER_NAME),
+ Some(prefix + JobMaster.ARCHIVE_NAME),
jobManagerClass,
classOf[MemoryArchivist])
@@ -605,7 +607,6 @@ object TestingUtils {
new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
}
-
class ForwardingActor(val target: ActorRef, val leaderSessionID: Option[UUID])
extends FlinkActor with LeaderSessionMessageFilter with LogMessages {
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index df4f370..5eadba6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -275,7 +275,8 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
try {
LeaderRetrievalService lrService =
LeaderRetrievalUtils.createLeaderRetrievalService(
- cluster.configuration());
+ cluster.configuration(),
+ false);
JobExecutionResult result = JobClient.submitJobAndWait(
clientActorSystem,