Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/16 21:21:24 UTC

[GitHub] asfgit closed pull request #6859: Revert FLINK-10354 and FLINK-10247 for release-1.6

URL: https://github.com/apache/flink/pull/6859
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html
index 5f52abd38ed..0fe9d0c36f8 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -7,11 +7,6 @@
         </tr>
     </thead>
     <tbody>
-        <tr>
-            <td><h5>metrics.internal.query-service.port</h5></td>
-            <td style="word-wrap: break-word;">"0"</td>
-            <td>The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port.</td>
-        </tr>
         <tr>
             <td><h5>metrics.latency.granularity</h5></td>
             <td style="word-wrap: break-word;">"subtask"</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index e76b7f2dbbe..fc6b3c14c46 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -130,18 +130,6 @@
 			.defaultValue(128)
 			.withDescription("Defines the number of measured latencies to maintain at each operator.");
 
-	/**
-	 * The default network port range for Flink's internal metric query service. The {@code "0"} means that
-	 * Flink searches for a free port.
-	 */
-	public static final ConfigOption<String> QUERY_SERVICE_PORT =
-		key("metrics.internal.query-service.port")
-		.defaultValue("0")
-		.withDescription("The port range used for Flink's internal metric query service. Accepts a list of ports " +
-			"(“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of " +
-			"ports to avoid collisions when multiple Flink components are running on the same machine. Per default " +
-			"Flink will pick a random port.");
-
 	private MetricOptions() {
 	}
 }
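
For context on the option removed above: metrics.internal.query-service.port was read through Flink's standard Configuration class. A minimal sketch of how a user would have pinned the port range before this revert (assumes import org.apache.flink.configuration.Configuration; the chosen range is illustrative):

    // Illustrative only: pin the metric query service to a port range.
    // The option remains on master/release-1.7; this revert removes it from release-1.6.
    Configuration config = new Configuration();
    config.setString("metrics.internal.query-service.port", "50100-50200");

After the revert, release-1.6 goes back to running the metric query service inside the main RPC actor system (see the ClusterEntrypoint and TaskManagerRunner hunks below), so no separate port needs to be configured.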
diff --git a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
deleted file mode 100644
index 89dcea45093..00000000000
--- a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.types;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-/**
- * Serializable {@link Optional}.
- */
-public final class SerializableOptional<T extends Serializable> implements Serializable {
-	private static final long serialVersionUID = -3312769593551775940L;
-
-	private static final SerializableOptional<?> EMPTY = new SerializableOptional<>(null);
-
-	@Nullable
-	private final T value;
-
-	private SerializableOptional(@Nullable T value) {
-		this.value = value;
-	}
-
-	public T get() {
-		if (value == null) {
-			throw new NoSuchElementException("No value present");
-		}
-		return value;
-	}
-
-	public boolean isPresent() {
-		return value != null;
-	}
-
-	public void ifPresent(Consumer<? super T> consumer) {
-		if (value != null) {
-			consumer.accept(value);
-		}
-	}
-
-	public <R> Optional<R> map(Function<? super T, ? extends R> mapper) {
-		if (value == null) {
-			return Optional.empty();
-		} else {
-			return Optional.ofNullable(mapper.apply(value));
-		}
-	}
-
-	public static <T extends Serializable> SerializableOptional<T> of(@Nonnull T value) {
-		return new SerializableOptional<>(value);
-	}
-
-	public static <T extends Serializable> SerializableOptional<T> ofNullable(@Nullable T value) {
-		if (value == null) {
-			return empty();
-		} else {
-			return of(value);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	public static <T extends Serializable> SerializableOptional<T> empty() {
-		return (SerializableOptional<T>) EMPTY;
-	}
-}
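
The class deleted above existed because RPC payloads must implement java.io.Serializable, and java.util.Optional does not. A minimal usage sketch based solely on the source shown above (queryServicePath and LOG are made-up names):

    // Hypothetical caller; wraps a nullable, serializable value for transport over RPC.
    SerializableOptional<String> maybePath = SerializableOptional.ofNullable(queryServicePath);
    maybePath.ifPresent(path -> LOG.info("Metric query service at {}", path));
    // map(...) deliberately returns a plain java.util.Optional for local, non-serialized use:
    String display = maybePath.map(String::trim).orElse("<none>");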
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 57337b6286f..e936b246222 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -839,22 +839,28 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) thro
 			// the pending checkpoint must be discarded after the finalization
 			Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);
 
-			try {
-				completedCheckpointStore.addCheckpoint(completedCheckpoint);
-			} catch (Exception exception) {
-				// we failed to store the completed checkpoint. Let's clean up
-				executor.execute(new Runnable() {
-					@Override
-					public void run() {
-						try {
-							completedCheckpoint.discardOnFailedStoring();
-						} catch (Throwable t) {
-							LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
+			// TODO: add savepoints to completed checkpoint store once FLINK-4815 has been completed
+			if (!completedCheckpoint.getProperties().isSavepoint()) {
+				try {
+					completedCheckpointStore.addCheckpoint(completedCheckpoint);
+				} catch (Exception exception) {
+					// we failed to store the completed checkpoint. Let's clean up
+					executor.execute(new Runnable() {
+						@Override
+						public void run() {
+							try {
+								completedCheckpoint.discardOnFailedStoring();
+							} catch (Throwable t) {
+								LOG.warn("Could not properly discard completed checkpoint {} of job {}.", completedCheckpoint.getCheckpointID(), job, t);
+							}
 						}
-					}
-				});
+					});
 
-				throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
+					throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
+				}
+
+				// drop those pending checkpoints that are at prior to the completed one
+				dropSubsumedCheckpoints(checkpointId);
 			}
 		} finally {
 			pendingCheckpoints.remove(checkpointId);
@@ -864,9 +870,6 @@ public void run() {
 
 		rememberRecentCheckpointId(checkpointId);
 
-		// drop those pending checkpoints that are at prior to the completed one
-		dropSubsumedCheckpoints(checkpointId);
-
 		// record the time when this was completed, to calculate
 		// the 'min delay between checkpoints'
 		lastCheckpointCompletionNanos = System.nanoTime();
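
The net effect of the two hunks above, condensed into a sketch (names taken from the diff, not the verbatim method body): after the revert, only proper checkpoints are added to the store and allowed to subsume older pending checkpoints; a completed savepoint still completes its future but is never used for recovery.

    if (!completedCheckpoint.getProperties().isSavepoint()) {
        completedCheckpointStore.addCheckpoint(completedCheckpoint); // may throw; triggers async discard
        // only proper checkpoints subsume older pending ones
        dropSubsumedCheckpoints(checkpointId);
    }
    // savepoints: the CompletableFuture completes, but no store entry and no subsumption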
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 00b61737d20..56e45762263 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -46,7 +46,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.File;
@@ -86,66 +85,13 @@
 	 * @param portRangeDefinition The port range to choose a port from.
 	 * @param logger The logger to output log information.
 	 * @return The ActorSystem which has been started
-	 * @throws Exception Thrown when actor system cannot be started in specified port range
-	 */
-	public static ActorSystem startActorSystem(
-		Configuration configuration,
-		String listeningAddress,
-		String portRangeDefinition,
-		Logger logger) throws Exception {
-		return startActorSystem(
-			configuration,
-			listeningAddress,
-			portRangeDefinition,
-			logger,
-			ActorSystemExecutorMode.FORK_JOIN_EXECUTOR);
-	}
-
-	/**
-	 * Starts an ActorSystem with the given configuration listening at the address/ports.
-	 *
-	 * @param configuration The Flink configuration
-	 * @param listeningAddress The address to listen at.
-	 * @param portRangeDefinition The port range to choose a port from.
-	 * @param logger The logger to output log information.
-	 * @param executorMode The executor mode of Akka actor system.
-	 * @return The ActorSystem which has been started
-	 * @throws Exception Thrown when actor system cannot be started in specified port range
-	 */
-	public static ActorSystem startActorSystem(
-			Configuration configuration,
-			String listeningAddress,
-			String portRangeDefinition,
-			Logger logger,
-			@Nonnull ActorSystemExecutorMode executorMode) throws Exception {
-		return startActorSystem(
-			configuration,
-			AkkaUtils.getFlinkActorSystemName(),
-			listeningAddress,
-			portRangeDefinition,
-			logger,
-			executorMode);
-	}
-
-	/**
-	 * Starts an ActorSystem with the given configuration listening at the address/ports.
-	 *
-	 * @param configuration The Flink configuration
-	 * @param actorSystemName Name of the started {@link ActorSystem}
-	 * @param listeningAddress The address to listen at.
-	 * @param portRangeDefinition The port range to choose a port from.
-	 * @param logger The logger to output log information.
-	 * @param executorMode The executor mode of Akka actor system.
-	 * @return The ActorSystem which has been started
-	 * @throws Exception Thrown when actor system cannot be started in specified port range
+	 * @throws Exception
 	 */
 	public static ActorSystem startActorSystem(
 			Configuration configuration,
-			String actorSystemName,
 			String listeningAddress,
 			String portRangeDefinition,
-			Logger logger,
-			@Nonnull ActorSystemExecutorMode executorMode) throws Exception {
+			Logger logger) throws Exception {
 
 		// parse port range definition and create port iterator
 		Iterator<Integer> portsIterator;
@@ -171,13 +117,7 @@ public static ActorSystem startActorSystem(
 			}
 
 			try {
-				return startActorSystem(
-					configuration,
-					actorSystemName,
-					listeningAddress,
-					port,
-					logger,
-					executorMode);
+				return startActorSystem(configuration, listeningAddress, port, logger);
 			}
 			catch (Exception e) {
 				// we can continue to try if this contains a netty channel exception
@@ -196,7 +136,6 @@ public static ActorSystem startActorSystem(
 
 	/**
 	 * Starts an Actor System at a specific port.
-	 *
 	 * @param configuration The Flink configuration.
 	 * @param listeningAddress The address to listen at.
 	 * @param listeningPort The port to listen at.
@@ -204,57 +143,11 @@ public static ActorSystem startActorSystem(
 	 * @return The ActorSystem which has been started.
 	 * @throws Exception
 	 */
-	public static ActorSystem startActorSystem(
-		Configuration configuration,
-		String listeningAddress,
-		int listeningPort,
-		Logger logger) throws Exception {
-		return startActorSystem(configuration, listeningAddress, listeningPort, logger, ActorSystemExecutorMode.FORK_JOIN_EXECUTOR);
-	}
-
-	/**
-	 * Starts an Actor System at a specific port.
-	 * @param configuration The Flink configuration.
-	 * @param listeningAddress The address to listen at.
-	 * @param listeningPort The port to listen at.
-	 * @param logger the logger to output log information.
-	 * @param executorMode The executor mode of Akka actor system.
-	 * @return The ActorSystem which has been started.
-	 * @throws Exception
-	 */
 	public static ActorSystem startActorSystem(
 				Configuration configuration,
 				String listeningAddress,
 				int listeningPort,
-				Logger logger,
-				ActorSystemExecutorMode executorMode) throws Exception {
-		return startActorSystem(
-			configuration,
-			AkkaUtils.getFlinkActorSystemName(),
-			listeningAddress,
-			listeningPort,
-			logger,
-			executorMode);
-	}
-
-	/**
-	 * Starts an Actor System at a specific port.
-	 * @param configuration The Flink configuration.
-	 * @param actorSystemName Name of the started {@link ActorSystem}
-	 * @param listeningAddress The address to listen at.
-	 * @param listeningPort The port to listen at.
-	 * @param logger the logger to output log information.
-	 * @param executorMode The executor mode of Akka actor system.
-	 * @return The ActorSystem which has been started.
-	 * @throws Exception
-	 */
-	public static ActorSystem startActorSystem(
-		Configuration configuration,
-		String actorSystemName,
-		String listeningAddress,
-		int listeningPort,
-		Logger logger,
-		ActorSystemExecutorMode executorMode) throws Exception {
+				Logger logger) throws Exception {
 
 		String hostPortUrl = NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, listeningPort);
 		logger.info("Trying to start actor system at {}", hostPortUrl);
@@ -262,13 +155,12 @@ public static ActorSystem startActorSystem(
 		try {
 			Config akkaConfig = AkkaUtils.getAkkaConfig(
 				configuration,
-				new Some<>(new Tuple2<>(listeningAddress, listeningPort)),
-				getExecutorConfigByExecutorMode(configuration, executorMode)
+				new Some<>(new Tuple2<>(listeningAddress, listeningPort))
 			);
 
 			logger.debug("Using akka configuration\n {}", akkaConfig);
 
-			ActorSystem actorSystem = AkkaUtils.createActorSystem(actorSystemName, akkaConfig);
+			ActorSystem actorSystem = AkkaUtils.createActorSystem(akkaConfig);
 
 			logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem));
 			return actorSystem;
@@ -278,24 +170,13 @@ public static ActorSystem startActorSystem(
 				Throwable cause = t.getCause();
 				if (cause != null && t.getCause() instanceof BindException) {
 					throw new IOException("Unable to create ActorSystem at address " + hostPortUrl +
-						" : " + cause.getMessage(), t);
+							" : " + cause.getMessage(), t);
 				}
 			}
 			throw new Exception("Could not create actor system", t);
 		}
 	}
 
-	private static Config getExecutorConfigByExecutorMode(Configuration configuration, ActorSystemExecutorMode executorMode) {
-		switch (executorMode) {
-			case FORK_JOIN_EXECUTOR:
-				return AkkaUtils.getForkJoinExecutorConfig(configuration);
-			case FIXED_THREAD_POOL_EXECUTOR:
-				return AkkaUtils.getThreadPoolExecutorConfig();
-			default:
-				throw new IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", executorMode));
-		}
-	}
-
 	/**
 	 * Starts the web frontend.
 	 *
@@ -623,14 +504,4 @@ public static Configuration cloneConfiguration(Configuration configuration) {
 
 		return clonedConfiguration;
 	}
-
-	/**
-	 * Options to specify which executor to use in an {@link ActorSystem}.
-	 */
-	public enum ActorSystemExecutorMode {
-		/** Used by default, use dispatcher with fork-join-executor. **/
-		FORK_JOIN_EXECUTOR,
-		/** Use dispatcher with fixed thread pool executor. **/
-		FIXED_THREAD_POOL_EXECUTOR
-	}
 }
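
Both the kept and the removed startActorSystem variants parse portRangeDefinition into an iterator of candidate ports and try them in order. A simplified, self-contained sketch of that parsing, matching the format described by the removed option ("50100,50101", "50100-50200", or a mix); Flink's real parser lives in NetUtils, and this version skips validation:

    // Assumes java.util.ArrayList, java.util.Iterator, java.util.List.
    static Iterator<Integer> parsePortRange(String rangeDefinition) {
        List<Integer> ports = new ArrayList<>();
        for (String part : rangeDefinition.split(",")) {
            int dash = part.indexOf('-');
            if (dash < 0) {
                ports.add(Integer.parseInt(part.trim()));                     // single port, e.g. "50100"
            } else {
                int start = Integer.parseInt(part.substring(0, dash).trim());
                int end = Integer.parseInt(part.substring(dash + 1).trim());  // inclusive range
                for (int p = start; p <= end; p++) {
                    ports.add(p);
                }
            }
        }
        return ports.iterator();
    }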
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 2a9baa925ff..fd0a0a1d39f 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -96,8 +96,6 @@
 
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR;
-
 /**
  * Base class for the Flink cluster entry points.
  *
@@ -156,9 +154,6 @@
 	@GuardedBy("lock")
 	private WebMonitorEndpoint<?> webMonitorEndpoint;
 
-	@GuardedBy("lock")
-	private ActorSystem metricQueryServiceActorSystem;
-
 	@GuardedBy("lock")
 	private ArchivedExecutionGraphStore archivedExecutionGraphStore;
 
@@ -281,9 +276,9 @@ protected void initializeServices(Configuration configuration) throws Exception
 			metricRegistry = createMetricRegistry(configuration);
 
 			// TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
-			// Start actor system for metric query service on any available port
-			metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(configuration, bindAddress, LOG);
-			metricRegistry.startQueryService(metricQueryServiceActorSystem, null);
+			// start the MetricQueryService
+			final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
+			metricRegistry.startQueryService(actorSystem, null);
 
 			archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());
 
@@ -401,7 +396,7 @@ protected RpcService createRpcService(
 			Configuration configuration,
 			String bindAddress,
 			String portRange) throws Exception {
-		ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, FORK_JOIN_EXECUTOR);
+		ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
 		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
 		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
 	}
@@ -469,10 +464,6 @@ protected MetricRegistryImpl createMetricRegistry(Configuration configuration) {
 				terminationFutures.add(metricRegistry.shutdown());
 			}
 
-			if (metricQueryServiceActorSystem != null) {
-				terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
-			}
-
 			if (commonRpcService != null) {
 				terminationFutures.add(commonRpcService.stopService());
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 39e5f44583c..3fd268a1aeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -18,11 +18,8 @@
 
 package org.apache.flink.runtime.metrics.util;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
@@ -30,7 +27,6 @@
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
-import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,15 +45,12 @@
 import java.lang.management.ThreadMXBean;
 import java.util.List;
 
-import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FIXED_THREAD_POOL_EXECUTOR;
-
 /**
  * Utility class to register pre-defined metric sets.
  */
 public class MetricUtils {
 	private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class);
 	private static final String METRIC_GROUP_STATUS_NAME = "Status";
-	private static final String METRICS_ACTOR_SYSTEM_NAME = "flink-metrics";
 
 	private MetricUtils() {
 	}
@@ -109,17 +102,6 @@ public static void instantiateStatusMetrics(
 		instantiateCPUMetrics(jvm.addGroup("CPU"));
 	}
 
-	public static ActorSystem startMetricsActorSystem(Configuration configuration, String hostname, Logger logger) throws Exception {
-		final String portRange = configuration.getString(MetricOptions.QUERY_SERVICE_PORT);
-		return BootstrapTools.startActorSystem(
-			configuration,
-			METRICS_ACTOR_SYSTEM_NAME,
-			hostname,
-			portRange,
-			logger,
-			FIXED_THREAD_POOL_EXECUTOR);
-	}
-
 	private static void instantiateNetworkMetrics(
 		MetricGroup metrics,
 		final NetworkEnvironment network) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index bcd4c7b93fb..4bfdb25c4d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -133,9 +133,6 @@
 	@GuardedBy("lock")
 	private RpcService resourceManagerRpcService;
 
-	@GuardedBy("lock")
-	private ActorSystem metricQueryServiceActorSystem;
-
 	@GuardedBy("lock")
 	private HighAvailabilityServices haServices;
 
@@ -255,11 +252,8 @@ public void start() throws Exception {
 				commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
 
 				// TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
-				metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(
-					configuration,
-					commonRpcService.getAddress(),
-					LOG);
-				metricRegistry.startQueryService(metricQueryServiceActorSystem, null);
+				final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
+				metricRegistry.startQueryService(actorSystem, null);
 
 				if (useSingleRpcService) {
 					for (int i = 0; i < numTaskManagers; i++) {
@@ -356,7 +350,7 @@ public void start() throws Exception {
 						configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
 						"DispatcherRestEndpoint"),
 					new AkkaQueryServiceRetriever(
-						metricQueryServiceActorSystem,
+						actorSystem,
 						Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
 					haServices.getWebMonitorLeaderElectionService(),
 					new ShutDownFatalErrorHandler());
@@ -453,12 +447,24 @@ public void start() throws Exception {
 
 					final FutureUtils.ConjunctFuture<Void> componentsTerminationFuture = FutureUtils.completeAll(componentTerminationFutures);
 
-					final CompletableFuture<Void> metricSystemTerminationFuture = FutureUtils.composeAfterwards(
+					final CompletableFuture<Void> metricRegistryTerminationFuture = FutureUtils.runAfterwards(
 						componentsTerminationFuture,
-						this::closeMetricSystem);
+						() -> {
+							synchronized (lock) {
+								if (jobManagerMetricGroup != null) {
+									jobManagerMetricGroup.close();
+									jobManagerMetricGroup = null;
+								}
+								// metrics shutdown
+								if (metricRegistry != null) {
+									metricRegistry.shutdown();
+									metricRegistry = null;
+								}
+							}
+						});
 
 					// shut down the RpcServices
-					final CompletableFuture<Void> rpcServicesTerminationFuture = metricSystemTerminationFuture
+					final CompletableFuture<Void> rpcServicesTerminationFuture = metricRegistryTerminationFuture
 						.thenCompose((Void ignored) -> terminateRpcServices());
 
 					final CompletableFuture<Void> remainingServicesTerminationFuture = FutureUtils.runAfterwards(
@@ -482,29 +488,6 @@ public void start() throws Exception {
 		}
 	}
 
-	private CompletableFuture<Void> closeMetricSystem() {
-		synchronized (lock) {
-			if (jobManagerMetricGroup != null) {
-				jobManagerMetricGroup.close();
-				jobManagerMetricGroup = null;
-			}
-
-			final ArrayList<CompletableFuture<Void>> terminationFutures = new ArrayList<>(2);
-
-			// metrics shutdown
-			if (metricRegistry != null) {
-				terminationFutures.add(metricRegistry.shutdown());
-				metricRegistry = null;
-			}
-
-			if (metricQueryServiceActorSystem != null) {
-				terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
-			}
-
-			return FutureUtils.completeAll(terminationFutures);
-		}
-	}
-
 	// ------------------------------------------------------------------------
 	//  Accessing jobs
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a3a075d0f98..d78e3465ec4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -48,6 +48,7 @@
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -75,13 +76,11 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -594,26 +593,19 @@ public void unRegisterInfoMessageListener(final String address) {
 
 	@Override
 	public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
-		final ArrayList<CompletableFuture<Optional<Tuple2<ResourceID, String>>>> metricQueryServicePathFutures = new ArrayList<>(taskExecutors.size());
+		final ArrayList<Tuple2<ResourceID, String>> metricQueryServicePaths = new ArrayList<>(taskExecutors.size());
 
 		for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry : taskExecutors.entrySet()) {
 			final ResourceID tmResourceId = workerRegistrationEntry.getKey();
 			final WorkerRegistration<WorkerType> workerRegistration = workerRegistrationEntry.getValue();
-			final TaskExecutorGateway taskExecutorGateway = workerRegistration.getTaskExecutorGateway();
+			final String taskManagerAddress = workerRegistration.getTaskExecutorGateway().getAddress();
+			final String tmMetricQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
+				MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + tmResourceId.getResourceIdString();
 
-			final CompletableFuture<Optional<Tuple2<ResourceID, String>>> metricQueryServicePathFuture = taskExecutorGateway
-				.requestMetricQueryServiceAddress(timeout)
-				.thenApply(optional -> optional.map(path -> Tuple2.of(tmResourceId, path)));
-
-			metricQueryServicePathFutures.add(metricQueryServicePathFuture);
+			metricQueryServicePaths.add(Tuple2.of(tmResourceId, tmMetricQueryServicePath));
 		}
 
-		return FutureUtils.combineAll(metricQueryServicePathFutures).thenApply(
-			collection -> collection
-				.stream()
-				.filter(Optional::isPresent)
-				.map(Optional::get)
-				.collect(Collectors.toList()));
+		return CompletableFuture.completedFuture(metricQueryServicePaths);
 	}
 
 	@Override
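
The restored code above derives each TaskManager's metric query service path directly from its RPC address instead of asking the TaskExecutor over RPC. A worked example of that string manipulation (the address and resource ID are made up, and this assumes METRIC_QUERY_SERVICE_NAME resolves to "MetricQueryService"):

    String taskManagerAddress = "akka.tcp://flink@tm-host:50101/user/taskmanager_0"; // hypothetical
    String base = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1);
    String path = base + "MetricQueryService" + '_' + "tm-resource-id";              // hypothetical id
    // -> "akka.tcp://flink@tm-host:50101/user/MetricQueryService_tm-resource-id"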
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 3ee7641f717..3a626986361 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -70,10 +70,7 @@
 	 * @throws IOException      Thrown, if the actor system can not bind to the address
 	 * @throws Exception      Thrown is some other error occurs while creating akka actor system
 	 */
-	public static RpcService createRpcService(
-		String hostname,
-		int port,
-		Configuration configuration) throws Exception {
+	public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
 		LOG.info("Starting AkkaRpcService at {}.", NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port));
 
 		final ActorSystem actorSystem;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f90e939bc01..33db7e13076 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -102,7 +102,6 @@
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -158,10 +157,6 @@
 
 	private final BlobCacheService blobCacheService;
 
-	/** The path to metric query service on this Task Manager. */
-	@Nullable
-	private final String metricQueryServicePath;
-
 	// --------- TaskManager services --------
 
 	/** The connection information of this task manager. */
@@ -216,7 +211,6 @@ public TaskExecutor(
 			TaskManagerServices taskExecutorServices,
 			HeartbeatServices heartbeatServices,
 			TaskManagerMetricGroup taskManagerMetricGroup,
-			@Nullable String metricQueryServicePath,
 			BlobCacheService blobCacheService,
 			FatalErrorHandler fatalErrorHandler) {
 
@@ -230,7 +224,6 @@ public TaskExecutor(
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 		this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
 		this.blobCacheService = checkNotNull(blobCacheService);
-		this.metricQueryServicePath = metricQueryServicePath;
 
 		this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
 		this.jobManagerTable = taskExecutorServices.getJobManagerTable();
@@ -854,11 +847,6 @@ public void heartbeatFromResourceManager(ResourceID resourceID) {
 		}
 	}
 
-	@Override
-	public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(Time timeout) {
-		return CompletableFuture.completedFuture(SerializableOptional.ofNullable(metricQueryServicePath));
-	}
-
 	// ----------------------------------------------------------------------
 	// Disconnection RPCs
 	// ----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index d6b9e152e8f..4f792896216 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -36,7 +36,6 @@
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.types.SerializableOptional;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -196,11 +195,4 @@
 	 * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file.
 	 */
 	CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, @RpcTimeout Time timeout);
-
-	/**
-	 * Returns the fully qualified address of Metric Query Service on the TaskManager.
-	 *
-	 * @return Future String with Fully qualified (RPC) address of Metric Query Service on the TaskManager.
-	 */
-	CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(@RpcTimeout Time timeout);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index f830ae1968a..42fe5bf658b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -43,6 +43,7 @@
 import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -100,8 +101,6 @@
 
 	private final RpcService rpcService;
 
-	private final ActorSystem metricQueryServiceActorSystem;
-
 	private final HighAvailabilityServices highAvailabilityServices;
 
 	private final MetricRegistryImpl metricRegistry;
@@ -133,14 +132,14 @@ public TaskManagerRunner(Configuration configuration, ResourceID resourceId) thr
 			HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
 
 		rpcService = createRpcService(configuration, highAvailabilityServices);
-		metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(configuration, rpcService.getAddress(), LOG);
 
 		HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
 
 		metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 
 		// TODO: Temporary hack until the MetricQueryService has been ported to RpcEndpoint
-		metricRegistry.startQueryService(metricQueryServiceActorSystem, resourceId);
+		final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
+		metricRegistry.startQueryService(actorSystem, resourceId);
 
 		blobCacheService = new BlobCacheService(
 			configuration, highAvailabilityServices.createBlobStore(), null
@@ -160,7 +159,7 @@ public TaskManagerRunner(Configuration configuration, ResourceID resourceId) thr
 		this.terminationFuture = new CompletableFuture<>();
 		this.shutdown = false;
 
-		MemoryLogger.startIfConfigured(LOG, configuration, metricQueryServiceActorSystem);
+		MemoryLogger.startIfConfigured(LOG, configuration, actorSystem);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -215,10 +214,6 @@ public void start() throws Exception {
 				exception = ExceptionUtils.firstOrSuppressed(e, exception);
 			}
 
-			if (metricQueryServiceActorSystem != null) {
-				terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
-			}
-
 			try {
 				highAvailabilityServices.close();
 			} catch (Exception e) {
@@ -378,8 +373,6 @@ public static TaskExecutor startTaskManager(
 
 		TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 
-		String metricQueryServicePath = metricRegistry.getMetricQueryServicePath();
-
 		return new TaskExecutor(
 			rpcService,
 			taskManagerConfiguration,
@@ -387,7 +380,6 @@ public static TaskExecutor startTaskManager(
 			taskManagerServices,
 			heartbeatServices,
 			taskManagerMetricGroup,
-			metricQueryServicePath,
 			blobCacheService,
 			fatalErrorHandler);
 	}
@@ -424,14 +416,6 @@ public static RpcService createRpcService(
 
 		final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT);
 
-		return bindWithPort(configuration, taskManagerHostname, portRangeDefinition);
-	}
-
-	private static RpcService bindWithPort(
-		Configuration configuration,
-		String taskManagerHostname,
-		String portRangeDefinition) throws Exception{
-
 		// parse port range definition and create port iterator
 		Iterator<Integer> portsIterator;
 		try {
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 2b0c939e574..dcf0fdd9bdf 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -50,12 +50,6 @@ object AkkaUtils {
 
   val INF_TIMEOUT: FiniteDuration = 21474835 seconds
 
-  val FLINK_ACTOR_SYSTEM_NAME = "flink"
-
-  def getFlinkActorSystemName = {
-    FLINK_ACTOR_SYSTEM_NAME
-  }
-
   /**
    * Creates a local actor system without remoting.
    *
@@ -109,19 +103,9 @@ object AkkaUtils {
    * @return created actor system
    */
   def createActorSystem(akkaConfig: Config): ActorSystem = {
-    createActorSystem(FLINK_ACTOR_SYSTEM_NAME, akkaConfig)
-  }
-
-  /**
-    * Creates an actor system with the given akka config.
-    *
-    * @param akkaConfig configuration for the actor system
-    * @return created actor system
-    */
-  def createActorSystem(actorSystemName: String, akkaConfig: Config): ActorSystem = {
     // Initialize slf4j as logger of Akka's Netty instead of java.util.logging (FLINK-1650)
     InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory)
-    ActorSystem.create(actorSystemName, akkaConfig)
+    ActorSystem.create("flink", akkaConfig)
   }
 
   /**
@@ -135,23 +119,7 @@ object AkkaUtils {
   }
 
   /**
-    * Returns a remote Akka config for the given configuration values.
-    *
-    * @param configuration containing the user provided configuration values
-    * @param hostname to bind against. If null, then the loopback interface is used
-    * @param port to bind against
-    * @param executorMode containing the user specified mode of executor
-    * @return A remote Akka config
-    */
-  def getAkkaConfig(configuration: Configuration,
-                    hostname: String,
-                    port: Int,
-                    executorConfig: Config): Config = {
-    getAkkaConfig(configuration, Some((hostname, port)), executorConfig)
-  }
-
-  /**
-    * Returns a remote Akka config for the given configuration values.
+    * Return a remote Akka config for the given configuration values.
     *
     * @param configuration containing the user provided configuration values
     * @param hostname to bind against. If null, then the loopback interface is used
@@ -187,25 +155,7 @@ object AkkaUtils {
   @throws(classOf[UnknownHostException])
   def getAkkaConfig(configuration: Configuration,
                     externalAddress: Option[(String, Int)]): Config = {
-    getAkkaConfig(configuration, externalAddress, getForkJoinExecutorConfig(configuration))
-  }
-
-  /**
-    * Creates an akka config with the provided configuration values. If the listening address is
-    * specified, then the actor system will listen on the respective address.
-    *
-    * @param configuration instance containing the user provided configuration values
-    * @param externalAddress optional tuple of bindAddress and port to be reachable at.
-    *                        If None is given, then an Akka config for local actor system
-    *                        will be returned
-    * @param executorConfig config defining the used executor by the default dispatcher
-    * @return Akka config
-    */
-  @throws(classOf[UnknownHostException])
-  def getAkkaConfig(configuration: Configuration,
-                    externalAddress: Option[(String, Int)],
-                    executorConfig: Config): Config = {
-    val defaultConfig = getBasicAkkaConfig(configuration).withFallback(executorConfig)
+    val defaultConfig = getBasicAkkaConfig(configuration)
 
     externalAddress match {
 
@@ -257,6 +207,24 @@ object AkkaUtils {
     val supervisorStrategy = classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
       .getCanonicalName
 
+    val forkJoinExecutorParallelismFactor =
+      configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
+
+    val forkJoinExecutorParallelismMin =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
+
+    val forkJoinExecutorParallelismMax =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
+
+    val forkJoinExecutorConfig =
+      s"""
+         | fork-join-executor {
+         |   parallelism-factor = $forkJoinExecutorParallelismFactor
+         |   parallelism-min = $forkJoinExecutorParallelismMin
+         |   parallelism-max = $forkJoinExecutorParallelismMax
+         | }
+       """.stripMargin
+
     val config =
       s"""
         |akka {
@@ -283,6 +251,8 @@ object AkkaUtils {
         |
         |   default-dispatcher {
         |     throughput = $akkaThroughput
+        |
+        |   $forkJoinExecutorConfig
         |   }
         | }
         |}
@@ -291,53 +261,6 @@ object AkkaUtils {
     ConfigFactory.parseString(config)
   }
 
-  def getThreadPoolExecutorConfig: Config = {
-    val configString = s"""
-       |akka {
-       |  actor {
-       |    default-dispatcher {
-       |      executor = "thread-pool-executor"
-       |      thread-pool-executor {
-       |        core-pool-size-min = 2
-       |        core-pool-size-factor = 2.0
-       |        core-pool-size-max = 4
-       |      }
-       |    }
-       |  }
-       |}
-        """.
-      stripMargin
-
-    ConfigFactory.parseString(configString)
-  }
-
-  def getForkJoinExecutorConfig(configuration: Configuration): Config = {
-    val forkJoinExecutorParallelismFactor =
-      configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
-
-    val forkJoinExecutorParallelismMin =
-      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
-
-    val forkJoinExecutorParallelismMax =
-      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
-
-    val configString = s"""
-       |akka {
-       |  actor {
-       |    default-dispatcher {
-       |      executor = "fork-join-executor"
-       |      fork-join-executor {
-       |        parallelism-factor = $forkJoinExecutorParallelismFactor
-       |        parallelism-min = $forkJoinExecutorParallelismMin
-       |        parallelism-max = $forkJoinExecutorParallelismMax
-       |      }
-       |    }
-       |  }
-       |}""".stripMargin
-
-    ConfigFactory.parseString(configString)
-  }
-
   def testDispatcherConfig: Config = {
     val config =
       s"""
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 3650f43066d..b113e12ef69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1494,8 +1494,8 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
 		assertTrue(pending.isDiscarded());
 		assertTrue(savepointFuture.isDone());
 
-		// the now we should have a completed checkpoint
-		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		// the now the savepoint should be completed but not added to the completed checkpoint store
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 
 		// validate that the relevant tasks got a confirmation message
@@ -1510,7 +1510,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
 			verify(subtaskState2, times(1)).registerSharedStates(any(SharedStateRegistry.class));
 		}
 
-		CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
+		CompletedCheckpoint success = savepointFuture.get();
 		assertEquals(jid, success.getJobId());
 		assertEquals(timestamp, success.getTimestamp());
 		assertEquals(pending.getCheckpointId(), success.getCheckpointID());
@@ -1528,9 +1528,9 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
-		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-		CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
+		CompletedCheckpoint successNew = savepointFuture.get();
 		assertEquals(jid, successNew.getJobId());
 		assertEquals(timestampNew, successNew.getTimestamp());
 		assertEquals(checkpointIdNew, successNew.getCheckpointID());
@@ -1557,7 +1557,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
 	 * Triggers a savepoint and two checkpoints. The second checkpoint completes
 	 * and subsumes the first checkpoint, but not the first savepoint. Then we
 	 * trigger another checkpoint and savepoint. The 2nd savepoint completes and
-	 * subsumes the last checkpoint, but not the first savepoint.
+	 * does neither subsume the last checkpoint nor the first savepoint.
 	 */
 	@Test
 	public void testSavepointsAreNotSubsumed() throws Exception {
@@ -1614,18 +1614,19 @@ public void testSavepointsAreNotSubsumed() throws Exception {
 		assertFalse(savepointFuture1.isDone());
 
 		assertTrue(coord.triggerCheckpoint(timestamp + 3, false));
+		long checkpointId3 = counter.getLast();
 		assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
 		CompletableFuture<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir);
 		long savepointId2 = counter.getLast();
 		assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
-		// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
+		// 2nd savepoint should not subsume the last checkpoint and the 1st savepoint
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2));
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2));
 
-		assertEquals(1, coord.getNumberOfPendingCheckpoints());
-		assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(2, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 		assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
 
 		assertFalse(savepointFuture1.isDone());
@@ -1635,9 +1636,15 @@ public void testSavepointsAreNotSubsumed() throws Exception {
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1));
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1));
 
-		assertEquals(0, coord.getNumberOfPendingCheckpoints());
-		assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 		assertTrue(savepointFuture1.isDone());
+
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId3));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId3));
+
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
 	}
 
 	private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
@@ -3460,6 +3467,92 @@ public void testCheckpointStatsTrackerRestoreCallback() throws Exception {
 			.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
 	}
 
+	/**
+	 * FLINK-6328
+	 *
+	 * Tests that savepoints are not added to the {@link CompletedCheckpointStore} and,
+	 * thus, are not subject to job recovery. The reason that we don't want that (until
+	 * FLINK-4815 has been finished) is that the lifecycle of savepoints is not controlled
+	 * by the {@link CheckpointCoordinator}.
+	 */
+	@Test
+	public void testSavepointsAreNotAddedToCompletedCheckpointStore() throws Exception {
+		final JobID jobId = new JobID();
+		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+		final ExecutionVertex vertex1 = mockExecutionVertex(executionAttemptId);
+		final CompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
+		final long checkpointTimestamp1 = 1L;
+		final long savepointTimestamp = 2L;
+		final long checkpointTimestamp2 = 3L;
+		final String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+
+		final StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
+
+		CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(
+			jobId,
+			600000L,
+			600000L,
+			0L,
+			Integer.MAX_VALUE,
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+			new ExecutionVertex[]{vertex1},
+			new ExecutionVertex[]{vertex1},
+			new ExecutionVertex[]{vertex1},
+			checkpointIDCounter,
+			completedCheckpointStore,
+			new MemoryStateBackend(),
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
+
+		// trigger a first checkpoint
+		assertTrue(
+			"Triggering of a checkpoint should work.",
+			checkpointCoordinator.triggerCheckpoint(checkpointTimestamp1, false));
+
+		assertTrue(0 == completedCheckpointStore.getNumberOfRetainedCheckpoints());
+
+		// complete the 1st checkpoint
+		checkpointCoordinator.receiveAcknowledgeMessage(
+			new AcknowledgeCheckpoint(
+				jobId,
+				executionAttemptId,
+				checkpointIDCounter.getLast()));
+
+		// check that the checkpoint has been completed
+		assertTrue(1 == completedCheckpointStore.getNumberOfRetainedCheckpoints());
+
+		// trigger a savepoint --> this should not have any effect on the CompletedCheckpointStore
+		CompletableFuture<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
+
+		checkpointCoordinator.receiveAcknowledgeMessage(
+			new AcknowledgeCheckpoint(
+				jobId,
+				executionAttemptId,
+				checkpointIDCounter.getLast()));
+
+		// check that no errors occurred
+		final CompletedCheckpoint savepoint = savepointFuture.get();
+
+		assertFalse(
+			"The savepoint should not have been added to the completed checkpoint store",
+			savepoint.getCheckpointID() == completedCheckpointStore.getLatestCheckpoint().getCheckpointID());
+
+		assertTrue(
+			"Triggering of a checkpoint should work.",
+			checkpointCoordinator.triggerCheckpoint(checkpointTimestamp2, false));
+
+		// complete the 2nd checkpoint
+		checkpointCoordinator.receiveAcknowledgeMessage(
+			new AcknowledgeCheckpoint(
+				jobId,
+				executionAttemptId,
+				checkpointIDCounter.getLast()));
+
+		assertTrue(
+			"The latest completed (proper) checkpoint should have been added to the completed checkpoint store.",
+			completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == checkpointIDCounter.getLast());
+	}
+
 	@Test
 	public void testSharedStateRegistrationOnRestore() throws Exception {
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 3ee1b927b53..6e0f9c5c638 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -165,7 +165,6 @@ public void testSlotAllocation() throws Exception {
 			taskManagerServices,
 			heartbeatServices,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			new BlobCacheService(
 				configuration,
 				new VoidBlobStore(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 179dea27835..4af052978f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -279,7 +279,6 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {
 			taskManagerServices,
 			heartbeatServices,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -370,7 +369,6 @@ public void testHeartbeatTimeoutWithResourceManager() throws Exception {
 			taskManagerServices,
 			heartbeatServices,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -486,7 +484,6 @@ public void testHeartbeatSlotReporting() throws Exception {
 			taskManagerServices,
 			heartbeatServices,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -565,7 +562,6 @@ public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -629,7 +625,6 @@ public void testTriggerRegistrationOnLeaderChange() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -752,7 +747,6 @@ public void testTaskSubmission() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -852,7 +846,6 @@ public void testJobLeaderDetection() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -884,7 +877,7 @@ public void testJobLeaderDetection() throws Exception {
 			// the job leader should get the allocation id offered
 			verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(
 					any(ResourceID.class),
-					(Collection<SlotOffer>)Matchers.argThat(contains(slotOffer)),
+					(Collection<SlotOffer>) Matchers.argThat(contains(slotOffer)),
 					any(Time.class));
 		} finally {
 			RpcUtils.terminateRpcEndpoint(taskManager, timeout);
@@ -967,7 +960,6 @@ public void testSlotAcceptance() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1062,7 +1054,6 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1169,7 +1160,6 @@ public void testFilterOutDuplicateJobMasterRegistrations() throws Exception {
 			taskManagerServices,
 			heartbeatServicesMock,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1243,7 +1233,6 @@ public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
 			taskManagerServices,
 			heartbeatServices,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1300,7 +1289,6 @@ public void testRemoveJobFromJobLeaderService() throws Exception {
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1393,7 +1381,6 @@ public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio
 			taskManagerServices,
 			new HeartbeatServices(heartbeatInterval, 10L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1501,7 +1488,6 @@ public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception {
 				.build(),
 			new HeartbeatServices(heartbeatInterval, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 
@@ -1713,7 +1699,6 @@ private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices)
 			taskManagerServices,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-			null,
 			dummyBlobCacheService,
 			testingFatalErrorHandler);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 5fd12a84ac4..a9e99495e34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -34,7 +34,6 @@
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
-import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.CompletableFuture;
@@ -150,11 +149,6 @@ public void disconnectResourceManager(Exception cause) {
 		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
 
-	@Override
-	public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(Time timeout) {
-		return CompletableFuture.completedFuture(SerializableOptional.of(address));
-	}
-
 	@Override
 	public String getAddress() {
 		return address;
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index e5c1668df0a..d02a55483bf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.akka
 
-import java.net.{InetAddress, InetSocketAddress}
+import java.net.InetSocketAddress
 
 import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException}
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
@@ -167,30 +167,4 @@ class AkkaUtilsTest
     akkaConfig.getString("akka.remote.netty.tcp.hostname") should
       equal(NetUtils.unresolvedHostToNormalizedString(hostname))
   }
-
-  test("null hostname should go to localhost") {
-    val configure = AkkaUtils.getAkkaConfig(new Configuration(), Some((null, 1772)))
-
-    val hostname = configure.getString("akka.remote.netty.tcp.hostname")
-
-    InetAddress.getByName(hostname).isLoopbackAddress should be(true)
-  }
-
-  test("getAkkaConfig defaults to fork-join-executor") {
-    val akkaConfig = AkkaUtils.getAkkaConfig(new Configuration())
-
-    akkaConfig.getString("akka.actor.default-dispatcher.executor") should
-      equal("fork-join-executor")
-  }
-
-  test("getAkkaConfig respects executor config") {
-    val akkaConfig = AkkaUtils.getAkkaConfig(
-      new Configuration(),
-      "localhost",
-      1234,
-      AkkaUtils.getThreadPoolExecutorConfig)
-
-    akkaConfig.getString("akka.actor.default-dispatcher.executor") should
-      equal("thread-pool-executor")
-  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services