You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:51 UTC
[46/50] [abbrv] flink git commit: [FLINK-4606] Integrate the new
ResourceManager with the existed FlinkResourceManager
[FLINK-4606] Integrate the new ResourceManager with the existed FlinkResourceManager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bace895
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bace895
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bace895
Branch: refs/heads/flip-6
Commit: 6bace895907e4a6259367d8b0333f994aefd1f18
Parents: f5ab959
Author: beyond1920 <be...@126.com>
Authored: Fri Sep 9 09:11:24 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:45:06 2016 +0200
----------------------------------------------------------------------
.../InfoMessageListenerRpcGateway.java | 35 +++
.../resourcemanager/ResourceManager.java | 214 ++++++++++++++++---
.../resourcemanager/ResourceManagerGateway.java | 23 ++
.../StandaloneResourceManager.java | 64 ++++++
.../resourcemanager/ResourceManagerHATest.java | 2 +-
.../ResourceManagerJobMasterTest.java | 2 +-
.../ResourceManagerTaskExecutorTest.java | 2 +-
.../slotmanager/SlotProtocolTest.java | 5 +-
8 files changed, 318 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
new file mode 100644
index 0000000..c1eeefa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+/**
+ * RPC gateway through which a {@link ResourceManager} delivers info messages to a listener.
+ */
+public interface InfoMessageListenerRpcGateway extends RpcGateway {
+
+ /**
+ * Delivers an info message from the resource manager to this listener.
+ *
+ * @param infoMessage the info message to deliver
+ */
+ void notifyInfoMessage(InfoMessage infoMessage);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 88b8a11..83dc4db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -20,19 +20,22 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -42,8 +45,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
import java.util.HashMap;
@@ -66,15 +67,16 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
* </ul>
*/
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
+public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {
- private final Logger LOG = LoggerFactory.getLogger(getClass());
+ /** The exit code with which the process is stopped in case of a fatal error */
+ protected static final int EXIT_CODE_FATAL_ERROR = -13;
private final Map<JobID, JobMasterGateway> jobMasterGateways;
private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;
- private final Map<ResourceID, TaskExecutorRegistration> taskExecutorGateways;
+ private final Map<ResourceID, WorkerType> taskExecutorGateways;
private final HighAvailabilityServices highAvailabilityServices;
@@ -84,16 +86,16 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
private UUID leaderSessionID;
- public ResourceManager(
- RpcService rpcService,
- HighAvailabilityServices highAvailabilityServices,
- SlotManager slotManager) {
+ private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
+
+ public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices, SlotManager slotManager) {
super(rpcService);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
this.jobMasterGateways = new HashMap<>();
- this.slotManager = slotManager;
+ this.slotManager = checkNotNull(slotManager);
this.jobMasterLeaderRetrievalListeners = new HashSet<>();
this.taskExecutorGateways = new HashMap<>();
+ infoMessageListeners = new HashMap<>();
}
@Override
@@ -103,6 +105,8 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
super.start();
leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
leaderElectionService.start(this);
+ // framework specific initialization
+ initialize();
} catch (Throwable e) {
log.error("A fatal error happened when starting the ResourceManager", e);
throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
@@ -166,12 +170,12 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
} catch (Exception e) {
- LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+ log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
throw new Exception("Failed to retrieve JobMasterLeaderRetriever");
}
if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
- LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
+ log.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
throw new Exception("JobManager is not leading");
}
@@ -190,7 +194,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
} catch (Exception e) {
- LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+ log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
}
jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
@@ -237,13 +241,24 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
if (throwable != null) {
return new RegistrationResponse.Decline(throwable.getMessage());
} else {
- InstanceID id = new InstanceID();
- TaskExecutorRegistration oldTaskExecutor =
- taskExecutorGateways.put(resourceID, new TaskExecutorRegistration(taskExecutorGateway, id));
- if (oldTaskExecutor != null) {
- log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+ WorkerType startedWorker = taskExecutorGateways.get(resourceID);
+ if(startedWorker != null) {
+ String oldWorkerAddress = startedWorker.getTaskExecutorGateway().getAddress();
+ if (taskExecutorAddress.equals(oldWorkerAddress)) {
+ log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+ } else {
+ log.warn("Receive a duplicate registration from TaskExecutor {} at different address, previous ({}), new ({})",
+ resourceID, oldWorkerAddress, taskExecutorAddress);
+ // TODO :: suggest old taskExecutor to stop itself
+ slotManager.notifyTaskManagerFailure(resourceID);
+ startedWorker = workerStarted(resourceID, taskExecutorGateway);
+ taskExecutorGateways.put(resourceID, startedWorker);
+ }
+ } else {
+ startedWorker = workerStarted(resourceID, taskExecutorGateway);
+ taskExecutorGateways.put(resourceID, startedWorker);
}
- return new TaskExecutorRegistrationSuccess(id, 5000);
+ return new TaskExecutorRegistrationSuccess(startedWorker.getInstanceID(), 5000);
}
}
}, getMainThreadExecutor());
@@ -263,14 +278,12 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
if (jobMasterGateway != null) {
return slotManager.requestSlot(slotRequest);
} else {
- LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
+ log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
return new SlotRequestRejected(slotRequest.getAllocationId());
}
}
-
-
// ------------------------------------------------------------------------
// Leader Contender
// ------------------------------------------------------------------------
@@ -324,6 +337,158 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
shutDown();
}
+ /**
+ * Registers an info message listener with this resource manager.
+ *
+ * <p>If a listener is already stored under the given address, the request is only logged as a
+ * duplicate. Otherwise a connection to the address is requested from the RPC service; once it
+ * completes, the gateway is stored under the address. A failed connection attempt is logged
+ * together with its cause and nothing is stored. Both callbacks are executed with the main
+ * thread executor.
+ *
+ * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
+ */
+ @RpcMethod
+ public void registerInfoMessageListener(final String infoMessageListenerAddress) {
+     if (infoMessageListeners.containsKey(infoMessageListenerAddress)) {
+         log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
+         return;
+     }
+
+     Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture =
+         getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
+
+     infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
+         @Override
+         public void accept(InfoMessageListenerRpcGateway gateway) {
+             log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
+             infoMessageListeners.put(infoMessageListenerAddress, gateway);
+         }
+     }, getMainThreadExecutor());
+
+     infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+         @Override
+         public Void apply(Throwable failure) {
+             // Hand the throwable to the logger as trailing argument so the cause is not lost.
+             log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress, failure);
+             return null;
+         }
+     }, getMainThreadExecutor());
+ }
+
+ /**
+ * Unregisters an info message listener by removing the gateway stored under the given address.
+ *
+ * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager
+ *
+ */
+ @RpcMethod
+ public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) {
+ infoMessageListeners.remove(infoMessageListenerAddress);
+ }
+
+ /**
+ * Shuts down the cluster: logs the final status and diagnostics, then delegates to
+ * {@link #shutDownApplication(ApplicationStatus, String)}.
+ *
+ * @param finalStatus the final application status, passed on to the framework specific shutdown
+ * @param optionalDiagnostics diagnostics message, passed on to the framework specific shutdown
+ */
+ @RpcMethod
+ public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
+ log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
+ shutDownApplication(finalStatus, optionalDiagnostics);
+ }
+
+ /**
+ * This method should be called by the framework once it detects that a currently registered task executor has failed.
+ *
+ * <p>The handling is scheduled via {@code runAsync}. If a worker is registered under the given
+ * resource id, it is removed, the failure reason is logged and the slot manager is notified.
+ * Nothing happens for a resource id that is not registered.
+ *
+ * @param resourceID Id of the worker that has failed.
+ * @param message An informational message that explains why the worker failed.
+ */
+ public void notifyWorkerFailed(final ResourceID resourceID, final String message) {
+     runAsync(new Runnable() {
+         @Override
+         public void run() {
+             WorkerType worker = taskExecutorGateways.remove(resourceID);
+             if (worker != null) {
+                 // Record the reason supplied by the framework instead of discarding it.
+                 log.warn("Task executor {} failed: {}", resourceID, message);
+                 // TODO :: suggest failed task executor to stop itself
+                 slotManager.notifyTaskManagerFailure(resourceID);
+             }
+         }
+     });
+ }
+
+ /**
+ * Gets the number of currently started TaskManagers, i.e. the number of entries in the
+ * task executor registration map.
+ *
+ * <p>NOTE(review): unlike the other non-RPC public methods of this class, the map is read
+ * directly instead of through {@code runAsync} - confirm callers invoke this from the main thread.
+ *
+ * @return The number of currently started TaskManagers.
+ */
+ public int getNumberOfStartedTaskManagers() {
+ return taskExecutorGateways.size();
+ }
+
+ /**
+ * Reports a fatal error to the resource manager. The framework specific
+ * {@code fatalError} callback is scheduled via {@code runAsync}.
+ *
+ * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
+ * such a way that a high-availability setting would restart this or fail over
+ * to another master.
+ *
+ * @param message description of the fatal error
+ * @param error the throwable associated with the fatal error
+ */
+ public void onFatalError(final String message, final Throwable error) {
+     final Runnable fatalErrorHandler = new Runnable() {
+         @Override
+         public void run() {
+             fatalError(message, error);
+         }
+     };
+     runAsync(fatalErrorHandler);
+ }
+
+ // ------------------------------------------------------------------------
+ // Framework specific behavior
+ // ------------------------------------------------------------------------
+
+ /**
+ * Initializes the framework specific components. Invoked from {@code start()} after the
+ * leader election service has been started.
+ *
+ * @throws Exception Exceptions during initialization cause the resource manager to fail.
+ */
+ protected abstract void initialize() throws Exception;
+
+ /**
+ * Callback invoked when a task executor registers at this resource manager.
+ *
+ * @param resourceID The worker resource id
+ * @param taskExecutorGateway the task executor gateway
+ * @return the worker that is stored for the resource id; its instance id is returned to the
+ * task executor in the registration response
+ */
+ protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway);
+
+ /**
+ * Callback invoked when the resource manager has encountered a fatal error.
+ *
+ * @param message description of the fatal error
+ * @param error the throwable associated with the fatal error
+ */
+ protected abstract void fatalError(String message, Throwable error);
+
+ /**
+ * The framework specific code for shutting down the application. This should report the
+ * application's final status and shut down the resource manager cleanly.
+ *
+ * <p>This method also needs to make sure all pending containers that are not registered
+ * yet are returned.
+ *
+ * @param finalStatus The application status to report.
+ * @param optionalDiagnostics An optional diagnostics message.
+ */
+ protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics);
+
+ // ------------------------------------------------------------------------
+ // Info messaging
+ // ------------------------------------------------------------------------
+
+ /**
+ * Wraps the given text in an {@link InfoMessage} and passes it to every registered
+ * info message listener. The notification is scheduled via {@code runAsync}.
+ *
+ * @param message text of the info message
+ */
+ public void sendInfoMessage(final String message) {
+     final Runnable broadcast = new Runnable() {
+         @Override
+         public void run() {
+             final InfoMessage notification = new InfoMessage(message);
+             for (InfoMessageListenerRpcGateway listener : infoMessageListeners.values()) {
+                 listener.notifyInfoMessage(notification);
+             }
+         }
+     };
+     runAsync(broadcast);
+ }
+
private static class JobMasterLeaderListener implements LeaderRetrievalListener {
private final JobID jobID;
@@ -343,5 +508,6 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
// TODO
}
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 484cea7..7c44006 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.RpcGateway;
@@ -75,4 +76,26 @@ public interface ResourceManagerGateway extends RpcGateway {
String taskExecutorAddress,
ResourceID resourceID,
@RpcTimeout Time timeout);
+
+ /**
+ * Registers an info message listener at the resource manager.
+ *
+ * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
+ */
+ void registerInfoMessageListener(String infoMessageListenerAddress);
+
+ /**
+ * Unregisters an info message listener from the resource manager.
+ *
+ * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager
+ *
+ */
+ void unRegisterInfoMessageListener(String infoMessageListenerAddress);
+
+ /**
+ * Shuts down the cluster.
+ *
+ * @param finalStatus the final application status
+ * @param optionalDiagnostics diagnostics message accompanying the final status
+ */
+ void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
new file mode 100644
index 0000000..84db1ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+/**
+ * A standalone implementation of the resource manager. Used when the system is started in
+ * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos.
+ */
+public class StandaloneResourceManager extends ResourceManager<ResourceManagerGateway, TaskExecutorRegistration> {
+
+ public StandaloneResourceManager(RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ SlotManager slotManager) {
+ super(rpcService, highAvailabilityServices, slotManager);
+ }
+
+ @Override
+ protected void initialize() throws Exception {
+ // nothing to initialize
+ }
+
+ @Override
+ protected void fatalError(final String message, final Throwable error) {
+ log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error);
+ // kill this process
+ System.exit(EXIT_CODE_FATAL_ERROR);
+ }
+
+ @Override
+ protected TaskExecutorRegistration workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
+ InstanceID instanceID = new InstanceID();
+ TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(taskExecutorGateway, instanceID);
+ return taskExecutorRegistration;
+ }
+
+ @Override
+ protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 64a1191..fdb83f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -55,7 +55,7 @@ public class ResourceManagerHATest {
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
SlotManager slotManager = mock(SlotManager.class);
- final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, slotManager);
+ final ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, slotManager);
resourceManager.start();
// before grant leadership, resourceManager's leaderId is null
Assert.assertNull(resourceManager.getLeaderSessionID());
http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 332c093..8f09152 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -160,7 +160,7 @@ public class ResourceManagerJobMasterTest {
TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
- ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+ ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
resourceManager.start();
return resourceManager;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index ed7c7d7..e6d1ed5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -121,7 +121,7 @@ public class ResourceManagerTaskExecutorTest {
private ResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService) {
TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
- ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+ ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
resourceManager.start();
return resourceManager;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 0232fab..ff25897 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
@@ -100,7 +101,7 @@ public class SlotProtocolTest extends TestLogger {
TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
ResourceManager resourceManager =
- new ResourceManager(testRpcService, testingHaServices, slotManager);
+ new StandaloneResourceManager(testRpcService, testingHaServices, slotManager);
resourceManager.start();
rmLeaderElectionService.isLeader(rmLeaderID);
@@ -179,7 +180,7 @@ public class SlotProtocolTest extends TestLogger {
TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
ResourceManager resourceManager =
- new ResourceManager(testRpcService, testingHaServices, slotManager);
+ new StandaloneResourceManager(testRpcService, testingHaServices, slotManager);
resourceManager.start();
rmLeaderElectionService.isLeader(rmLeaderID);