You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:50 UTC
[45/50] [abbrv] flink git commit: [FLINK-4606] integrate features of
old ResourceManager
[FLINK-4606] integrate features of old ResourceManager
This closes #2540
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eebe2c38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eebe2c38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eebe2c38
Branch: refs/heads/flip-6
Commit: eebe2c387a876aa3934f8a58eb62a3106eae41e1
Parents: be43f6f
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 27 10:38:02 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:45:06 2016 +0200
----------------------------------------------------------------------
.../InfoMessageListenerRpcGateway.java | 1 -
.../resourcemanager/ResourceManager.java | 146 ++++++++++++-------
.../resourcemanager/ResourceManagerGateway.java | 6 +-
.../ResourceManagerServices.java | 44 ++++++
.../StandaloneResourceManager.java | 19 ++-
.../TaskExecutorRegistration.java | 51 -------
.../registration/TaskExecutorRegistration.java | 51 +++++++
.../slotmanager/SimpleSlotManager.java | 6 -
.../slotmanager/SlotManager.java | 63 ++++++--
.../slotmanager/SlotManagerTest.java | 25 +++-
.../slotmanager/SlotProtocolTest.java | 42 +++---
11 files changed, 295 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/eebe2c38/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
index c1eeefa..d1373ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
-import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.rpc.RpcGateway;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/eebe2c38/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 83dc4db..190a4de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -20,14 +20,18 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -48,11 +52,10 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import scala.concurrent.duration.FiniteDuration;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -64,36 +67,43 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* It offers the following methods as part of its rpc interface to interact with the him remotely:
* <ul>
* <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
- * <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
+ * <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
* </ul>
*/
-public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {
+public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
+ extends RpcEndpoint<ResourceManagerGateway>
+ implements LeaderContender {
/** The exit code with which the process is stopped in case of a fatal error */
protected static final int EXIT_CODE_FATAL_ERROR = -13;
private final Map<JobID, JobMasterGateway> jobMasterGateways;
- private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;
+ private final Map<JobID, JobMasterLeaderListener> jobMasterLeaderRetrievalListeners;
private final Map<ResourceID, WorkerType> taskExecutorGateways;
private final HighAvailabilityServices highAvailabilityServices;
- private LeaderElectionService leaderElectionService;
-
private final SlotManager slotManager;
+ private LeaderElectionService leaderElectionService;
+
private UUID leaderSessionID;
private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
- public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices, SlotManager slotManager) {
+ private final Time timeout = Time.seconds(5);
+
+ public ResourceManager(
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ SlotManager slotManager) {
super(rpcService);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
this.jobMasterGateways = new HashMap<>();
this.slotManager = checkNotNull(slotManager);
- this.jobMasterLeaderRetrievalListeners = new HashSet<>();
+ this.jobMasterLeaderRetrievalListeners = new HashMap<>();
this.taskExecutorGateways = new HashMap<>();
infoMessageListeners = new HashMap<>();
}
@@ -105,6 +115,7 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
super.start();
leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
leaderElectionService.start(this);
+ slotManager.setupResourceManagerServices(new DefaultResourceManagerServices());
// framework specific initialization
initialize();
} catch (Throwable e) {
@@ -117,7 +128,7 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
public void shutDown() {
try {
leaderElectionService.stop();
- for(JobID jobID : jobMasterGateways.keySet()) {
+ for (JobID jobID : jobMasterGateways.keySet()) {
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
}
super.shutDown();
@@ -189,15 +200,17 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
if (throwable != null) {
return new RegistrationResponse.Decline(throwable.getMessage());
} else {
- JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
- try {
- LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
- jobMasterLeaderRetriever.start(jobMasterLeaderListener);
- } catch (Exception e) {
- log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
- return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+ if (!jobMasterLeaderRetrievalListeners.containsKey(jobID)) {
+ JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
+ try {
+ LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+ jobMasterLeaderRetriever.start(jobMasterLeaderListener);
+ } catch (Exception e) {
+ log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+ return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+ }
+ jobMasterLeaderRetrievalListeners.put(jobID, jobMasterLeaderListener);
}
- jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
if (existingGateway != null) {
log.info("Replacing gateway for registered JobID {}.", jobID);
@@ -232,7 +245,6 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
throw new Exception("Invalid leader session id");
}
-
return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, TimeUnit.SECONDS);
}
}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
@@ -241,24 +253,14 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
if (throwable != null) {
return new RegistrationResponse.Decline(throwable.getMessage());
} else {
- WorkerType startedWorker = taskExecutorGateways.get(resourceID);
- if(startedWorker != null) {
- String oldWorkerAddress = startedWorker.getTaskExecutorGateway().getAddress();
- if (taskExecutorAddress.equals(oldWorkerAddress)) {
- log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
- } else {
- log.warn("Receive a duplicate registration from TaskExecutor {} at different address, previous ({}), new ({})",
- resourceID, oldWorkerAddress, taskExecutorAddress);
- // TODO :: suggest old taskExecutor to stop itself
- slotManager.notifyTaskManagerFailure(resourceID);
- startedWorker = workerStarted(resourceID, taskExecutorGateway);
- taskExecutorGateways.put(resourceID, startedWorker);
- }
- } else {
- startedWorker = workerStarted(resourceID, taskExecutorGateway);
- taskExecutorGateways.put(resourceID, startedWorker);
+ WorkerType oldWorker = taskExecutorGateways.remove(resourceID);
+ if (oldWorker != null) {
+ // TODO :: suggest old taskExecutor to stop itself
+ slotManager.notifyTaskManagerFailure(resourceID);
}
- return new TaskExecutorRegistrationSuccess(startedWorker.getInstanceID(), 5000);
+ WorkerType newWorker = workerStarted(resourceID);
+ taskExecutorGateways.put(resourceID, newWorker);
+ return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
}
}
}, getMainThreadExecutor());
@@ -271,11 +273,20 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
* @return Slot assignment
*/
@RpcMethod
- public SlotRequestReply requestSlot(SlotRequest slotRequest) {
- final JobID jobId = slotRequest.getJobId();
- final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
+ public SlotRequestReply requestSlot(
+ UUID jobMasterLeaderID,
+ UUID resourceManagerLeaderID,
+ SlotRequest slotRequest) {
+
+ JobID jobId = slotRequest.getJobId();
+ JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
+ JobMasterLeaderListener jobMasterLeaderListener = jobMasterLeaderRetrievalListeners.get(jobId);
+
+ UUID leaderID = jobMasterLeaderListener.getLeaderID();
- if (jobMasterGateway != null) {
+ if (jobMasterGateway != null
+ && jobMasterLeaderID.equals(leaderID)
+ && resourceManagerLeaderID.equals(leaderSessionID)) {
return slotManager.requestSlot(slotRequest);
} else {
log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
@@ -379,7 +390,7 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
}
/**
- * Shutdowns cluster
+ * Cleanup application and shut down cluster
*
* @param finalStatus
* @param optionalDiagnostics
@@ -446,17 +457,11 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
protected abstract void initialize() throws Exception;
/**
- * Callback when a task executor register.
+ * Notifies the resource master of a fatal error.
*
- * @param resourceID The worker resource id
- * @param taskExecutorGateway the task executor gateway
- */
- protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway);
-
- /**
- * Callback when a resource manager faced a fatal error
- * @param message
- * @param error
+ * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
+ * such a way that a high-availability setting would restart this or fail over
+ * to another master.
*/
protected abstract void fatalError(String message, Throwable error);
@@ -472,6 +477,19 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
*/
protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics);
+ /**
+ * Allocates a resource using the resource profile.
+ * @param resourceProfile The resource description
+ */
+ @VisibleForTesting
+ public abstract void startNewWorker(ResourceProfile resourceProfile);
+
+ /**
+ * Callback when a worker was started.
+ * @param resourceID The worker resource id
+ */
+ protected abstract WorkerType workerStarted(ResourceID resourceID);
+
// ------------------------------------------------------------------------
// Info messaging
// ------------------------------------------------------------------------
@@ -489,6 +507,24 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
});
}
+ private class DefaultResourceManagerServices implements ResourceManagerServices {
+
+ @Override
+ public void allocateResource(ResourceProfile resourceProfile) {
+ ResourceManager.this.startNewWorker(resourceProfile);
+ }
+
+ @Override
+ public Executor getAsyncExecutor() {
+ return ResourceManager.this.getRpcService().getExecutor();
+ }
+
+ @Override
+ public Executor getExecutor() {
+ return ResourceManager.this.getMainThreadExecutor();
+ }
+ }
+
private static class JobMasterLeaderListener implements LeaderRetrievalListener {
private final JobID jobID;
@@ -498,6 +534,14 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends
this.jobID = jobID;
}
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ public UUID getLeaderID() {
+ return leaderID;
+ }
+
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
this.leaderID = leaderSessionID;
http://git-wip-us.apache.org/repos/asf/flink/blob/eebe2c38/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 7c44006..87303a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -59,7 +59,11 @@ public interface ResourceManagerGateway extends RpcGateway {
* @param slotRequest Slot request
* @return Future slot assignment
*/
- Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
+ Future<SlotRequestReply> requestSlot(
+ UUID jobMasterLeaderID,
+ UUID resourceManagerLeaderID,
+ SlotRequest slotRequest,
+ @RpcTimeout Time timeout);
/**
* Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager.
http://git-wip-us.apache.org/repos/asf/flink/blob/eebe2c38/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
new file mode 100644
index 0000000..30994dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Interface through which components such as the SlotManager access services of the ResourceManager.
+ */
+public interface ResourceManagerServices {
+
+	/**
+	 * Allocates a new resource according to the given resource profile.
+	 */
+	void allocateResource(ResourceProfile resourceProfile);
+
+	/**
+	 * Gets the executor for asynchronous work that runs outside of the ResourceManager's main thread.
+	 */
+	Executor getAsyncExecutor();
+
+	/**
+	 * Gets the executor which runs tasks in the ResourceManager's main thread.
+	 */
+	Executor getExecutor();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/eebe2c38/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 84db1ee..deca8d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -20,17 +20,18 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
/**
* A standalone implementation of the resource manager. Used when the system is started in
* standalone mode (via scripts), rather than via a resource framework like YARN or Mesos.
+ *
+ * This ResourceManager doesn't acquire new resources.
*/
-public class StandaloneResourceManager extends ResourceManager<ResourceManagerGateway, TaskExecutorRegistration> {
+public class StandaloneResourceManager extends ResourceManager<ResourceID> {
public StandaloneResourceManager(RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
@@ -51,14 +52,16 @@ public class StandaloneResourceManager extends ResourceManager<ResourceManagerGa
}
@Override
- protected TaskExecutorRegistration workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
- InstanceID instanceID = new InstanceID();
- TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(taskExecutorGateway, instanceID);
- return taskExecutorRegistration;
+ protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
}
@Override
- protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+ public void startNewWorker(ResourceProfile resourceProfile) {
+ }
+ @Override
+ protected ResourceID workerStarted(ResourceID resourceID) {
+ return resourceID;
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eebe2c38/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
deleted file mode 100644
index f8dfdc7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-
-import java.io.Serializable;
-
-/**
- * This class is responsible for group the TaskExecutorGateway and the InstanceID of a registered task executor.
- */
-public class TaskExecutorRegistration implements Serializable {
-
- private static final long serialVersionUID = -2062957799469434614L;
-
- private TaskExecutorGateway taskExecutorGateway;
-
- private InstanceID instanceID;
-
- public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
- InstanceID instanceID) {
- this.taskExecutorGateway = taskExecutorGateway;
- this.instanceID = instanceID;
- }
-
- public InstanceID getInstanceID() {
- return instanceID;
- }
-
- public TaskExecutorGateway getTaskExecutorGateway() {
- return taskExecutorGateway;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/eebe2c38/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
new file mode 100644
index 0000000..6b21f5c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.registration;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import java.io.Serializable;
+
+/**
+ * Groups the {@link TaskExecutorGateway} and the {@link InstanceID} of a registered task executor.
+ */
+public class TaskExecutorRegistration implements Serializable {
+
+	private static final long serialVersionUID = -2062957799469434614L;
+
+	private TaskExecutorGateway taskExecutorGateway;
+
+	private InstanceID instanceID;
+
+	public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
+		InstanceID instanceID) {
+		this.taskExecutorGateway = taskExecutorGateway;
+		this.instanceID = instanceID;
+	}
+
+	public InstanceID getInstanceID() {
+		return instanceID;
+	}
+
+	public TaskExecutorGateway getTaskExecutorGateway() {
+		return taskExecutorGateway;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/eebe2c38/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
index ef5ce31..ae1de5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -51,9 +50,4 @@ public class SimpleSlotManager extends SlotManager {
}
}
- @Override
- protected void allocateContainer(ResourceProfile resourceProfile) {
- // TODO
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eebe2c38/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index a6d2196..a56b2f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -22,16 +22,18 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +84,9 @@ public abstract class SlotManager {
/** The current leader id set by the ResourceManager */
private UUID leaderID;
+ /** The Resource allocation provider */
+ private ResourceManagerServices resourceManagerServices;
+
public SlotManager() {
this.registeredSlots = new HashMap<>(16);
this.pendingSlotRequests = new LinkedHashMap<>(16);
@@ -91,6 +96,16 @@ public abstract class SlotManager {
this.timeout = Time.seconds(10);
}
+	/**
+	 * Sets the services used to allocate new resources and to run slot request callbacks; may only be called once, a second call throws IllegalStateException.
+	 */
+	public void setupResourceManagerServices(ResourceManagerServices resourceManagerServices) {
+		if (this.resourceManagerServices != null) {
+			throw new IllegalStateException("ResourceManagerServices may only be set once.");
+		}
+		this.resourceManagerServices = resourceManagerServices;
+	}
+
// ------------------------------------------------------------------------
// slot managements
@@ -120,17 +135,32 @@ public abstract class SlotManager {
// record this allocation in bookkeeping
allocationMap.addAllocation(slot.getSlotId(), allocationId);
-
// remove selected slot from free pool
- freeSlots.remove(slot.getSlotId());
+ final ResourceSlot removedSlot = freeSlots.remove(slot.getSlotId());
final Future<SlotRequestReply> slotRequestReplyFuture =
slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
- // TODO handle timeouts and response
+
+ slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() {
+ @Override
+ public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
+ if (throwable != null) {
+ // we failed, put the slot and the request back again
+ if (allocationMap.isAllocated(slot.getSlotId())) {
+ // only re-add if the slot hasn't been removed in the meantime
+ freeSlots.put(slot.getSlotId(), removedSlot);
+ }
+ pendingSlotRequests.put(allocationId, request);
+ }
+ return null;
+ }
+ }, resourceManagerServices.getExecutor());
} else {
LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
"AllocationID:{}, JobID:{}", allocationId, request.getJobId());
- allocateContainer(request.getResourceProfile());
+ Preconditions.checkState(resourceManagerServices != null,
+ "Attempted to allocate resources but no ResourceManagerServices set.");
+ resourceManagerServices.allocateResource(request.getResourceProfile());
pendingSlotRequests.put(allocationId, request);
}
@@ -343,7 +373,7 @@ public abstract class SlotManager {
if (chosenRequest != null) {
final AllocationID allocationId = chosenRequest.getAllocationId();
- pendingSlotRequests.remove(allocationId);
+ final SlotRequest removedSlotRequest = pendingSlotRequests.remove(allocationId);
LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
allocationId, chosenRequest.getJobId());
@@ -351,7 +381,19 @@ public abstract class SlotManager {
final Future<SlotRequestReply> slotRequestReplyFuture =
freeSlot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
- // TODO handle timeouts and response
+
+ slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() {
+ @Override
+ public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
+ if (throwable != null) {
+ // we failed, add the request back again
+ if (allocationMap.isAllocated(freeSlot.getSlotId())) {
+ pendingSlotRequests.put(allocationId, removedSlotRequest);
+ }
+ }
+ return null;
+ }
+ }, resourceManagerServices.getExecutor());
} else {
freeSlots.put(freeSlot.getSlotId(), freeSlot);
}
@@ -417,13 +459,6 @@ public abstract class SlotManager {
protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot,
final Map<AllocationID, SlotRequest> pendingRequests);
- /**
- * The framework specific code for allocating a container for specified resource profile.
- *
- * @param resourceProfile The resource profile
- */
- protected abstract void allocateContainer(final ResourceProfile resourceProfile);
-
// ------------------------------------------------------------------------
// Helper classes
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/eebe2c38/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 9ee9690..0fed79e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -19,12 +19,16 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.junit.BeforeClass;
@@ -34,10 +38,13 @@ import org.mockito.Mockito;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
public class SlotManagerTest {
@@ -57,6 +64,8 @@ public class SlotManagerTest {
@BeforeClass
public static void setUp() {
taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
+ Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class)))
+ .thenReturn(new FlinkCompletableFuture<SlotRequestReply>());
}
/**
@@ -498,12 +507,13 @@ public class SlotManagerTest {
// testing classes
// ------------------------------------------------------------------------
- private static class TestingSlotManager extends SlotManager {
+ private static class TestingSlotManager extends SlotManager implements ResourceManagerServices {
private final List<ResourceProfile> allocatedContainers;
TestingSlotManager() {
this.allocatedContainers = new LinkedList<>();
+ setupResourceManagerServices(this);
}
/**
@@ -543,12 +553,23 @@ public class SlotManagerTest {
}
@Override
- protected void allocateContainer(ResourceProfile resourceProfile) {
+ public void allocateResource(ResourceProfile resourceProfile) {
allocatedContainers.add(resourceProfile);
}
+ @Override
+ public Executor getAsyncExecutor() {
+ return Mockito.mock(Executor.class);
+ }
+
+ @Override
+ public Executor getExecutor() {
+ return Mockito.mock(Executor.class);
+ }
+
List<ResourceProfile> getAllocatedContainers() {
return allocatedContainers;
}
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eebe2c38/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index ff25897..e3018c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -24,18 +24,14 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.*;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
@@ -47,9 +43,12 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.any;
@@ -99,9 +98,9 @@ public class SlotProtocolTest extends TestLogger {
TestingLeaderElectionService rmLeaderElectionService =
configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
- TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
+ SlotManager slotManager = Mockito.spy(new SimpleSlotManager());
ResourceManager resourceManager =
- new StandaloneResourceManager(testRpcService, testingHaServices, slotManager);
+ Mockito.spy(new StandaloneResourceManager(testRpcService, testingHaServices, slotManager));
resourceManager.start();
rmLeaderElectionService.isLeader(rmLeaderID);
@@ -118,7 +117,7 @@ public class SlotProtocolTest extends TestLogger {
SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
SlotRequestReply slotRequestReply =
- resourceManager.requestSlot(slotRequest);
+ resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest);
// 1) SlotRequest is routed to the SlotManager
verify(slotManager).requestSlot(slotRequest);
@@ -129,13 +128,15 @@ public class SlotProtocolTest extends TestLogger {
allocationID);
// 3) SlotRequest leads to a container allocation
- verify(slotManager, timeout(5000)).allocateContainer(resourceProfile);
+ verify(resourceManager, timeout(5000)).startNewWorker(resourceProfile);
Assert.assertFalse(slotManager.isAllocated(allocationID));
// slot becomes available
final String tmAddress = "/tm1";
TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+ Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class)))
+ .thenReturn(new FlinkCompletableFuture<SlotRequestReply>());
testRpcService.registerGateway(tmAddress, taskExecutorGateway);
final ResourceID resourceID = ResourceID.generate();
@@ -176,11 +177,13 @@ public class SlotProtocolTest extends TestLogger {
configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+ Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class)))
+ .thenReturn(new FlinkCompletableFuture<SlotRequestReply>());
testRpcService.registerGateway(tmAddress, taskExecutorGateway);
- TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
+ SlotManager slotManager = Mockito.spy(new SimpleSlotManager());
ResourceManager resourceManager =
- new StandaloneResourceManager(testRpcService, testingHaServices, slotManager);
+ Mockito.spy(new StandaloneResourceManager(testRpcService, testingHaServices, slotManager));
resourceManager.start();
rmLeaderElectionService.isLeader(rmLeaderID);
@@ -207,7 +210,7 @@ public class SlotProtocolTest extends TestLogger {
SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
SlotRequestReply slotRequestReply =
- resourceManager.requestSlot(slotRequest);
+ resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest);
// 1) a SlotRequest is routed to the SlotManager
verify(slotManager).requestSlot(slotRequest);
@@ -241,15 +244,4 @@ public class SlotProtocolTest extends TestLogger {
return rmLeaderElectionService;
}
- private static class TestingSlotManager extends SimpleSlotManager {
-
- // change visibility of function to public for testing
- @Override
- public void allocateContainer(ResourceProfile resourceProfile) {
- super.allocateContainer(resourceProfile);
- }
-
-
- }
-
}