You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that did not survive the conversion to plain text.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/11 07:11:09 UTC

[GitHub] asfgit closed pull request #6759: [FLINK-10247][Metrics] Run MetricQueryService in a dedicated actor system

asfgit closed pull request #6759: [FLINK-10247][Metrics] Run MetricQueryService in a dedicated actor system
URL: https://github.com/apache/flink/pull/6759
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 56e45762263..a72d39e62cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -30,6 +30,7 @@
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rpc.akka.AkkaExecutorMode;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -46,6 +47,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.File;
@@ -85,13 +87,38 @@
 	 * @param portRangeDefinition The port range to choose a port from.
 	 * @param logger The logger to output log information.
 	 * @return The ActorSystem which has been started
-	 * @throws Exception
+	 * @throws Exception Thrown when actor system cannot be started in specified port range
+	 */
+	public static ActorSystem startActorSystem(
+			Configuration configuration,
+			String listeningAddress,
+			String portRangeDefinition,
+			Logger logger) throws Exception {
+		// Callers that do not choose an executor mode keep the fork-join executor.
+		final AkkaExecutorMode defaultExecutorMode = AkkaExecutorMode.FORK_JOIN_EXECUTOR;
+
+		return startActorSystem(
+			configuration, listeningAddress, portRangeDefinition,
+			logger, defaultExecutorMode);
+	}
+
+	/**
+	 * Starts an ActorSystem with the given configuration listening at the address/ports.
+	 *
+	 * @param configuration The Flink configuration
+	 * @param listeningAddress The address to listen at.
+	 * @param portRangeDefinition The port range to choose a port from.
+	 * @param logger The logger to output log information.
+	 * @param executorMode The executor mode of Akka actor system.
+	 * @return The ActorSystem which has been started
+	 * @throws Exception Thrown when actor system cannot be started in specified port range
 	 */
 	public static ActorSystem startActorSystem(
 			Configuration configuration,
 			String listeningAddress,
 			String portRangeDefinition,
-			Logger logger) throws Exception {
+			Logger logger,
+			@Nonnull AkkaExecutorMode executorMode) throws Exception {
 
 		// parse port range definition and create port iterator
 		Iterator<Integer> portsIterator;
@@ -117,7 +144,7 @@ public static ActorSystem startActorSystem(
 			}
 
 			try {
-				return startActorSystem(configuration, listeningAddress, port, logger);
+				return startActorSystem(configuration, listeningAddress, port, logger, executorMode);
 			}
 			catch (Exception e) {
 				// we can continue to try if this contains a netty channel exception
@@ -134,12 +161,31 @@ public static ActorSystem startActorSystem(
 			+ portRangeDefinition);
 	}
 
+	/**
+	 * Starts an Actor System at a specific port, using the fork-join executor.
+	 *
+	 * @param configuration The Flink configuration.
+	 * @param listeningAddress The address to listen at.
+	 * @param listeningPort The port to listen at.
+	 * @param logger the logger to output log information.
+	 * @return The ActorSystem which has been started.
+	 * @throws Exception Thrown when the actor system cannot be started at the given port.
+	 */
+	public static ActorSystem startActorSystem(
+			Configuration configuration,
+			String listeningAddress,
+			int listeningPort,
+			Logger logger) throws Exception {
+		return startActorSystem(configuration, listeningAddress, listeningPort, logger, AkkaExecutorMode.FORK_JOIN_EXECUTOR);
+	}
+
 	/**
 	 * Starts an Actor System at a specific port.
 	 * @param configuration The Flink configuration.
 	 * @param listeningAddress The address to listen at.
 	 * @param listeningPort The port to listen at.
 	 * @param logger the logger to output log information.
+	 * @param executorMode The executor mode of Akka actor system.
 	 * @return The ActorSystem which has been started.
 	 * @throws Exception
 	 */
@@ -147,7 +193,8 @@ public static ActorSystem startActorSystem(
 				Configuration configuration,
 				String listeningAddress,
 				int listeningPort,
-				Logger logger) throws Exception {
+				Logger logger,
+				AkkaExecutorMode executorMode) throws Exception {
 
 		String hostPortUrl = NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, listeningPort);
 		logger.info("Trying to start actor system at {}", hostPortUrl);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 9eaef34a33a..772a95e471d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -49,6 +49,8 @@
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaActorSystemService;
+import org.apache.flink.runtime.rpc.akka.AkkaExecutorMode;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
@@ -129,6 +131,9 @@
 	@GuardedBy("lock")
 	private RpcService commonRpcService;
 
+	@GuardedBy("lock")
+	private AkkaActorSystemService metricQueryActorSystemService;
+
 	@GuardedBy("lock")
 	private ArchivedExecutionGraphStore archivedExecutionGraphStore;
 
@@ -259,9 +264,9 @@ protected void initializeServices(Configuration configuration) throws Exception
 			metricRegistry = createMetricRegistry(configuration);
 
 			// TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
-			// start the MetricQueryService
-			final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
-			metricRegistry.startQueryService(actorSystem, null);
+			// Start actor system for metric query service on any available port
+			metricQueryActorSystemService = new AkkaActorSystemService(configuration, bindAddress, "0", AkkaExecutorMode.SINGLE_THREAD_EXECUTOR);
+			metricRegistry.startQueryService(metricQueryActorSystemService.getActorSystem(), null);
 
 			archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());
 
@@ -288,14 +293,22 @@ protected String getRPCPortRange(Configuration configuration) {
 	}
 
 	protected RpcService createRpcService(
-			Configuration configuration,
-			String bindAddress,
-			String portRange) throws Exception {
-		ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
+		Configuration configuration,
+		String bindAddress,
+		String portRange,
+		AkkaExecutorMode executorMode) throws Exception {
+		ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, executorMode);
 		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
 		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
 	}
 
+	protected RpcService createRpcService(
+		Configuration configuration,
+		String bindAddress,
+		String portRange) throws Exception {
+		return createRpcService(configuration, bindAddress, portRange, AkkaExecutorMode.FORK_JOIN_EXECUTOR);
+	}
+
 	protected HighAvailabilityServices createHaServices(
 		Configuration configuration,
 		Executor executor) throws Exception {
@@ -367,6 +380,10 @@ protected MetricRegistryImpl createMetricRegistry(Configuration configuration) {
 				terminationFutures.add(metricRegistry.shutdown());
 			}
 
+			if (metricQueryActorSystemService != null) {
+				terminationFutures.add(metricQueryActorSystemService.stopActorSystem());
+			}
+
 			if (commonRpcService != null) {
 				terminationFutures.add(commonRpcService.stopService());
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 7929e3170b7..2d1aaa3f4f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
@@ -47,7 +48,6 @@
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
@@ -586,9 +586,15 @@ public void unRegisterInfoMessageListener(final String address) {
 		for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry : taskExecutors.entrySet()) {
 			final ResourceID tmResourceId = workerRegistrationEntry.getKey();
 			final WorkerRegistration<WorkerType> workerRegistration = workerRegistrationEntry.getValue();
-			final String taskManagerAddress = workerRegistration.getTaskExecutorGateway().getAddress();
-			final String tmMetricQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
-				MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + tmResourceId.getResourceIdString();
+			final TaskExecutorGateway taskExecutorGateway = workerRegistration.getTaskExecutorGateway();
+			String tmMetricQueryServicePath = null;
+			try {
+				tmMetricQueryServicePath =
+					taskExecutorGateway.getMetricQueryServiceAddress(AkkaUtils.getDefaultTimeout()).get();
+			} catch (Exception e) {
+				log.info("Fail to get task manager's metric query service path for worker: {}", taskExecutorGateway.getAddress());
+				continue;
+			}
 
 			metricQueryServicePaths.add(Tuple2.of(tmResourceId, tmMetricQueryServicePath));
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaActorSystemService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaActorSystemService.java
new file mode 100644
index 00000000000..d8dac344836
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaActorSystemService.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+
+import akka.actor.ActorSystem;
+import akka.actor.Terminated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Starts a dedicated Akka {@link ActorSystem}, provides access to it and allows stopping it.
+ *
+ * <p>{@link #stopActorSystem()} may be called repeatedly and concurrently; only the first call
+ * triggers the termination of the actor system.
+ */
+public class AkkaActorSystemService {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AkkaActorSystemService.class);
+
+	/** The actor system started in the constructor. */
+	private final ActorSystem actorSystem;
+
+	/** Guards {@link #stopped}. */
+	private final Object lock = new Object();
+
+	/** Completed once the actor system has terminated; returned by every stop call. */
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	/** Whether termination has already been triggered; guarded by {@link #lock}. */
+	private boolean stopped;
+
+	/**
+	 * Starts a new actor system.
+	 *
+	 * @param configuration The Flink configuration.
+	 * @param listeningAddress The address to listen at.
+	 * @param portRangeDefinition The port range to choose a port from ("0" picks any free port).
+	 * @param executorMode The executor mode of the Akka actor system.
+	 * @throws Exception Thrown when the actor system cannot be started in the given port range.
+	 */
+	public AkkaActorSystemService(
+			Configuration configuration,
+			String listeningAddress,
+			String portRangeDefinition,
+			@Nonnull AkkaExecutorMode executorMode) throws Exception {
+		this.actorSystem = BootstrapTools.startActorSystem(
+			configuration,
+			listeningAddress,
+			portRangeDefinition,
+			LOG,
+			checkNotNull(executorMode));
+	}
+
+	/** Returns the actor system started by this service. */
+	public ActorSystem getActorSystem() {
+		return actorSystem;
+	}
+
+	/**
+	 * Terminates the actor system if this has not been done already.
+	 *
+	 * @return Future which is completed once the actor system has terminated, or completed
+	 *         exceptionally if the termination failed.
+	 */
+	public CompletableFuture<Void> stopActorSystem() {
+		synchronized (lock) {
+			if (stopped) {
+				return terminationFuture;
+			}
+			stopped = true;
+		}
+
+		LOG.info("Stopping Akka actor system.");
+
+		final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
+
+		actorSystemTerminationFuture.whenComplete(
+			(Terminated ignored, Throwable throwable) -> {
+				if (throwable != null) {
+					LOG.warn("Failed to stop Akka actor system.", throwable);
+					terminationFuture.completeExceptionally(throwable);
+				} else {
+					LOG.info("Stopped Akka actor system.");
+					terminationFuture.complete(null);
+				}
+			});
+
+		return terminationFuture;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaExecutorMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaExecutorMode.java
new file mode 100644
index 00000000000..014d7f61b40
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaExecutorMode.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+/**
+ * Options to specify which executor the default dispatcher of an Akka-based RPC service uses.
+ */
+public enum AkkaExecutorMode {
+	/** Used by default: the default dispatcher uses a fork-join executor. */
+	FORK_JOIN_EXECUTOR,
+	/** The default dispatcher uses a single-thread (pinned) dispatcher. */
+	SINGLE_THREAD_EXECUTOR
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 3a626986361..36eb2aa7764 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -35,6 +35,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -71,8 +72,29 @@
 	 * @throws Exception      Thrown is some other error occurs while creating akka actor system
 	 */
 	public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
+		return createRpcService(hostname, port, configuration, AkkaExecutorMode.FORK_JOIN_EXECUTOR);
+	}
+
+	/**
+	 * Utility method to create RPC service from configuration and hostname, port.
+	 *
+	 * @param hostname      The hostname/address that describes the TaskManager's data location.
+	 * @param port          The port to start the RPC service's actor system at.
+	 * @param configuration The configuration for the TaskManager.
+	 * @param executorMode  The executor mode of the Akka actor system.
+	 * @return The rpc service which is used to start and connect to the TaskManager RpcEndpoint.
+	 * @throws IOException  Thrown, if the actor system can not bind to the address
+	 * @throws Exception    Thrown if some other error occurs while creating the akka actor system
+	 */
+	public static RpcService createRpcService(
+		String hostname,
+		int port,
+		Configuration configuration,
+		@Nonnull AkkaExecutorMode executorMode) throws Exception {
 		LOG.info("Starting AkkaRpcService at {}.", NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port));
 
+		Preconditions.checkNotNull(executorMode);
+
 		final ActorSystem actorSystem;
 
 		try {
@@ -80,7 +102,7 @@ public static RpcService createRpcService(String hostname, int port, Configurati
 
 			if (hostname != null && !hostname.isEmpty()) {
 				// remote akka config
-				akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+				akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port, executorMode);
 			} else {
 				// local akka config
 				akkaConfig = AkkaUtils.getAkkaConfig(configuration);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index ae69e561bc7..df6d0dd85de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -157,6 +157,9 @@
 
 	private final BlobCacheService blobCacheService;
 
+	/** The path to metric query service on this Task Manager. */
+	private final String metricQueryServicePath;
+
 	// --------- TaskManager services --------
 
 	/** The connection information of this task manager. */
@@ -211,6 +214,7 @@ public TaskExecutor(
 			TaskManagerServices taskExecutorServices,
 			HeartbeatServices heartbeatServices,
 			TaskManagerMetricGroup taskManagerMetricGroup,
+			String metricQueryServicePath,
 			BlobCacheService blobCacheService,
 			FatalErrorHandler fatalErrorHandler) {
 
@@ -224,6 +228,7 @@ public TaskExecutor(
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 		this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
 		this.blobCacheService = checkNotNull(blobCacheService);
+		this.metricQueryServicePath = metricQueryServicePath;
 
 		this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
 		this.jobManagerTable = taskExecutorServices.getJobManagerTable();
@@ -847,6 +852,11 @@ public void heartbeatFromResourceManager(ResourceID resourceID) {
 		}
 	}
 
+	@Override
+	public CompletableFuture<String> getMetricQueryServiceAddress(Time timeout) {
+		return CompletableFuture.completedFuture(metricQueryServicePath); // set once in the constructor, not null-checked there
+	}
+
 	// ----------------------------------------------------------------------
 	// Disconnection RPCs
 	// ----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 4f792896216..abbb979cfa5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -195,4 +195,11 @@
 	 * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file.
 	 */
 	CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, @RpcTimeout Time timeout);
+
+	/**
+	 * Returns the fully qualified address of the Metric Query Service on the TaskManager.
+	 * @param timeout for the rpc call
+	 * @return Future which is completed with the fully qualified (RPC) address of the Metric Query Service
+	 */
+	CompletableFuture<String> getMetricQueryServiceAddress(@RpcTimeout Time timeout);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 40e628a4a38..5279a973796 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -43,7 +43,8 @@
 import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaActorSystemService;
+import org.apache.flink.runtime.rpc.akka.AkkaExecutorMode;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -58,11 +59,14 @@
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.Preconditions;
 
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.BindException;
@@ -102,6 +106,8 @@
 
 	private final RpcService rpcService;
 
+	private final AkkaActorSystemService metricQueryActorSystemService;
+
 	private final HighAvailabilityServices highAvailabilityServices;
 
 	private final MetricRegistryImpl metricRegistry;
@@ -133,13 +139,14 @@ public TaskManagerRunner(Configuration configuration, ResourceID resourceId) thr
 			HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
 
 		rpcService = createRpcService(configuration, highAvailabilityServices);
+		metricQueryActorSystemService = createMetricQueryActorSystemService(configuration, highAvailabilityServices);
 
 		HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
 
 		metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 
 		// TODO: Temporary hack until the MetricQueryService has been ported to RpcEndpoint
-		final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
+		final ActorSystem actorSystem = metricQueryActorSystemService.getActorSystem();
 		metricRegistry.startQueryService(actorSystem, resourceId);
 
 		blobCacheService = new BlobCacheService(
@@ -215,6 +222,10 @@ public void start() throws Exception {
 				exception = ExceptionUtils.firstOrSuppressed(e, exception);
 			}
 
+			if (metricQueryActorSystemService != null) {
+				terminationFutures.add(metricQueryActorSystemService.stopActorSystem());
+			}
+
 			try {
 				highAvailabilityServices.close();
 			} catch (Exception e) {
@@ -376,6 +387,8 @@ public static TaskExecutor startTaskManager(
 
 		TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 
+		String metricQueryServicePath = metricRegistry.getMetricQueryServicePath();
+
 		return new TaskExecutor(
 			rpcService,
 			taskManagerConfiguration,
@@ -383,6 +396,7 @@ public static TaskExecutor startTaskManager(
 			taskManagerServices,
 			heartbeatServices,
 			taskManagerMetricGroup,
+			metricQueryServicePath,
 			blobCacheService,
 			fatalErrorHandler);
 	}
@@ -419,6 +433,50 @@ public static RpcService createRpcService(
 
 		final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT);
 
+		return bindWithPort(configuration, taskManagerHostname, portRangeDefinition, AkkaExecutorMode.FORK_JOIN_EXECUTOR);
+	}
+
+	/**
+	 * Creates the dedicated actor system service that hosts the MetricQueryService.
+	 *
+	 * <p>It listens on any free port ("0") and requests {@link AkkaExecutorMode#SINGLE_THREAD_EXECUTOR}.
+	 *
+	 * @param configuration The configuration for the TaskManager.
+	 * @param haServices to use for the task manager hostname retrieval
+	 * @return the actor system service for the MetricQueryService
+	 * @throws Exception if looking up the hostname or starting the actor system fails
+	 */
+	public static AkkaActorSystemService createMetricQueryActorSystemService(
+			final Configuration configuration,
+			final HighAvailabilityServices haServices) throws Exception {
+		checkNotNull(configuration);
+		checkNotNull(haServices);
+		final String configuredHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+		final String hostname;
+		if (configuredHostname == null) {
+			final Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());
+			final InetAddress connectingAddress = LeaderRetrievalUtils.findConnectingAddress(
+				haServices.getResourceManagerLeaderRetriever(),
+				lookupTimeout);
+			hostname = connectingAddress.getHostName();
+			LOG.info("MetricQueryService will use hostname/address '{}' ({}) for communication.",
+				hostname, connectingAddress.getHostAddress());
+		} else {
+			hostname = configuredHostname;
+			LOG.info("Using configured hostname/address for MetricQueryService: {}.", hostname);
+		}
+
+		return new AkkaActorSystemService(configuration, hostname, "0", AkkaExecutorMode.SINGLE_THREAD_EXECUTOR);
+	}
+
+	private static RpcService bindWithPort(
+		Configuration configuration,
+		String taskManagerHostname,
+		String portRangeDefinition,
+		@Nonnull AkkaExecutorMode executorMode) throws Exception{
+
+		Preconditions.checkNotNull(executorMode);
+
 		// parse port range definition and create port iterator
 		Iterator<Integer> portsIterator;
 		try {
@@ -429,7 +487,7 @@ public static RpcService createRpcService(
 
 		while (portsIterator.hasNext()) {
 			try {
-				return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portsIterator.next(), configuration);
+				return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portsIterator.next(), configuration, executorMode);
 			}
 			catch (Exception e) {
 				// we can continue to try if this contains a netty channel exception
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 9ce1865204f..216876d1cc3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -28,6 +28,7 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, SecurityOptions}
 import org.apache.flink.runtime.net.SSLUtils
+import org.apache.flink.runtime.rpc.akka.AkkaExecutorMode
 import org.apache.flink.util.NetUtils
 import org.jboss.netty.channel.ChannelException
 import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
@@ -117,7 +118,23 @@ object AkkaUtils {
   }
 
   /**
-    * Return a remote Akka config for the given configuration values.
+    * Returns a remote Akka config for the given configuration values.
+    *
+    * @param configuration containing the user provided configuration values
+    * @param hostname to bind against. If null, then the loopback interface is used
+    * @param port to bind against
+    * @param executorMode containing the user specified mode of executor
+    * @return A remote Akka config
+    */
+  def getAkkaConfig(
+      configuration: Configuration,
+      hostname: String,
+      port: Int,
+      executorMode: AkkaExecutorMode): Config =
+    getAkkaConfig(configuration, Some((hostname, port)), executorMode)
+
+  /**
+    * Returns a remote Akka config for the given configuration values.
     *
     * @param configuration containing the user provided configuration values
     * @param hostname to bind against. If null, then the loopback interface is used
@@ -153,7 +170,25 @@ object AkkaUtils {
   @throws(classOf[UnknownHostException])
   def getAkkaConfig(configuration: Configuration,
                     externalAddress: Option[(String, Int)]): Config = {
-    val defaultConfig = getBasicAkkaConfig(configuration)
+    getAkkaConfig(configuration, externalAddress, AkkaExecutorMode.FORK_JOIN_EXECUTOR)
+  }
+
+  /**
+    * Creates an akka config with the provided configuration values. If the listening address is
+    * specified, then the actor system will listen on the respective address.
+    *
+    * @param configuration instance containing the user provided configuration values
+    * @param externalAddress optional tuple of bindAddress and port to be reachable at.
+    *                        If None is given, then an Akka config for local actor system
+    *                        will be returned
+    * @return Akka config
+    */
+  @throws(classOf[UnknownHostException])
+  def getAkkaConfig(configuration: Configuration,
+                    externalAddress: Option[(String, Int)],
+                    executorMode: AkkaExecutorMode): Config = {
+    val executorConfig = getExecutorConfigByExecutorMode(configuration, executorMode)
+    val defaultConfig = getBasicAkkaConfig(configuration, executorConfig)
 
     externalAddress match {
 
@@ -185,9 +220,12 @@ object AkkaUtils {
    * Gets the basic Akka config which is shared by remote and local actor systems.
    *
    * @param configuration instance which contains the user specified values for the configuration
+   * @param executorConfig the akka config for particular executor
    * @return Flink's basic Akka config
    */
-  private def getBasicAkkaConfig(configuration: Configuration): Config = {
+  private def getBasicAkkaConfig(
+    configuration: Configuration,
+    executorConfig: String): Config = {
     val akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT)
     val lifecycleEvents = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)
 
@@ -205,24 +243,6 @@ object AkkaUtils {
     val supervisorStrategy = classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
       .getCanonicalName
 
-    val forkJoinExecutorParallelismFactor =
-      configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
-
-    val forkJoinExecutorParallelismMin =
-      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
-
-    val forkJoinExecutorParallelismMax =
-      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
-
-    val forkJoinExecutorConfig =
-      s"""
-         | fork-join-executor {
-         |   parallelism-factor = $forkJoinExecutorParallelismFactor
-         |   parallelism-min = $forkJoinExecutorParallelismMin
-         |   parallelism-max = $forkJoinExecutorParallelismMax
-         | }
-       """.stripMargin
-
     val config =
       s"""
         |akka {
@@ -250,7 +270,7 @@ object AkkaUtils {
         |   default-dispatcher {
         |     throughput = $akkaThroughput
         |
-        |   $forkJoinExecutorConfig
+        |   $executorConfig
         |   }
         | }
         |}
@@ -259,6 +279,39 @@ object AkkaUtils {
     ConfigFactory.parseString(config)
   }
 
+  def getExecutorConfigByExecutorMode( // Builds the executor section that getBasicAkkaConfig splices into `default-dispatcher { }`.
+    configuration: Configuration,
+    executorMode: AkkaExecutorMode): String = {
+    executorMode match { // no default case: a mode not listed below would throw scala.MatchError
+      case AkkaExecutorMode.FORK_JOIN_EXECUTOR => // parallelism factor/min/max are taken from AkkaOptions
+        val forkJoinExecutorParallelismFactor =
+          configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
+
+        val forkJoinExecutorParallelismMin =
+          configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
+
+        val forkJoinExecutorParallelismMax =
+          configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
+
+        s"""
+           | fork-join-executor {
+           |   parallelism-factor = $forkJoinExecutorParallelismFactor
+           |   parallelism-min = $forkJoinExecutorParallelismMin
+           |   parallelism-max = $forkJoinExecutorParallelismMax
+           | }
+        """.stripMargin
+      // NOTE(review): these keys sit in a nested `single-thread-executor { }` section, so once spliced into `default-dispatcher { }` they become `default-dispatcher.single-thread-executor.*`; Akka presumably reads `executor`/`type` only directly under the dispatcher, and `threads-priority` may not be read by anything — confirm before relying on this mode.
+      case AkkaExecutorMode.SINGLE_THREAD_EXECUTOR =>
+        s"""
+           | single-thread-executor {
+           |   executor = "thread-pool-executor"
+           |   type = PinnedDispatcher
+           |   threads-priority = ${Thread.MIN_PRIORITY}
+           | }
+        """.stripMargin
+    }
+  }
+
   def testDispatcherConfig: Config = {
     val config =
       s"""
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index f0e0efea970..a3fb05f199f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -164,6 +164,7 @@ public void testSlotAllocation() throws Exception {
 			taskManagerServices,
 			heartbeatServices,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			new BlobCacheService(
 				configuration,
 				new VoidBlobStore(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 168b251da26..f57e3121fbe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -270,6 +270,7 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {
 			taskManagerServices,
 			heartbeatServices,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -358,6 +359,7 @@ public void testHeartbeatTimeoutWithResourceManager() throws Exception {
 			taskManagerServices,
 			heartbeatServices,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -507,6 +509,7 @@ public void testHeartbeatSlotReporting() throws Exception {
 			taskManagerServices,
 			heartbeatServices,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -585,6 +588,7 @@ public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -648,6 +652,7 @@ public void testTriggerRegistrationOnLeaderChange() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -770,6 +775,7 @@ public void testTaskSubmission() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -870,6 +876,7 @@ public void testJobLeaderDetection() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -985,6 +992,7 @@ public void testSlotAcceptance() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1104,6 +1112,7 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			taskManagerMetricGroup,
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1219,6 +1228,7 @@ public void testFilterOutDuplicateJobMasterRegistrations() throws Exception {
 			taskManagerServices,
 			heartbeatServicesMock,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1291,6 +1301,7 @@ public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
 			taskManagerServices,
 			heartbeatServices,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1347,6 +1358,7 @@ public void testRemoveJobFromJobLeaderService() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1439,6 +1451,7 @@ public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio
 			taskManagerServices,
 			new HeartbeatServices(heartbeatInterval, 10L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1545,6 +1558,7 @@ public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception {
 				.build(),
 			new HeartbeatServices(heartbeatInterval, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			"",
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1686,6 +1700,7 @@ private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices)
                 taskManagerServices,
                 new HeartbeatServices(1000L, 1000L),
                 UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                "",
                 dummyBlobCacheService,
                 testingFatalErrorHandler);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 912de36c881..ff6731c35d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -152,6 +152,11 @@ public void disconnectResourceManager(Exception cause) {
 		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
 
+	@Override
+	public CompletableFuture<String> getMetricQueryServiceAddress(Time timeout) {
+		return CompletableFuture.completedFuture(address); // test stub: reports this gateway's own address and ignores the timeout
+	}
+
 	@Override
 	public String getAddress() {
 		return address;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services