You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ss...@apache.org on 2013/02/26 04:32:11 UTC
svn commit: r1450007 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-serv...
Author: sseth
Date: Tue Feb 26 03:32:10 2013
New Revision: 1450007
URL: http://svn.apache.org/r1450007
Log:
YARN-365. Change NM heartbeat handling to not generate a scheduler event on each heartbeat. (Contributed by Xuan Gong)
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/UpdatedContainerInfo.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1450007&r1=1450006&r2=1450007&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Feb 26 03:32:10 2013
@@ -22,6 +22,9 @@ Release 2.0.4-beta - UNRELEASED
IMPROVEMENTS
+ YARN-365. Change NM heartbeat handling to not generate a scheduler event
+ on each heartbeat. (Xuan Gong via sseth)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java?rev=1450007&r1=1450006&r2=1450007&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java Tue Feb 26 03:32:10 2013
@@ -106,4 +106,13 @@ public interface RMNode {
public List<ApplicationId> getAppsToCleanup();
public HeartbeatResponse getLastHeartBeatResponse();
+
+ /**
+ * Get and clear the list of containerUpdates accumulated across NM
+ * heartbeats.
+ *
+ * @return containerUpdates accumulated across NM heartbeats.
+ */
+ public List<UpdatedContainerInfo> pullContainerUpdates();
+
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1450007&r1=1450006&r2=1450007&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Tue Feb 26 03:32:10 2013
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -60,6 +61,8 @@ import org.apache.hadoop.yarn.state.Stat
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.BuilderUtils.ContainerIdComparator;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* This class is used to keep track of all the applications/containers
* running on a node.
@@ -78,6 +81,9 @@ public class RMNodeImpl implements RMNod
private final ReadLock readLock;
private final WriteLock writeLock;
+ private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue;
+ private volatile boolean nextHeartBeat = true;
+
private final NodeId nodeId;
private final RMContext context;
private final String hostName;
@@ -186,6 +192,7 @@ public class RMNodeImpl implements RMNod
this.stateMachine = stateMachineFactory.make(this);
+ this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
}
@Override
@@ -400,6 +407,7 @@ public class RMNodeImpl implements RMNod
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Kill containers since node is rejoining.
+ rmNode.nodeUpdateQueue.clear();
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
@@ -458,6 +466,7 @@ public class RMNodeImpl implements RMNod
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
+ rmNode.nodeUpdateQueue.clear();
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
rmNode.context.getDispatcher().getEventHandler().handle(
@@ -489,6 +498,7 @@ public class RMNodeImpl implements RMNod
statusEvent.getNodeHealthStatus();
rmNode.setNodeHealthStatus(remoteNodeHealthStatus);
if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
+ rmNode.nodeUpdateQueue.clear();
// Inform the scheduler
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
@@ -538,10 +548,16 @@ public class RMNodeImpl implements RMNod
completedContainers.add(remoteContainer);
}
}
-
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers,
- completedContainers));
+ if(newlyLaunchedContainers.size() != 0
+ || completedContainers.size() != 0) {
+ rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo
+ (newlyLaunchedContainers, completedContainers));
+ }
+ if(rmNode.nextHeartBeat) {
+ rmNode.nextHeartBeat = false;
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodeUpdateSchedulerEvent(rmNode));
+ }
rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
statusEvent.getKeepAliveAppIds());
@@ -584,4 +600,25 @@ public class RMNodeImpl implements RMNod
return NodeState.UNHEALTHY;
}
}
+
+ @Override
+ public List<UpdatedContainerInfo> pullContainerUpdates() {
+ List<UpdatedContainerInfo> latestContainerInfoList =
+ new ArrayList<UpdatedContainerInfo>();
+ while(nodeUpdateQueue.peek() != null){
+ latestContainerInfoList.add(nodeUpdateQueue.poll());
+ }
+ this.nextHeartBeat = true;
+ return latestContainerInfoList;
+ }
+
+ @VisibleForTesting
+ public void setNextHeartBeat(boolean nextHeartBeat) {
+ this.nextHeartBeat = nextHeartBeat;
+ }
+
+ @VisibleForTesting
+ public int getQueueSize() {
+ return nodeUpdateQueue.size();
+ }
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/UpdatedContainerInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/UpdatedContainerInfo.java?rev=1450007&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/UpdatedContainerInfo.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/UpdatedContainerInfo.java Tue Feb 26 03:32:10 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+public class UpdatedContainerInfo {
+ private List<ContainerStatus> newlyLaunchedContainers;
+ private List<ContainerStatus> completedContainers;
+
+ public UpdatedContainerInfo() {
+ }
+
+ public UpdatedContainerInfo(List<ContainerStatus> newlyLaunchedContainers
+ , List<ContainerStatus> completedContainers) {
+ this.newlyLaunchedContainers = newlyLaunchedContainers;
+ this.completedContainers = completedContainers;
+ }
+
+ public List<ContainerStatus> getNewlyLaunchedContainers() {
+ return this.newlyLaunchedContainers;
+ }
+
+ public List<ContainerStatus> getCompletedContainers() {
+ return this.completedContainers;
+ }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1450007&r1=1450006&r2=1450007&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Feb 26 03:32:10 2013
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -562,15 +563,20 @@ implements ResourceScheduler, CapacitySc
return root.getQueueUserAclInfo(user);
}
- private synchronized void nodeUpdate(RMNode nm,
- List<ContainerStatus> newlyLaunchedContainers,
- List<ContainerStatus> completedContainers) {
+ private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
}
-
- FiCaSchedulerNode node = getNode(nm.getNodeID());
+ FiCaSchedulerNode node = getNode(nm.getNodeID());
+ List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+ List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
+ List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
+ for(UpdatedContainerInfo containerInfo : containerInfoList) {
+ newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
+ completedContainers.addAll(containerInfo.getCompletedContainers());
+ }
+
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@@ -666,9 +672,7 @@ implements ResourceScheduler, CapacitySc
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
- nodeUpdate(nodeUpdatedEvent.getRMNode(),
- nodeUpdatedEvent.getNewlyLaunchedContainers(),
- nodeUpdatedEvent.getCompletedContainers());
+ nodeUpdate(nodeUpdatedEvent.getRMNode());
}
break;
case APP_ADDED:
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java?rev=1450007&r1=1450006&r2=1450007&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java Tue Feb 26 03:32:10 2013
@@ -18,35 +18,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
-import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class NodeUpdateSchedulerEvent extends SchedulerEvent {
private final RMNode rmNode;
- private final List<ContainerStatus> newlyLaunchedContainers;
- private final List<ContainerStatus> completedContainersStatuses;
- public NodeUpdateSchedulerEvent(RMNode rmNode,
- List<ContainerStatus> newlyLaunchedContainers,
- List<ContainerStatus> completedContainers) {
+ public NodeUpdateSchedulerEvent(RMNode rmNode) {
super(SchedulerEventType.NODE_UPDATE);
this.rmNode = rmNode;
- this.newlyLaunchedContainers = newlyLaunchedContainers;
- this.completedContainersStatuses = completedContainers;
}
public RMNode getRMNode() {
return rmNode;
}
-
- public List<ContainerStatus> getNewlyLaunchedContainers() {
- return newlyLaunchedContainers;
- }
-
- public List<ContainerStatus> getCompletedContainers() {
- return completedContainersStatuses;
- }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1450007&r1=1450006&r2=1450007&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Feb 26 03:32:10 2013
@@ -33,7 +33,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
@@ -61,6 +60,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -750,15 +750,20 @@ public class FairScheduler implements Re
/**
* Process a heartbeat update from a node.
*/
- private synchronized void nodeUpdate(RMNode nm,
- List<ContainerStatus> newlyLaunchedContainers,
- List<ContainerStatus> completedContainers) {
+ private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
}
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = nodes.get(nm.getNodeID());
+ List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+ List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
+ List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
+ for(UpdatedContainerInfo containerInfo : containerInfoList) {
+ newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
+ completedContainers.addAll(containerInfo.getCompletedContainers());
+ }
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@@ -864,9 +869,7 @@ public class FairScheduler implements Re
throw new RuntimeException("Unexpected event type: " + event);
}
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
- nodeUpdate(nodeUpdatedEvent.getRMNode(),
- nodeUpdatedEvent.getNewlyLaunchedContainers(),
- nodeUpdatedEvent.getCompletedContainers());
+ nodeUpdate(nodeUpdatedEvent.getRMNode());
break;
case APP_ADDED:
if (!(event instanceof AppAddedSchedulerEvent)) {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1450007&r1=1450006&r2=1450007&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Feb 26 03:32:10 2013
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -576,11 +577,16 @@ public class FifoScheduler implements Re
return assignedContainers;
}
- private synchronized void nodeUpdate(RMNode rmNode,
- List<ContainerStatus> newlyLaunchedContainers,
- List<ContainerStatus> completedContainers) {
+ private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
+ List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
+ List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
+ List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
+ for(UpdatedContainerInfo containerInfo : containerInfoList) {
+ newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
+ completedContainers.addAll(containerInfo.getCompletedContainers());
+ }
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@@ -628,9 +634,7 @@ public class FifoScheduler implements Re
{
NodeUpdateSchedulerEvent nodeUpdatedEvent =
(NodeUpdateSchedulerEvent)event;
- nodeUpdate(nodeUpdatedEvent.getRMNode(),
- nodeUpdatedEvent.getNewlyLaunchedContainers(),
- nodeUpdatedEvent.getCompletedContainers());
+ nodeUpdate(nodeUpdatedEvent.getRMNode());
}
break;
case APP_ADDED:
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1450007&r1=1450006&r2=1450007&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Tue Feb 26 03:32:10 2013
@@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -31,6 +33,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import com.google.common.collect.Lists;
@@ -187,6 +190,11 @@ public class MockNodes {
public HeartbeatResponse getLastHeartBeatResponse() {
return null;
}
+
+ @Override
+ public List<UpdatedContainerInfo> pullContainerUpdates() {
+ return new ArrayList<UpdatedContainerInfo>();
+ }
};
private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1450007&r1=1450006&r2=1450007&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Tue Feb 26 03:32:10 2013
@@ -201,7 +201,7 @@ public class TestFifoScheduler {
testMinimumAllocation(conf, allocMB / 2);
}
- @Test
+ @Test (timeout = 5000)
public void testReconnectedNode() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.setQueues("default", new String[] {"default"});
@@ -215,19 +215,19 @@ public class TestFifoScheduler {
fs.handle(new NodeAddedSchedulerEvent(n1));
fs.handle(new NodeAddedSchedulerEvent(n2));
List<ContainerStatus> emptyList = new ArrayList<ContainerStatus>();
- fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+ fs.handle(new NodeUpdateSchedulerEvent(n1));
Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB());
// reconnect n1 with downgraded memory
n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
fs.handle(new NodeRemovedSchedulerEvent(n1));
fs.handle(new NodeAddedSchedulerEvent(n1));
- fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+ fs.handle(new NodeUpdateSchedulerEvent(n1));
Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
}
- @Test
+ @Test (timeout = 5000)
public void testHeadroom() throws Exception {
Configuration conf = new Configuration();
@@ -275,7 +275,7 @@ public class TestFifoScheduler {
fs.allocate(appAttemptId2, ask2, emptyId);
// Trigger container assignment
- fs.handle(new NodeUpdateSchedulerEvent(n1, emptyStatus, emptyStatus));
+ fs.handle(new NodeUpdateSchedulerEvent(n1));
// Get the allocation for the applications and verify headroom
Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1450007&r1=1450006&r2=1450007&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Tue Feb 26 03:32:10 2013
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.doAnsw
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.Collections;
@@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -63,7 +65,7 @@ public class TestRMNodeTransitions {
private YarnScheduler scheduler;
private SchedulerEventType eventType;
- private List<ContainerStatus> completedContainers;
+ private List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
private final class TestSchedulerEventDispatcher implements
EventHandler<SchedulerEvent> {
@@ -89,10 +91,11 @@ public class TestRMNodeTransitions {
final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
eventType = event.getType();
if (eventType == SchedulerEventType.NODE_UPDATE) {
- completedContainers =
- ((NodeUpdateSchedulerEvent)event).getCompletedContainers();
- } else {
- completedContainers = null;
+ List<UpdatedContainerInfo> lastestContainersInfoList =
+ ((NodeUpdateSchedulerEvent)event).getRMNode().pullContainerUpdates();
+ for(UpdatedContainerInfo lastestContainersInfo : lastestContainersInfoList) {
+ completedContainers.addAll(lastestContainersInfo.getCompletedContainers());
+ }
}
return null;
}
@@ -125,16 +128,16 @@ public class TestRMNodeTransitions {
return event;
}
- @Test
+ @Test (timeout = 5000)
public void testExpiredContainer() {
// Start the node
node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
// Expire a container
- ContainerId completedContainerId = BuilderUtils.newContainerId(
- BuilderUtils.newApplicationAttemptId(
- BuilderUtils.newApplicationId(0, 0), 0), 0);
+ ContainerId completedContainerId = BuilderUtils.newContainerId(
+ BuilderUtils.newApplicationAttemptId(
+ BuilderUtils.newApplicationId(0, 0), 0), 0);
node.handle(new RMNodeCleanContainerEvent(null, completedContainerId));
Assert.assertEquals(1, node.getContainersToCleanUp().size());
@@ -146,9 +149,110 @@ public class TestRMNodeTransitions {
doReturn(Collections.singletonList(containerStatus)).
when(statusEvent).getContainers();
node.handle(statusEvent);
- Assert.assertEquals(0, completedContainers.size());
+ /* Expect the scheduler's handle function to be called 2 times:
+ * 1. when the RMNode status goes from NEW to RUNNING (the add_node event)
+ * 2. when the node update event is handled
+ */
+ verify(scheduler,times(2)).handle(any(NodeUpdateSchedulerEvent.class));
}
-
+
+ @Test (timeout = 5000)
+ public void testContainerUpdate() throws InterruptedException{
+ //Start the node
+ node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+
+ NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
+ RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
+ node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+
+ ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
+ BuilderUtils.newApplicationAttemptId(
+ BuilderUtils.newApplicationId(0, 0), 0), 0);
+ ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId(
+ BuilderUtils.newApplicationAttemptId(
+ BuilderUtils.newApplicationId(1, 1), 1), 1);
+ ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
+ BuilderUtils.newApplicationAttemptId(
+ BuilderUtils.newApplicationId(1, 1), 1), 2);
+
+ RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent();
+ RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent();
+ RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent();
+
+ ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class);
+ ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class);
+ ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class);
+
+ doReturn(completedContainerIdFromNode1).when(containerStatusFromNode1)
+ .getContainerId();
+ doReturn(Collections.singletonList(containerStatusFromNode1))
+ .when(statusEventFromNode1).getContainers();
+ node.handle(statusEventFromNode1);
+ Assert.assertEquals(1, completedContainers.size());
+ Assert.assertEquals(completedContainerIdFromNode1,
+ completedContainers.get(0).getContainerId());
+
+ completedContainers.clear();
+
+ doReturn(completedContainerIdFromNode2_1).when(containerStatusFromNode2_1)
+ .getContainerId();
+ doReturn(Collections.singletonList(containerStatusFromNode2_1))
+ .when(statusEventFromNode2_1).getContainers();
+
+ doReturn(completedContainerIdFromNode2_2).when(containerStatusFromNode2_2)
+ .getContainerId();
+ doReturn(Collections.singletonList(containerStatusFromNode2_2))
+ .when(statusEventFromNode2_2).getContainers();
+
+ node2.setNextHeartBeat(false);
+ node2.handle(statusEventFromNode2_1);
+ node2.setNextHeartBeat(true);
+ node2.handle(statusEventFromNode2_2);
+
+ Assert.assertEquals(2, completedContainers.size());
+ Assert.assertEquals(completedContainerIdFromNode2_1,completedContainers.get(0)
+ .getContainerId());
+ Assert.assertEquals(completedContainerIdFromNode2_2,completedContainers.get(1)
+ .getContainerId());
+ }
+
+ @Test (timeout = 5000)
+ public void testStatusChange(){
+ //Start the node
+ node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+ //Add info to the queue first
+ node.setNextHeartBeat(false);
+
+ ContainerId completedContainerId1 = BuilderUtils.newContainerId(
+ BuilderUtils.newApplicationAttemptId(
+ BuilderUtils.newApplicationId(0, 0), 0), 0);
+ ContainerId completedContainerId2 = BuilderUtils.newContainerId(
+ BuilderUtils.newApplicationAttemptId(
+ BuilderUtils.newApplicationId(1, 1), 1), 1);
+
+ RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent();
+ RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent();
+
+ ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+ ContainerStatus containerStatus2 = mock(ContainerStatus.class);
+
+ doReturn(completedContainerId1).when(containerStatus1).getContainerId();
+ doReturn(Collections.singletonList(containerStatus1))
+ .when(statusEvent1).getContainers();
+
+ doReturn(completedContainerId2).when(containerStatus2).getContainerId();
+ doReturn(Collections.singletonList(containerStatus2))
+ .when(statusEvent2).getContainers();
+
+ verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class));
+ node.handle(statusEvent1);
+ node.handle(statusEvent2);
+ verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class));
+ Assert.assertEquals(2, node.getQueueSize());
+ node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
+ Assert.assertEquals(0, node.getQueueSize());
+ }
+
@Test
public void testRunningExpire() {
RMNodeImpl node = getRunningNode();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1450007&r1=1450006&r2=1450007&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Tue Feb 26 03:32:10 2013
@@ -30,7 +30,6 @@ import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -276,7 +275,7 @@ public class TestFairScheduler {
Assert.assertEquals(3, queueManager.getLeafQueues().size());
}
- @Test
+ @Test (timeout = 5000)
public void testSimpleContainerAllocation() {
// Add a node
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@@ -292,8 +291,7 @@ public class TestFairScheduler {
scheduler.update();
- NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
- new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
// Asked for less than min_allocation.
@@ -301,15 +299,14 @@ public class TestFairScheduler {
scheduler.getQueueManager().getQueue("queue1").
getResourceUsage().getMemory());
- NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
- new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+ NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(updateEvent2);
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
getResourceUsage().getMemory());
}
- @Test
+ @Test (timeout = 5000)
public void testSimpleContainerReservation() throws InterruptedException {
// Add a node
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@@ -319,8 +316,7 @@ public class TestFairScheduler {
// Queue 1 requests full capacity of node
createSchedulingRequest(1024, "queue1", "user1", 1);
scheduler.update();
- NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
- new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
// Make sure queue 1 is allocated app capacity
@@ -340,8 +336,7 @@ public class TestFairScheduler {
// Now another node checks in with capacity
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
- NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
- new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+ NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
scheduler.handle(updateEvent2);
@@ -738,7 +733,7 @@ public class TestFairScheduler {
assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
}
- @Test
+ @Test (timeout = 5000)
public void testIsStarvedForMinShare() throws Exception {
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
@@ -767,8 +762,7 @@ public class TestFairScheduler {
// Queue A wants 3 * 1024. Node update gives this all to A
createSchedulingRequest(3 * 1024, "queueA", "user1");
scheduler.update();
- NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
- new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
// Queue B arrives and wants 1 * 1024
@@ -797,7 +791,7 @@ public class TestFairScheduler {
}
}
- @Test
+ @Test (timeout = 5000)
public void testIsStarvedForFairShare() throws Exception {
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
@@ -826,8 +820,7 @@ public class TestFairScheduler {
// Queue A wants 3 * 1024. Node update gives this all to A
createSchedulingRequest(3 * 1024, "queueA", "user1");
scheduler.update();
- NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
- new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
// Queue B arrives and wants 1 * 1024
@@ -857,7 +850,7 @@ public class TestFairScheduler {
}
}
- @Test
+ @Test (timeout = 5000)
/**
* Make sure containers are chosen to be preempted in the correct order. Right
* now this means decreasing order of priority.
@@ -921,16 +914,13 @@ public class TestFairScheduler {
// Sufficient node check-ins to fully schedule containers
for (int i = 0; i < 2; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1,
- new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeUpdate1);
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2,
- new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(nodeUpdate2);
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3,
- new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
scheduler.handle(nodeUpdate3);
}
@@ -991,7 +981,7 @@ public class TestFairScheduler {
assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
}
- @Test
+ @Test (timeout = 5000)
/**
* Tests the timing of decision to preempt tasks.
*/
@@ -1062,16 +1052,13 @@ public class TestFairScheduler {
// Sufficient node check-ins to fully schedule containers
for (int i = 0; i < 2; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1,
- new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeUpdate1);
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2,
- new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(nodeUpdate2);
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3,
- new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
scheduler.handle(nodeUpdate3);
}
@@ -1119,7 +1106,7 @@ public class TestFairScheduler {
Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime())));
}
- @Test
+ @Test (timeout = 5000)
public void testMultipleContainersWaitingForReservation() {
// Add a node
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@@ -1129,8 +1116,7 @@ public class TestFairScheduler {
// Request full capacity of node
createSchedulingRequest(1024, "queue1", "user1", 1);
scheduler.update();
- NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
- new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue2", "user2", 1);
@@ -1146,7 +1132,7 @@ public class TestFairScheduler {
scheduler.applications.get(attId2).getCurrentReservation().getMemory());
}
- @Test
+ @Test (timeout = 5000)
public void testUserMaxRunningApps() throws Exception {
// Set max running apps
Configuration conf = createConfiguration();
@@ -1175,8 +1161,7 @@ public class TestFairScheduler {
"user1", 1);
scheduler.update();
- NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
- new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
// App 1 should be running
@@ -1201,7 +1186,7 @@ public class TestFairScheduler {
assertEquals(2, scheduler.applications.get(attId1).getLiveContainers().size());
}
- @Test
+ @Test (timeout = 5000)
public void testReservationWhileMultiplePriorities() {
// Add a node
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@@ -1211,8 +1196,7 @@ public class TestFairScheduler {
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
"user1", 1, 2);
scheduler.update();
- NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
- new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
FSSchedulerApp app = scheduler.applications.get(attId);
@@ -1285,7 +1269,7 @@ public class TestFairScheduler {
assertNull("The application was allowed", app2);
}
- @Test
+ @Test (timeout = 5000)
public void testMultipleNodesSingleRackRequest() throws Exception {
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@@ -1312,22 +1296,20 @@ public class TestFairScheduler {
// node 1 checks in
scheduler.update();
- NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1,
- new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+ NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent1);
// should assign node local
assertEquals(1, scheduler.applications.get(appId).getLiveContainers().size());
// node 2 checks in
scheduler.update();
- NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
- new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+ NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(updateEvent2);
// should assign rack local
assertEquals(2, scheduler.applications.get(appId).getLiveContainers().size());
}
- @Test
+ @Test (timeout = 5000)
public void testFifoWithinQueue() throws Exception {
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(3072));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
@@ -1351,8 +1333,7 @@ public class TestFairScheduler {
// Because tests set assignmultiple to false, each heartbeat assigns a single
// container.
- NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
- new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
assertEquals(1, app1.getLiveContainers().size());