You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/08/31 15:18:19 UTC
[1/3] flink git commit: [FLINK-4347][cluster management] Implement
SlotManager core
Repository: flink
Updated Branches:
refs/heads/flip-6 5d71a552a -> 454bf51b0
[FLINK-4347][cluster management] Implement SlotManager core
This closes #2388
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4abfd9b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4abfd9b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4abfd9b
Branch: refs/heads/flip-6
Commit: c4abfd9bc92989712faab7201d8d847454d5ed7b
Parents: 806f2f1
Author: Kurt Young <yk...@gmail.com>
Authored: Thu Aug 18 15:48:30 2016 +0800
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Aug 31 17:17:46 2016 +0200
----------------------------------------------------------------------
.../runtime/clusterframework/SlotManager.java | 525 ++++++++++++++++++
.../clusterframework/types/ResourceID.java | 4 +-
.../clusterframework/types/ResourceProfile.java | 5 +
.../clusterframework/types/ResourceSlot.java | 66 +++
.../runtime/clusterframework/types/SlotID.java | 14 +-
.../rpc/resourcemanager/SlotRequest.java | 51 +-
.../runtime/rpc/taskexecutor/SlotReport.java | 56 ++
.../runtime/rpc/taskexecutor/SlotStatus.java | 129 +++++
.../clusterframework/SlotManagerTest.java | 540 +++++++++++++++++++
9 files changed, 1382 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c4abfd9b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
new file mode 100644
index 0000000..cc140a1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
+import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * SlotManager is responsible for receiving slot requests and performing slot allocations. It requests
+ * slots from registered TaskManagers and issues container allocation requests in case there are not
+ * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat.
+ * <p>
+ * The main operation principle of SlotManager is:
+ * <ul>
+ * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li>
+ * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li>
+ * <li>3. All slot requests are handled on a best-effort basis; there is no guarantee that a request will be
+ * fulfilled in time or correctly allocated. Conflicts, timeouts or other special errors may happen and should
+ * be handled outside SlotManager. SlotManager makes each decision based on the information it currently
+ * holds.</li>
+ * </ul>
+ * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
+ */
+public abstract class SlotManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
+
+ /** Gateway to communicate with ResourceManager */
+ private final ResourceManagerGateway resourceManagerGateway;
+
+ /** All registered slots, including free and allocated slots */
+ private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots;
+
+ /** All pending slot requests, waiting for available slots to fulfil them */
+ private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+
+ /** All free slots that are available for allocation */
+ private final Map<SlotID, ResourceSlot> freeSlots;
+
+ /** All allocations, we can look up allocations either by SlotID or AllocationID */
+ private final AllocationMap allocationMap;
+
+ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
+ this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
+ this.registeredSlots = new HashMap<>(16);
+ this.pendingSlotRequests = new LinkedHashMap<>(16);
+ this.freeSlots = new HashMap<>(16);
+ this.allocationMap = new AllocationMap();
+ }
+
+ // ------------------------------------------------------------------------
+ // slot management
+ // ------------------------------------------------------------------------
+
+ /**
+ * Request a slot with requirements; we may either fulfill the request or leave it pending. Trigger container
+ * allocation if we don't have enough resources. If we have a free slot which can match the request, record
+ * this allocation and forward the request to TaskManager through ResourceManager (we want this done by
+ * RPC's main thread to avoid race conditions).
+ *
+ * @param request The detailed request of the slot
+ */
+ public void requestSlot(final SlotRequest request) {
+ if (isRequestDuplicated(request)) {
+ LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
+ return;
+ }
+
+ // try to fulfil the request with current free slots
+ ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+ if (slot != null) {
+ LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
+ request.getAllocationId(), request.getJobId());
+
+ // record this allocation in bookkeeping
+ allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
+
+ // remove selected slot from free pool
+ freeSlots.remove(slot.getSlotId());
+
+ // TODO: send slot request to TaskManager
+ } else {
+ LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
+ "AllocationID:{}, JobID:{}", request.getAllocationId(), request.getJobId());
+ allocateContainer(request.getResourceProfile());
+ pendingSlotRequests.put(request.getAllocationId(), request);
+ }
+ }
+
+ /**
+ * Sync slot status with TaskManager's SlotReport.
+ */
+ public void updateSlotStatus(final SlotReport slotReport) {
+ for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
+ updateSlotStatus(slotStatus);
+ }
+ }
+
+ /**
+ * The slot request to TaskManager may either fail due to RPC communication problems (timeout, network error, etc.)
+ * or be actually rejected by TaskManager. We shall retry this request by:
+ * <ul>
+ * <li>1. verifying and clearing all the previous allocation information for this request</li>
+ * <li>2. trying to request a slot again</li>
+ * </ul>
+ * <p>
+ * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response
+ * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request,
+ * but it can be taken care of by rejecting registration at JobManager.
+ *
+ * @param originalRequest The original slot request
+ * @param slotId The target SlotID
+ */
+ public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
+ final AllocationID originalAllocationId = originalRequest.getAllocationId();
+ LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
+ slotId, originalAllocationId, originalRequest.getJobId());
+
+ // verify the allocation info before we do anything
+ if (freeSlots.containsKey(slotId)) {
+ // this slot is currently empty, no need to de-allocate it from our allocations
+ LOG.info("Original slot is somehow empty, retrying this request");
+
+ // before retry, we should double check whether this request was allocated by some other ways
+ if (!allocationMap.isAllocated(originalAllocationId)) {
+ requestSlot(originalRequest);
+ } else {
+ LOG.info("The failed request has somehow been allocated, SlotID:{}",
+ allocationMap.getSlotID(originalAllocationId));
+ }
+ } else if (allocationMap.isAllocated(slotId)) {
+ final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
+
+ // check whether we have an agreement on whom this slot belongs to
+ if (originalAllocationId.equals(currentAllocationId)) {
+ LOG.info("De-allocate this request and retry");
+ allocationMap.removeAllocation(currentAllocationId);
+
+ // put this slot back to free pool
+ ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
+ freeSlots.put(slotId, slot);
+
+ // retry the request
+ requestSlot(originalRequest);
+ } else {
+ // the slot is taken by someone else, no need to de-allocate it from our allocations
+ LOG.info("Original slot is taken by someone else, current AllocationID:{}", currentAllocationId);
+
+ // before retry, we should double check whether this request was allocated by some other ways
+ if (!allocationMap.isAllocated(originalAllocationId)) {
+ requestSlot(originalRequest);
+ } else {
+ LOG.info("The failed request is somehow been allocated, SlotID:{}",
+ allocationMap.getSlotID(originalAllocationId));
+ }
+ }
+ } else {
+ LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
+ }
+ }
+
+ /**
+ * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots.
+ *
+ * @param resourceId The ResourceID of the TaskManager
+ */
+ public void notifyTaskManagerFailure(final ResourceID resourceId) {
+ LOG.info("Resource:{} been notified failure", resourceId);
+ final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId);
+ if (slotIdsToRemove != null) {
+ for (SlotID slotId : slotIdsToRemove.keySet()) {
+ LOG.info("Removing Slot:{} upon resource failure", slotId);
+ if (freeSlots.containsKey(slotId)) {
+ freeSlots.remove(slotId);
+ } else if (allocationMap.isAllocated(slotId)) {
+ allocationMap.removeAllocation(slotId);
+ } else {
+ LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
+ }
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // internal behaviors
+ // ------------------------------------------------------------------------
+
+ /**
+ * Update slot status based on TaskManager's report. There are mainly two situations when we receive the report:
+ * <ul>
+ * <li>1. The slot is newly registered.</li>
+ * <li>2. The slot is already registered; the report contains its current status.</li>
+ * </ul>
+ * <p>
+ * Regarding 1: It's fairly simple, we just record this slot's status, and trigger schedule if slot is empty.
+ * <p>
+ * Regarding 2: This can lead to inconsistent views, since there may be a time gap before we learn the slot's real
+ * status. We may have updated the slot's allocation, but the update is not yet reflected by TaskManager's heartbeat,
+ * and we may make wrong decisions if we cannot guarantee we have the exact status of all the slots. So
+ * the principle here is: We always trust TaskManager's heartbeat, we will correct our information based on that
+ * and take the next action based on the diff between our information and the heartbeat status.
+ *
+ * @param reportedStatus Reported slot status
+ */
+ void updateSlotStatus(final SlotStatus reportedStatus) {
+ final SlotID slotId = reportedStatus.getSlotID();
+ final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler());
+
+ if (registerNewSlot(slot)) {
+ // we have a newly registered slot
+ LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedStatus.getAllocationID());
+
+ if (reportedStatus.getAllocationID() != null) {
+ // slot in use, record this in bookkeeping
+ allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
+ } else {
+ handleFreeSlot(new ResourceSlot(slotId, reportedStatus.getProfiler()));
+ }
+ } else {
+ // slot exists, update current information
+ if (reportedStatus.getAllocationID() != null) {
+ // slot is reported in use
+ final AllocationID reportedAllocationId = reportedStatus.getAllocationID();
+
+ // check whether we also thought this slot is in use
+ if (allocationMap.isAllocated(slotId)) {
+ // we also think that slot is in use, check whether the AllocationID matches
+ final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
+
+ if (!reportedAllocationId.equals(currentAllocationId)) {
+ LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:{}",
+ slotId, currentAllocationId, reportedAllocationId);
+
+ // seems we have a disagreement about the slot assignments, need to correct it
+ allocationMap.removeAllocation(slotId);
+ allocationMap.addAllocation(slotId, reportedAllocationId);
+ }
+ } else {
+ LOG.info("Slot allocation info mismatch! SlotID:{}, current:null, reported:{}",
+ slotId, reportedAllocationId);
+
+ // we thought the slot is free, should correct this information
+ allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
+
+ // remove this slot from free slots pool
+ freeSlots.remove(slotId);
+ }
+ } else {
+ // slot is reported empty
+
+ // check whether we also thought this slot is empty
+ if (allocationMap.isAllocated(slotId)) {
+ LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
+ slotId, allocationMap.getAllocationID(slotId));
+
+ // we thought the slot is in use, correct it
+ allocationMap.removeAllocation(slotId);
+
+ // we have a free slot!
+ handleFreeSlot(new ResourceSlot(slotId, reportedStatus.getProfiler()));
+ }
+ }
+ }
+ }
+
+ /**
+ * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled,
+ * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot
+ * to the free pool.
+ *
+ * @param freeSlot The free slot
+ */
+ private void handleFreeSlot(final ResourceSlot freeSlot) {
+ SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests);
+
+ if (chosenRequest != null) {
+ pendingSlotRequests.remove(chosenRequest.getAllocationId());
+
+ LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
+ chosenRequest.getAllocationId(), chosenRequest.getJobId());
+ allocationMap.addAllocation(freeSlot.getSlotId(), chosenRequest.getAllocationId());
+
+ // TODO: send slot request to TaskManager
+ } else {
+ freeSlots.put(freeSlot.getSlotId(), freeSlot);
+ }
+ }
+
+ /**
+ * Check whether the request is duplicated. We use AllocationID to identify slot requests; each
+ * formerly received slot request is either in the pending list or has already been allocated.
+ *
+ * @param request The slot request
+ * @return <tt>true</tt> if the request is duplicated
+ */
+ private boolean isRequestDuplicated(final SlotRequest request) {
+ final AllocationID allocationId = request.getAllocationId();
+ return pendingSlotRequests.containsKey(allocationId)
+ || allocationMap.isAllocated(allocationId);
+ }
+
+ /**
+ * Try to register slot, and tell if this slot is newly registered.
+ *
+ * @param slot The ResourceSlot which will be checked and registered
+ * @return <tt>true</tt> if we meet a new slot
+ */
+ private boolean registerNewSlot(final ResourceSlot slot) {
+ final SlotID slotId = slot.getSlotId();
+ final ResourceID resourceId = slotId.getResourceID();
+ if (!registeredSlots.containsKey(resourceId)) {
+ registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
+ }
+ return registeredSlots.get(resourceId).put(slotId, slot) == null;
+ }
+
+ private ResourceSlot getRegisteredSlot(final SlotID slotId) {
+ final ResourceID resourceId = slotId.getResourceID();
+ if (!registeredSlots.containsKey(resourceId)) {
+ return null;
+ }
+ return registeredSlots.get(resourceId).get(slotId);
+ }
+
+ // ------------------------------------------------------------------------
+ // Framework specific behavior
+ // ------------------------------------------------------------------------
+
+ /**
+ * Choose a slot to use among all free slots, the behavior is framework specific.
+ *
+ * @param request The slot request
+ * @param freeSlots All slots which can be used
+ * @return The slot we choose to use, <tt>null</tt> if we did not find a match
+ */
+ protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request,
+ final Map<SlotID, ResourceSlot> freeSlots);
+
+ /**
+ * Choose a pending request to fulfill when we have a free slot, the behavior is framework specific.
+ *
+ * @param offeredSlot The free slot
+ * @param pendingRequests All the pending slot requests
+ * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match
+ */
+ protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot,
+ final Map<AllocationID, SlotRequest> pendingRequests);
+
+ /**
+ * The framework specific code for allocating a container for specified resource profile.
+ *
+ * @param resourceProfile The resource profile
+ */
+ protected abstract void allocateContainer(final ResourceProfile resourceProfile);
+
+
+ // ------------------------------------------------------------------------
+ // Helper classes
+ // ------------------------------------------------------------------------
+
+ /**
+ * We maintain all the allocations with SlotID and AllocationID. We are able to get or remove the allocation info
+ * either by SlotID or AllocationID.
+ */
+ private static class AllocationMap {
+
+ /** All allocated slots (by SlotID) */
+ private final Map<SlotID, AllocationID> allocatedSlots;
+
+ /** All allocated slots (by AllocationID), it's an inverse view of allocatedSlots */
+ private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId;
+
+ AllocationMap() {
+ this.allocatedSlots = new HashMap<>(16);
+ this.allocatedSlotsByAllocationId = new HashMap<>(16);
+ }
+
+ /**
+ * Add an allocation
+ *
+ * @param slotId The slot id
+ * @param allocationId The allocation id
+ */
+ void addAllocation(final SlotID slotId, final AllocationID allocationId) {
+ allocatedSlots.put(slotId, allocationId);
+ allocatedSlotsByAllocationId.put(allocationId, slotId);
+ }
+
+ /**
+ * De-allocation with slot id
+ *
+ * @param slotId The slot id
+ */
+ void removeAllocation(final SlotID slotId) {
+ if (allocatedSlots.containsKey(slotId)) {
+ final AllocationID allocationId = allocatedSlots.get(slotId);
+ allocatedSlots.remove(slotId);
+ allocatedSlotsByAllocationId.remove(allocationId);
+ }
+ }
+
+ /**
+ * De-allocation with allocation id
+ *
+ * @param allocationId The allocation id
+ */
+ void removeAllocation(final AllocationID allocationId) {
+ if (allocatedSlotsByAllocationId.containsKey(allocationId)) {
+ SlotID slotId = allocatedSlotsByAllocationId.get(allocationId);
+ allocatedSlotsByAllocationId.remove(allocationId);
+ allocatedSlots.remove(slotId);
+ }
+ }
+
+ /**
+ * Check whether allocation exists by slot id
+ *
+ * @param slotId The slot id
+ * @return true if the allocation exists
+ */
+ boolean isAllocated(final SlotID slotId) {
+ return allocatedSlots.containsKey(slotId);
+ }
+
+ /**
+ * Check whether allocation exists by allocation id
+ *
+ * @param allocationId The allocation id
+ * @return true if the allocation exists
+ */
+ boolean isAllocated(final AllocationID allocationId) {
+ return allocatedSlotsByAllocationId.containsKey(allocationId);
+ }
+
+ AllocationID getAllocationID(final SlotID slotId) {
+ return allocatedSlots.get(slotId);
+ }
+
+ SlotID getSlotID(final AllocationID allocationId) {
+ return allocatedSlotsByAllocationId.get(allocationId);
+ }
+
+ public int size() {
+ return allocatedSlots.size();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Testing utilities
+ // ------------------------------------------------------------------------
+
+ @VisibleForTesting
+ boolean isAllocated(final SlotID slotId) {
+ return allocationMap.isAllocated(slotId);
+ }
+
+ @VisibleForTesting
+ boolean isAllocated(final AllocationID allocationId) {
+ return allocationMap.isAllocated(allocationId);
+ }
+
+ /**
+ * Add a free slot directly to the free pool; this does not trigger allocation of pending requests
+ *
+ * @param slot The resource slot
+ */
+ @VisibleForTesting
+ void addFreeSlot(final ResourceSlot slot) {
+ final ResourceID resourceId = slot.getResourceID();
+ final SlotID slotId = slot.getSlotId();
+
+ if (!registeredSlots.containsKey(resourceId)) {
+ registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
+ }
+ registeredSlots.get(resourceId).put(slot.getSlotId(), slot);
+ freeSlots.put(slotId, slot);
+ }
+
+ @VisibleForTesting
+ int getAllocatedSlotCount() {
+ return allocationMap.size();
+ }
+
+ @VisibleForTesting
+ int getFreeSlotCount() {
+ return freeSlots.size();
+ }
+
+ @VisibleForTesting
+ int getPendingRequestCount() {
+ return pendingSlotRequests.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c4abfd9b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
index 8cf9ccb..6b8a037 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
@@ -63,9 +63,7 @@ public final class ResourceID implements ResourceIDRetrievable, Serializable {
@Override
public String toString() {
- return "ResourceID{" +
- "resourceId='" + resourceId + '\'' +
- '}';
+ return resourceId;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/c4abfd9b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index cbe709f..ff1c4bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -40,6 +40,11 @@ public class ResourceProfile implements Serializable {
this.memoryInMB = memoryInMB;
}
+ public ResourceProfile(ResourceProfile other) {
+ this.cpuCores = other.cpuCores;
+ this.memoryInMB = other.memoryInMB;
+ }
+
/**
* Get the cpu cores needed
* @return The cpu cores, 1.0 means a full cpu thread
http://git-wip-us.apache.org/repos/asf/flink/blob/c4abfd9b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
new file mode 100644
index 0000000..8a6db5f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A ResourceSlot represents a slot located in TaskManager from ResourceManager's view. It has a unique
+ * identification and resource profile which we can compare to the resource request.
+ */
+public class ResourceSlot implements ResourceIDRetrievable, Serializable {
+
+ private static final long serialVersionUID = -5853720153136840674L;
+
+ /** The unique identification of this slot */
+ private final SlotID slotId;
+
+ /** The resource profile of this slot */
+ private final ResourceProfile resourceProfile;
+
+ public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile) {
+ this.slotId = checkNotNull(slotId);
+ this.resourceProfile = checkNotNull(resourceProfile);
+ }
+
+ @Override
+ public ResourceID getResourceID() {
+ return slotId.getResourceID();
+ }
+
+ public SlotID getSlotId() {
+ return slotId;
+ }
+
+ public ResourceProfile getResourceProfile() {
+ return resourceProfile;
+ }
+
+ /**
+ * Check whether required resource profile can be matched by this slot.
+ *
+ * @param required The required resource profile
+ * @return true if requirement can be matched
+ */
+ public boolean isMatchingRequirement(ResourceProfile required) {
+ return resourceProfile.isMatching(required);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c4abfd9b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
index d1b072d..e831a5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -75,9 +75,15 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
@Override
public String toString() {
- return "SlotID{" +
- "resourceId=" + resourceId +
- ", slotId=" + slotId +
- '}';
+ return resourceId + "_" + slotId;
+ }
+
+ /**
+ * Generate a slot id from a newly generated ResourceID and slot number 0.
+ *
+ * @return A random slot id.
+ */
+ public static SlotID generate() {
+ return new SlotID(ResourceID.generate(), 0);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c4abfd9b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
index d8fe268..74c7c39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
@@ -18,8 +18,57 @@
package org.apache.flink.runtime.rpc.resourcemanager;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
import java.io.Serializable;
-public class SlotRequest implements Serializable{
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This describes the requirement of the slot, mainly used by JobManager requesting slot from ResourceManager.
+ */
+public class SlotRequest implements Serializable {
+
private static final long serialVersionUID = -6586877187990445986L;
+
+ /** The JobID of the job the slot is requested for */
+ private final JobID jobId;
+
+ /** The unique identification of this request */
+ private final AllocationID allocationId;
+
+ /** The resource profile of the required slot */
+ private final ResourceProfile resourceProfile;
+
+ public SlotRequest(JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile) {
+ this.jobId = checkNotNull(jobId);
+ this.allocationId = checkNotNull(allocationId);
+ this.resourceProfile = checkNotNull(resourceProfile);
+ }
+
+ /**
+ * Get the JobID of the job the slot is requested for.
+ * @return The job id
+ */
+ public JobID getJobId() {
+ return jobId;
+ }
+
+ /**
+ * Get the unique identification of this request
+ * @return the allocation id
+ */
+ public AllocationID getAllocationId() {
+ return allocationId;
+ }
+
+ /**
+ * Get the resource profile of the desired slot
+ * @return The resource profile
+ */
+ public ResourceProfile getResourceProfile() {
+ return resourceProfile;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c4abfd9b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
new file mode 100644
index 0000000..c372ecb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A report about the current status of all slots of a TaskExecutor, describing
+ * which slots are available and which are allocated, and which jobs (JobManagers) the
+ * allocated slots have been allocated to.
+ */
+public class SlotReport implements Serializable {
+
+ private static final long serialVersionUID = -3150175198722481689L;
+
+ /** The status of each reported slot of the TaskManager */
+ private final List<SlotStatus> slotsStatus;
+
+ /** The resource id which identifies the reporting TaskManager */
+ private final ResourceID resourceID;
+
+ public SlotReport(final List<SlotStatus> slotsStatus, final ResourceID resourceID) {
+ this.slotsStatus = checkNotNull(slotsStatus);
+ this.resourceID = checkNotNull(resourceID);
+ }
+
+ public List<SlotStatus> getSlotsStatus() {
+ return slotsStatus;
+ }
+
+ public ResourceID getResourceID() {
+ return resourceID;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c4abfd9b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
new file mode 100644
index 0000000..e8e2084
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Describes the current status of a slot located in a TaskManager.
+ */
+public class SlotStatus implements Serializable {
+
+ private static final long serialVersionUID = 5099191707339664493L;
+
+ /** The ID that identifies the slot */
+ private final SlotID slotID;
+
+ /** The resource profile of the slot */
+ private final ResourceProfile profiler;
+
+ /** If the slot is allocated, allocationID identifies its allocation; otherwise allocationID is null */
+ private final AllocationID allocationID;
+
+ /** If the slot is allocated, jobID identifies the job this slot is allocated to; otherwise jobID is null */
+ private final JobID jobID;
+
+ public SlotStatus(SlotID slotID, ResourceProfile profiler) {
+ this(slotID, profiler, null, null);
+ }
+
+ public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID allocationID, JobID jobID) {
+ this.slotID = checkNotNull(slotID, "slotID cannot be null");
+ this.profiler = checkNotNull(profiler, "profile cannot be null");
+ this.allocationID = allocationID;
+ this.jobID = jobID;
+ }
+
+ /**
+ * Gets the unique identifier of this slot.
+ *
+ * @return The slot id
+ */
+ public SlotID getSlotID() {
+ return slotID;
+ }
+
+ /**
+ * Gets the resource profile of this slot.
+ *
+ * @return The resource profile
+ */
+ public ResourceProfile getProfiler() {
+ return profiler;
+ }
+
+ /**
+ * Gets the allocation id of this slot.
+ *
+ * @return The allocation id if this slot is allocated, otherwise null
+ */
+ public AllocationID getAllocationID() {
+ return allocationID;
+ }
+
+ /**
+ * Gets the id of the job this slot is allocated to.
+ *
+ * @return The job id if this slot is allocated, otherwise null
+ */
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SlotStatus that = (SlotStatus) o;
+
+ if (!slotID.equals(that.slotID)) {
+ return false;
+ }
+ if (!profiler.equals(that.profiler)) {
+ return false;
+ }
+ if (allocationID != null ? !allocationID.equals(that.allocationID) : that.allocationID != null) {
+ return false;
+ }
+ return jobID != null ? jobID.equals(that.jobID) : that.jobID == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = slotID.hashCode();
+ result = 31 * result + profiler.hashCode();
+ result = 31 * result + (allocationID != null ? allocationID.hashCode() : 0);
+ result = 31 * result + (jobID != null ? jobID.hashCode() : 0);
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c4abfd9b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
new file mode 100644
index 0000000..2ee280f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class SlotManagerTest {
+
+ private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
+
+ private static final long DEFAULT_TESTING_MEMORY = 512;
+
+ private static final ResourceProfile DEFAULT_TESTING_PROFILE =
+ new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY);
+
+ private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
+ new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2);
+
+ private ResourceManagerGateway resourceManagerGateway;
+
+ @Before
+ public void setUp() {
+ resourceManagerGateway = mock(ResourceManagerGateway.class);
+ }
+
+ /**
+ * Tests that a request made while there are no free slots stays pending and triggers a container allocation
+ */
+ @Test
+ public void testRequestSlotWithoutFreeSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertEquals(1, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0));
+ }
+
+ /**
+ * Tests that a request is fulfilled immediately when a matching free slot is available
+ */
+ @Test
+ public void testRequestSlotWithFreeSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
+ assertEquals(1, slotManager.getFreeSlotCount());
+
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertEquals(0, slotManager.getAllocatedContainers().size());
+ }
+
+ /**
+ * Tests that a request stays pending when there are free slots but none of them is suitable
+ */
+ @Test
+ public void testRequestSlotWithoutSuitableSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2);
+ assertEquals(2, slotManager.getFreeSlotCount());
+
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(2, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertEquals(1, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+ }
+
+ /**
+ * Tests that sending duplicated slot requests does not create additional allocations or pending requests
+ */
+ @Test
+ public void testDuplicatedSlotRequest() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
+
+ SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
+
+ slotManager.requestSlot(request1);
+ slotManager.requestSlot(request2);
+ slotManager.requestSlot(request2);
+ slotManager.requestSlot(request1);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertEquals(1, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+ }
+
+ /**
+ * Tests sending multiple slot requests with different resource profiles
+ */
+ @Test
+ public void testRequestMultipleSlots() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5);
+
+ // request 3 normal slots
+ for (int i = 0; i < 3; ++i) {
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ }
+
+ // request 2 big slots
+ for (int i = 0; i < 2; ++i) {
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+ }
+
+ // request 1 normal slot again
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+ assertEquals(4, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(2, slotManager.getPendingRequestCount());
+ assertEquals(2, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1));
+ }
+
+ /**
+ * Tests that a newly reported free slot is used to fulfill a pending request
+ */
+ @Test
+ public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ assertEquals(1, slotManager.getPendingRequestCount());
+
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that a newly reported free slot stays free when there is no pending request
+ */
+ @Test
+ public void testNewlyAppearedFreeSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ }
+
+ /**
+ * Tests that a newly reported free slot stays free when it is not suitable for any of the pending requests
+ */
+ @Test
+ public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+ assertEquals(1, slotManager.getPendingRequestCount());
+
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertFalse(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that a newly reported slot which is already in use by some job is tracked as allocated
+ */
+ @Test
+ public void testNewlyAppearedInUseSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that a slot we consider in use stays allocated when the reported status confirms it
+ */
+ @Test
+ public void testExistingInUseSlotUpdateStatus() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ // make this slot in use
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertTrue(slotManager.isAllocated(slotId));
+
+ // slot status is confirmed
+ SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE,
+ request.getAllocationId(), request.getJobId());
+ slotManager.updateSlotStatus(slotStatus2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that we had a slot in use, but it is reported as empty by the slot status update
+ */
+ @Test
+ public void testExistingInUseSlotAdjustedToEmpty() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request1);
+
+ // make this slot in use
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ // another request pending
+ SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertTrue(slotManager.isAllocated(request1.getAllocationId()));
+
+
+ // but the slot is reported empty again: request2 will be fulfilled, request1 will be missing
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+ }
+
+ /**
+ * Tests that we had a slot in use, and it is also reported in use by the TaskManager, but the allocation
+ * information does not match.
+ */
+ @Test
+ public void testExistingInUseSlotWithDifferentAllocationInfo() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ // make this slot in use
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertTrue(slotManager.isAllocated(request.getAllocationId()));
+
+ SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
+ // update slot status with different allocation info
+ slotManager.updateSlotStatus(slotStatus2);
+
+ // original request is missing and won't be allocated
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertFalse(slotManager.isAllocated(request.getAllocationId()));
+ assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
+ }
+
+ /**
+ * Tests that we had a free slot, and it stays free when the reported status confirms it
+ */
+ @Test
+ public void testExistingEmptySlotUpdateStatus() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+ slotManager.addFreeSlot(slot);
+
+ SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ }
+
+ /**
+ * Tests that we had a free slot, and it becomes allocated when the TaskManager reports it in use
+ */
+ @Test
+ public void testExistingEmptySlotAdjustedToInUse() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+ slotManager.addFreeSlot(slot);
+
+ SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE,
+ new AllocationID(), new JobID());
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slot.getSlotId()));
+ }
+
+ /**
+ * Tests that when an allocation fails / is rejected by the TaskManager, the request is retried
+ */
+ @Test
+ public void testSlotAllocationFailedAtTaskManager() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+ slotManager.addFreeSlot(slot);
+
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slot.getSlotId()));
+
+ slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ }
+
+
+ /**
+ * Tests that when an allocation fails / is rejected by the TaskManager and the slot is occupied by another request, the failed request becomes pending
+ */
+ @Test
+ public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+ slotManager.addFreeSlot(slot);
+
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ // slot is set empty by heartbeat
+ SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile());
+ slotManager.updateSlotStatus(slotStatus);
+
+ // another request took this slot
+ SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertFalse(slotManager.isAllocated(request.getAllocationId()));
+ assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+
+ // original request should become pending again
+ slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertFalse(slotManager.isAllocated(request.getAllocationId()));
+ assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+ }
+
+ @Test
+ public void testNotifyTaskManagerFailure() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+ ResourceID resource1 = ResourceID.generate();
+ ResourceID resource2 = ResourceID.generate();
+
+ ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE);
+ ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE);
+ ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE);
+ ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE);
+
+ slotManager.addFreeSlot(slot11);
+ slotManager.addFreeSlot(slot21);
+
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+ assertEquals(2, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+
+ slotManager.addFreeSlot(slot12);
+ slotManager.addFreeSlot(slot22);
+
+ assertEquals(2, slotManager.getAllocatedSlotCount());
+ assertEquals(2, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+
+ slotManager.notifyTaskManagerFailure(resource2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+
+ // notify the failure of a resource that does not exist
+ slotManager.notifyTaskManagerFailure(ResourceID.generate());
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ }
+
+ // ------------------------------------------------------------------------
+ // testing utilities
+ // ------------------------------------------------------------------------
+
+ private void directlyProvideFreeSlots(
+ final SlotManager slotManager,
+ final ResourceProfile resourceProfile,
+ final int freeSlotNum)
+ {
+ for (int i = 0; i < freeSlotNum; ++i) {
+ slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile)));
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // testing classes
+ // ------------------------------------------------------------------------
+
+ private static class TestingSlotManager extends SlotManager {
+
+ private final List<ResourceProfile> allocatedContainers;
+
+ TestingSlotManager(ResourceManagerGateway resourceManagerGateway) {
+ super(resourceManagerGateway);
+ this.allocatedContainers = new LinkedList<>();
+ }
+
+ /**
+ * Chooses the first free slot, in the map's iteration order, that matches the requirement
+ *
+ * @param request The slot request
+ * @param freeSlots All slots which can be used
+ * @return The chosen slot or null if cannot find a match
+ */
+ @Override
+ protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
+ for (ResourceSlot slot : freeSlots.values()) {
+ if (slot.isMatchingRequirement(request.getResourceProfile())) {
+ return slot;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Chooses the first pending request, in the map's iteration order, whose requirement the offered slot matches
+ *
+ * @param offeredSlot The free slot
+ * @param pendingRequests All the pending slot requests
+ * @return The chosen request or null if cannot find a match
+ */
+ @Override
+ protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot,
+ Map<AllocationID, SlotRequest> pendingRequests)
+ {
+ for (Map.Entry<AllocationID, SlotRequest> pendingRequest : pendingRequests.entrySet()) {
+ if (offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile())) {
+ return pendingRequest.getValue();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ protected void allocateContainer(ResourceProfile resourceProfile) {
+ allocatedContainers.add(resourceProfile);
+ }
+
+ List<ResourceProfile> getAllocatedContainers() {
+ return allocatedContainers;
+ }
+ }
+}
[3/3] flink git commit: [FLINK-4516] leader election of
resourcemanager
Posted by mx...@apache.org.
[FLINK-4516] leader election of resourcemanager
- add serial rpc service
- add a special rpcService implementation which directly executes the asynchronous calls serially one by one, it is just for testcase
- Change ResourceManagerLeaderContender code and TestingSerialRpcService code
- override shutdown logic to stop leadershipService
- use a mocked RpcService rather than TestingSerialRpcService for resourceManager HA test
This closes #2427
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/454bf51b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/454bf51b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/454bf51b
Branch: refs/heads/flip-6
Commit: 454bf51b063e30075f08d3e400e4db2bf416b969
Parents: c4abfd9
Author: beyond1920 <be...@126.com>
Authored: Sat Aug 27 14:14:28 2016 +0800
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Aug 31 17:17:47 2016 +0200
----------------------------------------------------------------------
.../HighAvailabilityServices.java | 7 +
.../runtime/highavailability/NonHaServices.java | 5 +
.../rpc/resourcemanager/ResourceManager.java | 111 +++++-
.../TestingHighAvailabilityServices.java | 19 +-
.../runtime/rpc/TestingSerialRpcService.java | 369 +++++++++++++++++++
.../resourcemanager/ResourceManagerHATest.java | 76 ++++
6 files changed, 578 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/454bf51b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 73e4f1f..298147c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -40,6 +40,13 @@ public interface HighAvailabilityServices {
LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
/**
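+ * Gets the leader election service for the cluster's resource manager.
+ * @return The leader election service for the resource manager
+ * @throws Exception If the leader election service cannot be provided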
+ */
+ LeaderElectionService getResourceManagerLeaderElectionService() throws Exception;
+
+ /**
* Gets the leader election service for the given job.
*
* @param jobID The identifier of the job running the election.
http://git-wip-us.apache.org/repos/asf/flink/blob/454bf51b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 3d2769b..292a404 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -61,6 +61,11 @@ public class NonHaServices implements HighAvailabilityServices {
}
@Override
+ public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+ return new StandaloneLeaderElectionService();
+ }
+
+ @Override
public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
return new StandaloneLeaderElectionService();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/454bf51b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
index 6f34465..f7147c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -20,24 +20,26 @@ package org.apache.flink.runtime.rpc.resourcemanager;
import akka.dispatch.Mapper;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.util.Preconditions;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* ResourceManager implementation. The resource manager is responsible for resource de-/allocation
@@ -50,16 +52,51 @@ import java.util.concurrent.ExecutorService;
* </ul>
*/
public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
- private final ExecutionContext executionContext;
private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+ private final HighAvailabilityServices highAvailabilityServices;
+ private LeaderElectionService leaderElectionService = null;
+ private UUID leaderSessionID = null;
- public ResourceManager(RpcService rpcService, ExecutorService executorService) {
+ public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) {
super(rpcService);
- this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
- Preconditions.checkNotNull(executorService));
+ this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
this.jobMasterGateways = new HashMap<>();
}
+ @Override
+ public void start() {
+ // start a leader
+ try {
+ super.start();
+ leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
+ leaderElectionService.start(new ResourceManagerLeaderContender());
+ } catch (Throwable e) {
+ log.error("A fatal error happened when starting the ResourceManager", e);
+ throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
+ }
+ }
+
+ @Override
+ public void shutDown() {
+ try {
+ leaderElectionService.stop();
+ super.shutDown();
+ } catch(Throwable e) {
+ log.error("A fatal error happened when shutdown the ResourceManager", e);
+ throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e);
+ }
+ }
+
+ /**
+ * Gets the leader session id of current resourceManager.
+ *
+ * @return the leader session ID of this ResourceManager, or {@code null} while it does not hold leadership
+ */
+ @VisibleForTesting
+ UUID getLeaderSessionID() {
+ return leaderSessionID;
+ }
+
/**
* Register a {@link JobMaster} at the resource manager.
*
@@ -116,4 +153,62 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
}
+
+ private class ResourceManagerLeaderContender implements LeaderContender {
+
+ /**
+ * Callback method when current resourceManager is granted leadership
+ *
+ * @param leaderSessionID unique leadershipID
+ */
+ @Override
+ public void grantLeadership(final UUID leaderSessionID) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
+ ResourceManager.this.leaderSessionID = leaderSessionID;
+ // confirming the leader session ID might be blocking,
+ leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+ }
+ });
+ }
+
+ /**
+ * Callback method when the current resourceManager loses leadership.
+ */
+ @Override
+ public void revokeLeadership() {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("ResourceManager {} was revoked leadership.", getAddress());
+ jobMasterGateways.clear();
+ leaderSessionID = null;
+ }
+ });
+ }
+
+ @Override
+ public String getAddress() {
+ return ResourceManager.this.getAddress();
+ }
+
+ /**
+ * Handles error occurring in the leader election service
+ *
+ * @param exception Exception being thrown in the leader election service
+ */
+ @Override
+ public void handleError(final Exception exception) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.error("ResourceManager received an error from the LeaderElectionService.", exception);
+ // terminate ResourceManager in case of an error
+ shutDown();
+ }
+ });
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/454bf51b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 4d654a3..3162f40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -32,6 +32,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
private volatile LeaderElectionService jobMasterLeaderElectionService;
+ private volatile LeaderElectionService resourceManagerLeaderElectionService;
+
// ------------------------------------------------------------------------
// Setters for mock / testing implementations
@@ -44,7 +46,11 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) {
this.jobMasterLeaderElectionService = leaderElectionService;
}
-
+
+ public void setResourceManagerLeaderElectionService(LeaderElectionService leaderElectionService) {
+ this.resourceManagerLeaderElectionService = leaderElectionService;
+ }
+
// ------------------------------------------------------------------------
// HA Services Methods
// ------------------------------------------------------------------------
@@ -69,4 +75,15 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
throw new IllegalStateException("JobMasterLeaderElectionService has not been set");
}
}
+
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+ LeaderElectionService service = resourceManagerLeaderElectionService;
+
+ if (service != null) {
+ return service;
+ } else {
+ throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/454bf51b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
new file mode 100644
index 0000000..7bdbb99
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.DirectExecutorService;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.BitSet;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread.
+ */
+public class TestingSerialRpcService implements RpcService {
+
+ private final DirectExecutorService executorService;
+ private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
+
+ public TestingSerialRpcService() {
+ executorService = new DirectExecutorService();
+ this.registeredConnections = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) {
+ try {
+ unit.sleep(delay);
+ runnable.run();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public ExecutionContext getExecutionContext() {
+ return ExecutionContexts.fromExecutorService(executorService);
+ }
+
+ @Override
+ public void stopService() {
+ executorService.shutdown();
+ registeredConnections.clear();
+ }
+
+ @Override
+ public void stopServer(RpcGateway selfGateway) {
+
+ }
+
+ @Override
+ public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
+ final String address = UUID.randomUUID().toString();
+
+ InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, rpcEndpoint);
+ ClassLoader classLoader = getClass().getClassLoader();
+
+ @SuppressWarnings("unchecked")
+ C self = (C) Proxy.newProxyInstance(
+ classLoader,
+ new Class<?>[]{
+ rpcEndpoint.getSelfGatewayType(),
+ MainThreadExecutor.class,
+ StartStoppable.class,
+ RpcGateway.class
+ },
+ akkaInvocationHandler);
+
+ return self;
+ }
+
+ @Override
+ public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
+ RpcGateway gateway = registeredConnections.get(address);
+
+ if (gateway != null) {
+ if (clazz.isAssignableFrom(gateway.getClass())) {
+ @SuppressWarnings("unchecked")
+ C typedGateway = (C) gateway;
+ return Futures.successful(typedGateway);
+ } else {
+ return Futures.failed(
+ new Exception("Gateway registered under " + address + " is not of type " + clazz));
+ }
+ } else {
+ return Futures.failed(new Exception("No gateway registered under that name"));
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // connections
+ // ------------------------------------------------------------------------
+
+ public void registerGateway(String address, RpcGateway gateway) {
+ checkNotNull(address);
+ checkNotNull(gateway);
+
+ if (registeredConnections.putIfAbsent(address, gateway) != null) {
+ throw new IllegalStateException("a gateway is already registered under " + address);
+ }
+ }
+
+ private static class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
+
+ private final T rpcEndpoint;
+
+ /** default timeout for asks */
+ private final Timeout timeout;
+
+ private final String address;
+
+ private TestingSerialInvocationHandler(String address, T rpcEndpoint) {
+ this(address, rpcEndpoint, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
+ }
+
+ private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) {
+ this.rpcEndpoint = rpcEndpoint;
+ this.timeout = timeout;
+ this.address = address;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ Class<?> declaringClass = method.getDeclaringClass();
+ if (declaringClass.equals(MainThreadExecutor.class) ||
+ declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
+ declaringClass.equals(RpcGateway.class)) {
+ return method.invoke(this, args);
+ } else {
+ final String methodName = method.getName();
+ Class<?>[] parameterTypes = method.getParameterTypes();
+ Annotation[][] parameterAnnotations = method.getParameterAnnotations();
+ Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
+
+ final Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments(
+ parameterTypes,
+ parameterAnnotations,
+ args);
+
+ Class<?> returnType = method.getReturnType();
+
+ if (returnType.equals(Future.class)) {
+ try {
+ Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
+ return Futures.successful(result);
+ } catch (Throwable e) {
+ return Futures.failed(e);
+ }
+ } else {
+ return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
+ }
+ }
+ }
+
+ /**
+ * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this
+ * method with the provided method arguments. If the method has a return value, it is returned
+ * to the sender of the call.
+ */
+ private Object handleRpcInvocationSync(final String methodName,
+ final Class<?>[] parameterTypes,
+ final Object[] args,
+ final Timeout futureTimeout) throws Exception {
+ final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
+ Object result = rpcMethod.invoke(rpcEndpoint, args);
+
+ if (result != null && result instanceof Future) {
+ Future<?> future = (Future<?>) result;
+ return Await.result(future, futureTimeout.duration());
+ } else {
+ return result;
+ }
+ }
+
+ @Override
+ public void runAsync(Runnable runnable) {
+ runnable.run();
+ }
+
+ @Override
+ public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) {
+ try {
+ return Futures.successful(callable.call());
+ } catch (Throwable e) {
+ return Futures.failed(e);
+ }
+ }
+
+ @Override
+ public void scheduleRunAsync(final Runnable runnable, final long delay) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(delay);
+ runnable.run();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String getAddress() {
+ return address;
+ }
+
+ @Override
+ public void start() {
+ // do nothing
+ }
+
+ @Override
+ public void stop() {
+ // do nothing
+ }
+
+ /**
+ * Look up the rpc method on the given {@link RpcEndpoint} instance.
+ *
+ * @param methodName Name of the method
+ * @param parameterTypes Parameter types of the method
+ * @return Method of the rpc endpoint
+ * @throws NoSuchMethodException Thrown if the method with the given name and parameter types
+ * cannot be found at the rpc endpoint
+ */
+ private Method lookupRpcMethod(final String methodName,
+ final Class<?>[] parameterTypes) throws NoSuchMethodException {
+ return rpcEndpoint.getClass().getMethod(methodName, parameterTypes);
+ }
+
+ // ------------------------------------------------------------------------
+ // Helper methods
+ // ------------------------------------------------------------------------
+
+ /**
+ * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method
+ * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default
+ * timeout is returned.
+ *
+ * @param parameterAnnotations Parameter annotations
+ * @param args Array of arguments
+ * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter
+ * has been found
+ * @return Timeout extracted from the array of arguments or the default timeout
+ */
+ private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args,
+ Timeout defaultTimeout) {
+ if (args != null) {
+ Preconditions.checkArgument(parameterAnnotations.length == args.length);
+
+ for (int i = 0; i < parameterAnnotations.length; i++) {
+ if (isRpcTimeout(parameterAnnotations[i])) {
+ if (args[i] instanceof FiniteDuration) {
+ return new Timeout((FiniteDuration) args[i]);
+ } else {
+ throw new RuntimeException("The rpc timeout parameter must be of type " +
+ FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() +
+ " is not supported.");
+ }
+ }
+ }
+ }
+
+ return defaultTimeout;
+ }
+
+ /**
+ * Removes all {@link RpcTimeout} annotated parameters from the parameter type and argument
+ * list.
+ *
+ * @param parameterTypes Array of parameter types
+ * @param parameterAnnotations Array of parameter annotations
+ * @param args Array of arguments
+ * @return Tuple of filtered parameter types and arguments which no longer contain the
+ * {@link RpcTimeout} annotated parameter types and arguments
+ */
+ private static Tuple2<Class<?>[], Object[]> filterArguments(
+ Class<?>[] parameterTypes,
+ Annotation[][] parameterAnnotations,
+ Object[] args) {
+
+ Class<?>[] filteredParameterTypes;
+ Object[] filteredArgs;
+
+ if (args == null) {
+ filteredParameterTypes = parameterTypes;
+ filteredArgs = null;
+ } else {
+ Preconditions.checkArgument(parameterTypes.length == parameterAnnotations.length);
+ Preconditions.checkArgument(parameterAnnotations.length == args.length);
+
+ BitSet isRpcTimeoutParameter = new BitSet(parameterTypes.length);
+ int numberRpcParameters = parameterTypes.length;
+
+ for (int i = 0; i < parameterTypes.length; i++) {
+ if (isRpcTimeout(parameterAnnotations[i])) {
+ isRpcTimeoutParameter.set(i);
+ numberRpcParameters--;
+ }
+ }
+
+ if (numberRpcParameters == parameterTypes.length) {
+ filteredParameterTypes = parameterTypes;
+ filteredArgs = args;
+ } else {
+ filteredParameterTypes = new Class<?>[numberRpcParameters];
+ filteredArgs = new Object[numberRpcParameters];
+ int counter = 0;
+
+ for (int i = 0; i < parameterTypes.length; i++) {
+ if (!isRpcTimeoutParameter.get(i)) {
+ filteredParameterTypes[counter] = parameterTypes[i];
+ filteredArgs[counter] = args[i];
+ counter++;
+ }
+ }
+ }
+ }
+ return Tuple2.of(filteredParameterTypes, filteredArgs);
+ }
+
+ /**
+ * Checks whether any of the annotations is of type {@link RpcTimeout}
+ *
+ * @param annotations Array of annotations
+ * @return True if {@link RpcTimeout} was found; otherwise false
+ */
+ private static boolean isRpcTimeout(Annotation[] annotations) {
+ for (Annotation annotation : annotations) {
+ if (annotation.annotationType().equals(RpcTimeout.class)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/454bf51b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
new file mode 100644
index 0000000..dfffeda
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * ResourceManager HA test, covering the granting and revoking of leadership.
+ */
+public class ResourceManagerHATest {
+
+ @Test
+ public void testGrantAndRevokeLeadership() throws Exception {
+ // mock an RpcService whose startServer method returns a special RpcGateway that directly executes runAsync calls
+ TestingResourceManagerGatewayProxy gateway = mock(TestingResourceManagerGatewayProxy.class);
+ doCallRealMethod().when(gateway).runAsync(any(Runnable.class));
+
+ RpcService rpcService = mock(RpcService.class);
+ when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway);
+
+ TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+ TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+ highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
+
+ final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
+ resourceManager.start();
+ // before grant leadership, resourceManager's leaderId is null
+ Assert.assertNull(resourceManager.getLeaderSessionID());
+ final UUID leaderId = UUID.randomUUID();
+ leaderElectionService.isLeader(leaderId);
+ // after grant leadership, resourceManager's leaderId has value
+ Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID());
+ // then revoke leadership, resourceManager's leaderId is null again
+ leaderElectionService.notLeader();
+ Assert.assertNull(resourceManager.getLeaderSessionID());
+ }
+
+ private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway {
+ @Override
+ public void runAsync(Runnable runnable) {
+ runnable.run();
+ }
+ }
+
+}
[2/3] flink git commit: [FLINK-4363] Implement TaskManager basic
startup of all components in java
Posted by mx...@apache.org.
[FLINK-4363] Implement TaskManager basic startup of all components in java
This closes #2400
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/806f2f14
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/806f2f14
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/806f2f14
Branch: refs/heads/flip-6
Commit: 806f2f14832048ad5b9b7e93ce75102f2f656767
Parents: 5d71a55
Author: 淘江 <ta...@alibaba-inc.com>
Authored: Tue Aug 30 11:28:14 2016 +0800
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Aug 31 17:17:46 2016 +0200
----------------------------------------------------------------------
.../runtime/rpc/taskexecutor/TaskExecutor.java | 686 ++++++++++++++++++-
.../taskexecutor/TaskExecutorConfiguration.java | 151 ++++
.../rpc/taskexecutor/TaskExecutorTest.java | 8 +-
3 files changed, 822 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/806f2f14/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
index f201e00..36d6310 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -18,15 +18,60 @@
package org.apache.flink.runtime.rpc.taskexecutor;
+import akka.actor.ActorSystem;
+import akka.dispatch.ExecutionContexts$;
+import akka.util.Timeout;
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.HybridMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
-
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskmanager.MemoryLogger;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.NetUtils;
+
+import scala.Tuple2;
+import scala.Option;
+import scala.Some;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -36,12 +81,29 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
+ private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
/** The unique resource ID of this TaskExecutor */
private final ResourceID resourceID;
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
+ /** The task manager configuration */
+ private final TaskExecutorConfiguration taskExecutorConfig;
+
+ /** The I/O manager component in the task manager */
+ private final IOManager ioManager;
+
+ /** The memory manager component in the task manager */
+ private final MemoryManager memoryManager;
+
+ /** The network component in the task manager */
+ private final NetworkEnvironment networkEnvironment;
+
+ /** The number of slots in the task manager, should be 1 for YARN */
+ private final int numberOfSlots;
+
// --------- resource manager --------
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
@@ -49,22 +111,24 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
// ------------------------------------------------------------------------
public TaskExecutor(
+ TaskExecutorConfiguration taskExecutorConfig,
+ ResourceID resourceID,
+ MemoryManager memoryManager,
+ IOManager ioManager,
+ NetworkEnvironment networkEnvironment,
+ int numberOfSlots,
RpcService rpcService,
- HighAvailabilityServices haServices,
- ResourceID resourceID) {
+ HighAvailabilityServices haServices) {
super(rpcService);
- this.haServices = checkNotNull(haServices);
+ this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
this.resourceID = checkNotNull(resourceID);
- }
-
- // ------------------------------------------------------------------------
- // Properties
- // ------------------------------------------------------------------------
-
- public ResourceID getResourceID() {
- return resourceID;
+ this.memoryManager = checkNotNull(memoryManager);
+ this.ioManager = checkNotNull(ioManager);
+ this.networkEnvironment = checkNotNull(networkEnvironment);
+ this.numberOfSlots = checkNotNull(numberOfSlots);
+ this.haServices = checkNotNull(haServices);
}
// ------------------------------------------------------------------------
@@ -83,7 +147,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
-
// ------------------------------------------------------------------------
// RPC methods - ResourceManager related
// ------------------------------------------------------------------------
@@ -94,12 +157,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
if (newLeaderAddress != null) {
// the resource manager switched to a new leader
log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
- resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
+ resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
}
else {
// address null means that the current leader is lost without a new leader being there, yet
log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
- resourceManagerConnection.getResourceManagerAddress());
+ resourceManagerConnection.getResourceManagerAddress());
}
// drop the current connection or connection attempt
@@ -112,21 +175,604 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
// establish a connection to the new leader
if (newLeaderAddress != null) {
log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
- resourceManagerConnection =
- new TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, newLeaderId);
+ resourceManagerConnection =
+ new TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, newLeaderId);
resourceManagerConnection.start();
}
}
+ /**
+ * Starts and runs the TaskManager.
+ * <p/>
+ * This method first tries to select the network interface to use for the TaskManager
+ * communication. The network interface is used both for the actor communication
+ * (coordination) as well as for the data exchange between task managers. Unless
+ * the hostname/interface is explicitly configured in the configuration, this
+ * method will try out various interfaces and methods to connect to the JobManager
+ * and select the one where the connection attempt is successful.
+ * <p/>
+ * After selecting the network interface, this method brings up an actor system
+ * for the TaskManager and its actors, starts the TaskManager's services
+ * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+ * <p/>
+ * The call delegates to {@code runTaskManager}, which blocks until the TaskManager's
+ * actor system has terminated.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @param resourceID The id of the resource which the task manager will run on.
+ * @throws Exception Thrown if the address selection fails or the TaskManager cannot be started.
+ */
+ public static void selectNetworkInterfaceAndRunTaskManager(
+ Configuration configuration,
+ ResourceID resourceID) throws Exception {
+
+ final InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration);
+
+ runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration);
+ }
+
+ /**
+ * Determines the hostname and port under which the TaskManager's actor system is reachable.
+ * <p/>
+ * The hostname is taken from {@code ConfigConstants.TASK_MANAGER_HOSTNAME_KEY} if configured.
+ * Otherwise it is obtained from {@code LeaderRetrievalUtils.findConnectingAddress}, using the
+ * leader retrieval service and lookup timeout derived from the configuration.
+ * The port is read from {@code ConfigConstants.TASK_MANAGER_IPC_PORT_KEY} and defaults to 0.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @return The socket address built from the selected hostname and port.
+ * @throws IllegalConfigurationException Thrown if the configured port is outside [0, 65535].
+ * @throws Exception Thrown if the leader retrieval service or the connecting address cannot be obtained.
+ */
+ private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration)
+ throws Exception {
+ String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+ if (taskManagerHostname != null) {
+ LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
+ } else {
+ // no hostname configured: let the leader retrieval utilities find an address to use
+ // NOTE(review): the retrieval service created here is never explicitly stopped in this method - confirm that findConnectingAddress takes care of it
+ LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+ FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
+
+ InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout);
+ taskManagerHostname = taskManagerAddress.getHostName();
+ LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
+ taskManagerHostname, taskManagerAddress.getHostAddress());
+ }
+
+ // if no task manager port has been configured, use 0 (system will pick any free port)
+ final int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ if (actorSystemPort < 0 || actorSystemPort > 65535) {
+ throw new IllegalConfigurationException("Invalid value for '" +
+ ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
+ "' (port for the TaskManager actor system) : " + actorSystemPort +
+ " - Leave config parameter empty or use 0 to let the system choose a port automatically.");
+ }
+
+ return new InetSocketAddress(taskManagerHostname, actorSystemPort);
+ }
+
+ /**
+ * Starts and runs the TaskManager. Brings up an actor system bound to the given address,
+ * wraps it in an {@code AkkaRpcService}, creates the TaskExecutor together with its
+ * components via {@code startTaskManagerComponentsAndActor} and starts it.
+ * <p/>
+ * If enabled in the configuration (and INFO logging is active), this method also starts the
+ * JVM memory logging thread. It then blocks until the actor system has terminated.
+ * <p/>
+ * NOTE(review): an earlier version of this comment also promised a process reaper for the
+ * TaskManager; no reaper is started in this method - confirm whether it is still to be added.
+ *
+ * @param taskManagerHostname The hostname/address of the interface where the actor system
+ * will communicate.
+ * @param resourceID The id of the resource which the task manager will run on.
+ * @param actorSystemPort The port at which the actor system will communicate.
+ * @param configuration The configuration for the TaskManager.
+ * @throws IOException Thrown if the actor system cannot be bound to the given address.
+ * @throws Exception Thrown if the actor system cannot be created for another reason, or if
+ * starting the TaskManager fails (the actor system is shut down before rethrowing).
+ */
+ private static void runTaskManager(
+ String taskManagerHostname,
+ ResourceID resourceID,
+ int actorSystemPort,
+ final Configuration configuration) throws Exception {
+
+ LOG.info("Starting TaskManager");
+
+ // Bring up the TaskManager actor system first, bind it to the given address.
+
+ LOG.info("Starting TaskManager actor system at " +
+ NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort));
+
+ final ActorSystem taskManagerSystem;
+ try {
+ Tuple2<String, Object> address = new Tuple2<String, Object>(taskManagerHostname, actorSystemPort);
+ Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
+ LOG.debug("Using akka configuration\n " + akkaConfig);
+ taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig);
+ } catch (Throwable t) {
+ // translate a failed bind into an IOException that names the address
+ if (t instanceof org.jboss.netty.channel.ChannelException) {
+ Throwable cause = t.getCause();
+ if (cause != null && t.getCause() instanceof java.net.BindException) {
+ String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
+ throw new IOException("Unable to bind TaskManager actor system to address " +
+ address + " - " + cause.getMessage(), t);
+ }
+ }
+ throw new Exception("Could not create TaskManager actor system", t);
+ }
+
+ // start akka rpc service based on actor system
+ final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS);
+ final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout);
+
+ // minimal high availability services: only getResourceManagerLeaderRetriever is implemented,
+ // both leader election getters return null
+ final HighAvailabilityServices haServices = new HighAvailabilityServices() {
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+ return LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+ }
+
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+ return null;
+ }
+
+ @Override
+ public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
+ return null;
+ }
+ };
+
+ // start all the TaskManager services (network stack, library cache, ...)
+ // and the TaskManager actor
+ try {
+ LOG.info("Starting TaskManager actor");
+ TaskExecutor taskExecutor = startTaskManagerComponentsAndActor(
+ configuration,
+ resourceID,
+ akkaRpcService,
+ taskManagerHostname,
+ haServices,
+ false);
+
+ taskExecutor.start();
+
+ // if desired, start the logging daemon that periodically logs the memory usage information
+ if (LOG.isInfoEnabled() && configuration.getBoolean(
+ ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
+ ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
+ LOG.info("Starting periodic memory usage logger");
+
+ long interval = configuration.getLong(
+ ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
+ ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
+
+ MemoryLogger logger = new MemoryLogger(LOG, interval, taskManagerSystem);
+ logger.start();
+ }
+
+ // block until everything is done
+ taskManagerSystem.awaitTermination();
+ } catch (Throwable t) {
+ LOG.error("Error while starting up taskManager", t);
+ // best-effort shutdown of the actor system; the original error is rethrown below
+ try {
+ taskManagerSystem.shutdown();
+ } catch (Throwable tt) {
+ LOG.warn("Could not cleanly shut down actor system", tt);
+ }
+ throw t;
+ }
+ }
+
+ // --------------------------------------------------------------------------
+ // Starting and running the TaskManager
+ // --------------------------------------------------------------------------
+
+ /**
+ * Creates the TaskManager components (network environment, memory manager, I/O manager)
+ * from the configuration and constructs a {@link TaskExecutor} around them. The returned
+ * TaskExecutor is not started by this method; the caller has to call {@code start()} on it.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @param resourceID The id of the resource which the task manager will run on.
+ * @param rpcService The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
+ * @param taskManagerHostname The hostname/address that describes the TaskManager's data location.
+ * @param haServices The high availability services, handed to the TaskExecutor unchanged.
+ * Must not be null (the TaskExecutor constructor checks this).
+ * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack.
+ * @return The newly constructed, not yet started TaskExecutor.
+ * @throws org.apache.flink.configuration.IllegalConfigurationException Thrown, if the given config contains illegal values.
+ * @throws java.io.IOException Thrown, if any of the I/O components (such as buffer pools,
+ * I/O manager, ...) cannot be properly started.
+ * @throws java.lang.Exception Thrown if some other error occurs while parsing the configuration
+ * or starting the TaskManager components.
+ */
+ public static TaskExecutor startTaskManagerComponentsAndActor(
+ Configuration configuration,
+ ResourceID resourceID,
+ RpcService rpcService,
+ String taskManagerHostname,
+ HighAvailabilityServices haServices,
+ boolean localTaskManagerCommunication) throws Exception {
+
+ final TaskExecutorConfiguration taskExecutorConfig = parseTaskManagerConfiguration(
+ configuration, taskManagerHostname, localTaskManagerCommunication);
+
+ MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType();
+
+ // pre-start checks
+ checkTempDirs(taskExecutorConfig.getTmpDirPaths());
+
+ ExecutionContext executionContext = ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool());
+
+ // we start the network first, to make sure it can allocate its buffers first
+ // NOTE(review): if a later step in this method throws, neither the network environment nor the
+ // ForkJoinPool above is released here - confirm whether callers are expected to clean up
+ final NetworkEnvironment network = new NetworkEnvironment(
+ executionContext,
+ taskExecutorConfig.getTimeout(),
+ taskExecutorConfig.getNetworkConfig(),
+ taskExecutorConfig.getConnectionInfo());
+
+ // computing the amount of memory to use depends on how much memory is available
+ // it strictly needs to happen AFTER the network stack has been initialized
+
+ // check if a value has been configured
+ long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+ checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
+ ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+ "MemoryManager needs at least one MB of memory. " +
+ "If you leave this config parameter empty, the system automatically " +
+ "pick a fraction of the available memory.");
+
+ final long memorySize;
+ boolean preAllocateMemory = configuration.getBoolean(
+ ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+ if (configuredMemory > 0) {
+ // an explicit size (in megabytes) was configured
+ if (preAllocateMemory) {
+ LOG.info("Using {} MB for managed memory." , configuredMemory);
+ } else {
+ LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
+ }
+ memorySize = configuredMemory << 20; // megabytes to bytes
+ } else {
+ // no explicit size: derive the size from the configured fraction
+ float fraction = configuration.getFloat(
+ ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+ ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+ checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
+ ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+ "MemoryManager fraction of the free memory must be between 0.0 and 1.0");
+
+ if (memType == MemoryType.HEAP) {
+ long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
+ if (preAllocateMemory) {
+ LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
+ fraction , relativeMemSize >> 20);
+ } else {
+ LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
+ "memory will be allocated lazily." , fraction , relativeMemSize >> 20);
+ }
+ memorySize = relativeMemSize;
+ } else if (memType == MemoryType.OFF_HEAP) {
+ // The maximum heap memory has been adjusted according to the fraction
+ long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
+ long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction);
+ if (preAllocateMemory) {
+ LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
+ fraction, directMemorySize >> 20);
+ } else {
+ LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
+ " memory will be allocated lazily.", fraction, directMemorySize >> 20);
+ }
+ memorySize = directMemorySize;
+ } else {
+ throw new RuntimeException("No supported memory type detected.");
+ }
+ }
+
+ // now start the memory manager
+ final MemoryManager memoryManager;
+ try {
+ memoryManager = new MemoryManager(
+ memorySize,
+ taskExecutorConfig.getNumberOfSlots(),
+ taskExecutorConfig.getNetworkConfig().networkBufferSize(),
+ memType,
+ preAllocateMemory);
+ } catch (OutOfMemoryError e) {
+ // wrap the error in an exception that names the memory type and the requested size
+ if (memType == MemoryType.HEAP) {
+ throw new Exception("OutOfMemory error (" + e.getMessage() +
+ ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
+ } else if (memType == MemoryType.OFF_HEAP) {
+ throw new Exception("OutOfMemory error (" + e.getMessage() +
+ ") while allocating the TaskManager off-heap memory (" + memorySize +
+ " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
+ } else {
+ throw e;
+ }
+ }
+
+ // start the I/O manager, it will create some temp directories.
+ final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths());
+
+ final TaskExecutor taskExecutor = new TaskExecutor(
+ taskExecutorConfig,
+ resourceID,
+ memoryManager,
+ ioManager,
+ network,
+ taskExecutorConfig.getNumberOfSlots(),
+ rpcService,
+ haServices);
+
+ return taskExecutor;
+ }
+
+ // --------------------------------------------------------------------------
+ // Parsing and checking the TaskManager Configuration
+ // --------------------------------------------------------------------------
+
+ /**
+ * Utility method to extract TaskManager config parameters from the configuration and to
+ * sanity check them.
+ *
+ * @param configuration The configuration.
+ * @param taskManagerHostname The host name under which the TaskManager communicates.
+ * @param localTaskManagerCommunication True, to skip initializing the network stack.
+ * Use only in cases where only one task manager runs.
+ * @return TaskExecutorConfiguration that wraps InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
+ * Its maximum registration duration is null if the configured duration is not finite.
+ * @throws IllegalConfigurationException Thrown if a checked config value is out of range.
+ * @throws IllegalArgumentException Thrown if a timeout or pause has an invalid format or a pause is not finite.
+ * @throws Exception Thrown if the host name cannot be resolved or the memory segment factory
+ * has already been initialized for the other memory type.
+ */
+ private static TaskExecutorConfiguration parseTaskManagerConfiguration(
+ Configuration configuration,
+ String taskManagerHostname,
+ boolean localTaskManagerCommunication) throws Exception {
+
+ // ------- read values from the config and check them ---------
+ // (a lot of them)
+
+ // ----> hosts / ports for communication and data exchange
+
+ int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
+ if (dataport == 0) {
+ dataport = NetUtils.getAvailablePort();
+ }
+ checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+ "Leave config parameter empty or use 0 to let the system choose a port automatically.");
+
+ InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname);
+ final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport);
+
+ // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
+
+ // we need this because many configs have been written with a "-1" entry
+ int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ if (slots == -1) {
+ slots = 1;
+ }
+ checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+ "Number of task slots must be at least one.");
+
+ final int numNetworkBuffers = configuration.getInteger(
+ ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+ checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
+ ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");
+
+ final int pageSize = configuration.getInteger(
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+ // check that the page size is not below the minimum size
+ checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+ "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
+ // check page size for power of two
+ checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+ "Memory segment size must be a power of 2.");
+
+ // check whether we use heap or off-heap memory
+ final MemoryType memType;
+ if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
+ memType = MemoryType.OFF_HEAP;
+ } else {
+ memType = MemoryType.HEAP;
+ }
+
+ // initialize the memory segment factory accordingly
+ if (memType == MemoryType.HEAP) {
+ if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
+ throw new Exception("Memory type is set to heap memory, but memory segment " +
+ "factory has been initialized for off-heap memory segments");
+ }
+ } else {
+ if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
+ throw new Exception("Memory type is set to off-heap memory, but memory segment " +
+ "factory has been initialized for heap memory segments");
+ }
+ }
+
+ final String[] tmpDirs = configuration.getString(
+ ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+ // without a netty config the network environment runs without the TCP network stack
+ final NettyConfig nettyConfig;
+ if (!localTaskManagerCommunication) {
+ nettyConfig = new NettyConfig(connectionInfo.address(), connectionInfo.dataPort(), pageSize, slots, configuration);
+ } else {
+ nettyConfig = null;
+ }
+
+ // Default spill I/O mode for intermediate results
+ final String syncOrAsync = configuration.getString(
+ ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
+
+ // every value other than "async" selects the synchronous mode
+ final IOMode ioMode;
+ if (syncOrAsync.equals("async")) {
+ ioMode = IOManager.IOMode.ASYNC;
+ } else {
+ ioMode = IOManager.IOMode.SYNC;
+ }
+
+ final int queryServerPort = configuration.getInteger(
+ ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
+ ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT);
+
+ final int queryServerNetworkThreads = configuration.getInteger(
+ ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
+ ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS);
+
+ final int queryServerQueryThreads = configuration.getInteger(
+ ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
+ ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS);
+
+ final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
+ numNetworkBuffers,
+ pageSize,
+ memType,
+ ioMode,
+ queryServerPort,
+ queryServerNetworkThreads,
+ queryServerQueryThreads,
+ localTaskManagerCommunication ? Option.<NettyConfig>empty() : new Some<>(nettyConfig),
+ new Tuple2<>(500, 3000));
+
+ // ----> timeouts, library caching, profiling
+
+ final FiniteDuration timeout;
+ try {
+ timeout = AkkaUtils.getTimeout(configuration);
+ } catch (Exception e) {
+ // keep the original exception as cause so the actual parse problem is not lost
+ throw new IllegalArgumentException(
+ "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT +
+ "'.Use formats like '50 s' or '1 min' to specify the timeout.", e);
+ }
+ LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout);
+
+ final long cleanupInterval = configuration.getLong(
+ ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+ ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+
+ // The durations below are converted with millisecond precision. Converting via whole
+ // seconds would truncate sub-second values (e.g. a pause of "500 ms") to zero.
+
+ final FiniteDuration finiteRegistrationDuration;
+ try {
+ Duration maxRegistrationDuration = Duration.create(configuration.getString(
+ ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
+ ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
+ if (maxRegistrationDuration.isFinite()) {
+ finiteRegistrationDuration = new FiniteDuration(maxRegistrationDuration.toMillis(), TimeUnit.MILLISECONDS);
+ } else {
+ // a non-finite duration is represented as null
+ finiteRegistrationDuration = null;
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid format for parameter " +
+ ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e);
+ }
+
+ final FiniteDuration initialRegistrationPause;
+ try {
+ Duration pause = Duration.create(configuration.getString(
+ ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+ ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE));
+ if (pause.isFinite()) {
+ initialRegistrationPause = new FiniteDuration(pause.toMillis(), TimeUnit.MILLISECONDS);
+ } else {
+ throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid format for parameter " +
+ ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+ }
+
+ final FiniteDuration maxRegistrationPause;
+ try {
+ Duration pause = Duration.create(configuration.getString(
+ ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
+ ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE));
+ if (pause.isFinite()) {
+ maxRegistrationPause = new FiniteDuration(pause.toMillis(), TimeUnit.MILLISECONDS);
+ } else {
+ throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
+ }
+ } catch (NumberFormatException e) {
+ // report the key that was actually read (the constant name itself is misspelled in ConfigConstants)
+ throw new IllegalArgumentException("Invalid format for parameter " +
+ ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, e);
+ }
+
+ final FiniteDuration refusedRegistrationPause;
+ try {
+ Duration pause = Duration.create(configuration.getString(
+ ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+ ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE));
+ if (pause.isFinite()) {
+ refusedRegistrationPause = new FiniteDuration(pause.toMillis(), TimeUnit.MILLISECONDS);
+ } else {
+ throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid format for parameter " +
+ ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, e);
+ }
+
+ return new TaskExecutorConfiguration(
+ tmpDirs,
+ cleanupInterval,
+ connectionInfo,
+ networkConfig,
+ timeout,
+ finiteRegistrationDuration,
+ slots,
+ configuration,
+ initialRegistrationPause,
+ maxRegistrationPause,
+ refusedRegistrationPause);
+ }
+
+ /**
+ * Validates a condition for a config parameter and throws a standard exception, if
+ * the condition does not hold.
+ *
+ * @param condition The condition that must hold. If the condition is false, an exception is thrown.
+ * @param parameter The parameter value. Will be shown in the exception message.
+ * @param name The name of the config parameter. Will be shown in the exception message.
+ * @param errorMessage The custom error message to append to the exception message (may be empty).
+ * @throws IllegalConfigurationException Thrown if the condition is false.
+ */
+ private static void checkConfigParameter(
+ boolean condition,
+ Object parameter,
+ String name,
+ String errorMessage) {
+ if (!condition) {
+ throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage);
+ }
+ }
+
+ /**
+ * Validates that all the directories denoted by the strings do actually exist, are proper
+ * directories (not files), and are writable. If INFO logging is enabled, the total and
+ * usable space of every directory is logged.
+ *
+ * @param tmpDirs The array of directory paths to check.
+ * @throws IOException Thrown if any of the directories does not exist or is not writable
+ * or is a file, rather than a directory.
+ * @throws IllegalArgumentException Thrown if any of the entries is null or an empty string.
+ */
+ private static void checkTempDirs(String[] tmpDirs) throws IOException {
+ for (int i = 0; i < tmpDirs.length; i++) {
+ final String dir = tmpDirs[i];
+ if (dir != null && !dir.equals("")) {
+ File file = new File(dir);
+ if (!file.exists()) {
+ throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist.");
+ }
+ if (!file.isDirectory()) {
+ throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory.");
+ }
+ if (!file.canWrite()) {
+ throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable.");
+ }
+
+ if (LOG.isInfoEnabled()) {
+ // compute the percentage from the byte counts: the GB-truncated values lose precision
+ // and would result in 0/0 (NaN) for volumes smaller than one GB
+ long totalSpace = file.getTotalSpace();
+ long usableSpace = file.getUsableSpace();
+ double usablePercentage = totalSpace > 0 ? 100.0 * usableSpace / totalSpace : 0.0;
+ String path = file.getAbsolutePath();
+ LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)",
+ path, totalSpace >> 30, usableSpace >> 30, usablePercentage));
+ }
+ } else {
+ // name the offending entry by its position in the array
+ throw new IllegalArgumentException("Temporary file directory #" + i + " is null or empty.");
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ /** Returns the resource ID that was assigned to this TaskExecutor at construction time. */
+ public ResourceID getResourceID() {
+ return resourceID;
+ }
+
// ------------------------------------------------------------------------
- // Error handling
+ // Error Handling
// ------------------------------------------------------------------------
/**
* Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed.
* This method should be used when asynchronous threads want to notify the
* TaskExecutor of a fatal error.
- *
+ *
* @param t The exception describing the fatal error
*/
void onFatalErrorAsync(final Throwable t) {
@@ -141,7 +787,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
/**
* Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed.
* This method must only be called from within the TaskExecutor's main thread.
- *
+ *
* @param t The exception describing the fatal error
*/
void onFatalError(Throwable t) {
http://git-wip-us.apache.org/repos/asf/flink/blob/806f2f14/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorConfiguration.java
new file mode 100644
index 0000000..32484e1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorConfiguration.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Immutable holder for the configuration values of a {@link TaskExecutor}: temp directories,
+ * cleanup interval, slot count, connection info, network configuration and the timeouts
+ * and pauses used for registration.
+ */
+public class TaskExecutorConfiguration implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String[] tmpDirPaths;
+
+ private final long cleanupInterval;
+
+ private final int numberOfSlots;
+
+ private final Configuration configuration;
+
+ private final FiniteDuration timeout;
+ // may be null, all other durations are checked to be non-null
+ private final FiniteDuration maxRegistrationDuration;
+ private final FiniteDuration initialRegistrationPause;
+ private final FiniteDuration maxRegistrationPause;
+ private final FiniteDuration refusedRegistrationPause;
+
+ private final NetworkEnvironmentConfiguration networkConfig;
+
+ private final InstanceConnectionInfo connectionInfo;
+
+ /**
+ * Creates a configuration with default registration pauses: 500 milliseconds initial pause,
+ * 30 seconds maximum pause and 10 seconds pause after a refused registration.
+ *
+ * @param tmpDirPaths The temp directory paths (must not be null).
+ * @param cleanupInterval The cleanup interval.
+ * @param connectionInfo The connection info (must not be null).
+ * @param networkConfig The network environment configuration (must not be null).
+ * @param timeout The timeout (must not be null).
+ * @param maxRegistrationDuration The maximum registration duration, may be null.
+ * @param numberOfSlots The number of slots.
+ * @param configuration The underlying configuration (must not be null).
+ */
+ public TaskExecutorConfiguration(
+ String[] tmpDirPaths,
+ long cleanupInterval,
+ InstanceConnectionInfo connectionInfo,
+ NetworkEnvironmentConfiguration networkConfig,
+ FiniteDuration timeout,
+ FiniteDuration maxRegistrationDuration,
+ int numberOfSlots,
+ Configuration configuration) {
+
+ this (tmpDirPaths,
+ cleanupInterval,
+ connectionInfo,
+ networkConfig,
+ timeout,
+ maxRegistrationDuration,
+ numberOfSlots,
+ configuration,
+ new FiniteDuration(500, TimeUnit.MILLISECONDS),
+ new FiniteDuration(30, TimeUnit.SECONDS),
+ new FiniteDuration(10, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Creates a configuration with explicit registration pauses.
+ *
+ * @param tmpDirPaths The temp directory paths (must not be null).
+ * @param cleanupInterval The cleanup interval.
+ * @param connectionInfo The connection info (must not be null).
+ * @param networkConfig The network environment configuration (must not be null).
+ * @param timeout The timeout (must not be null).
+ * @param maxRegistrationDuration The maximum registration duration, may be null.
+ * @param numberOfSlots The number of slots.
+ * @param configuration The underlying configuration (must not be null).
+ * @param initialRegistrationPause The initial registration pause (must not be null).
+ * @param maxRegistrationPause The maximum registration pause (must not be null).
+ * @param refusedRegistrationPause The pause after a refused registration (must not be null).
+ */
+ public TaskExecutorConfiguration(
+ String[] tmpDirPaths,
+ long cleanupInterval,
+ InstanceConnectionInfo connectionInfo,
+ NetworkEnvironmentConfiguration networkConfig,
+ FiniteDuration timeout,
+ FiniteDuration maxRegistrationDuration,
+ int numberOfSlots,
+ Configuration configuration,
+ FiniteDuration initialRegistrationPause,
+ FiniteDuration maxRegistrationPause,
+ FiniteDuration refusedRegistrationPause) {
+
+ // primitives are assigned directly: a null check would only box them and can never fail
+ this.tmpDirPaths = checkNotNull(tmpDirPaths);
+ this.cleanupInterval = cleanupInterval;
+ this.connectionInfo = checkNotNull(connectionInfo);
+ this.networkConfig = checkNotNull(networkConfig);
+ this.timeout = checkNotNull(timeout);
+ // deliberately not null-checked
+ this.maxRegistrationDuration = maxRegistrationDuration;
+ this.numberOfSlots = numberOfSlots;
+ this.configuration = checkNotNull(configuration);
+ this.initialRegistrationPause = checkNotNull(initialRegistrationPause);
+ this.maxRegistrationPause = checkNotNull(maxRegistrationPause);
+ this.refusedRegistrationPause = checkNotNull(refusedRegistrationPause);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Properties
+ // --------------------------------------------------------------------------------------------
+
+ /** Returns the internal array of temp directory paths (not a copy). */
+ public String[] getTmpDirPaths() {
+ return tmpDirPaths;
+ }
+
+ public long getCleanupInterval() {
+ return cleanupInterval;
+ }
+
+ public InstanceConnectionInfo getConnectionInfo() { return connectionInfo; }
+
+ public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; }
+
+ public FiniteDuration getTimeout() {
+ return timeout;
+ }
+
+ /** Returns the maximum registration duration, which may be null. */
+ public FiniteDuration getMaxRegistrationDuration() {
+ return maxRegistrationDuration;
+ }
+
+ public int getNumberOfSlots() {
+ return numberOfSlots;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public FiniteDuration getInitialRegistrationPause() {
+ return initialRegistrationPause;
+ }
+
+ public FiniteDuration getMaxRegistrationPause() {
+ return maxRegistrationPause;
+ }
+
+ public FiniteDuration getRefusedRegistrationPause() {
+ return refusedRegistrationPause;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/806f2f14/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
index b831ead..25a670c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rpc.taskexecutor;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.NonHaServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -49,9 +50,9 @@ public class TaskExecutorTest extends TestLogger {
rpc.registerGateway(resourceManagerAddress, rmGateway);
NonHaServices haServices = new NonHaServices(resourceManagerAddress);
- TaskExecutor taskManager = new TaskExecutor(rpc, haServices, resourceID);
+ TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
+ new Configuration(), resourceID, rpc, "localhost", haServices, true);
String taskManagerAddress = taskManager.getAddress();
-
taskManager.start();
verify(rmGateway, timeout(5000)).registerTaskExecutor(
@@ -84,7 +85,8 @@ public class TaskExecutorTest extends TestLogger {
TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
haServices.setResourceManagerLeaderRetriever(testLeaderService);
- TaskExecutor taskManager = new TaskExecutor(rpc, haServices, resourceID);
+ TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
+ new Configuration(), resourceID, rpc, "localhost", haServices, true);
String taskManagerAddress = taskManager.getAddress();
taskManager.start();