You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by bi...@apache.org on 2013/06/01 10:23:30 UTC
svn commit: r1488485 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/
hadoop-yarn/hadoop-yarn-client/src...
Author: bikas
Date: Sat Jun 1 08:23:30 2013
New Revision: 1488485
URL: http://svn.apache.org/r1488485
Log:
YARN-660. Improve AMRMClient with matching requests (bikas)
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1488485&r1=1488484&r2=1488485&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Sat Jun 1 08:23:30 2013
@@ -239,6 +239,8 @@ Release 2.0.5-beta - UNRELEASED
YARN-638. Modified ResourceManager to restore RMDelegationTokens after
restarting. (Jian He via vinodkv)
+ YARN-660. Improve AMRMClient with matching requests (bikas)
+
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1488485&r1=1488484&r2=1488485&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Sat Jun 1 08:23:30 2013
@@ -151,7 +151,7 @@ public class ApplicationMaster {
private YarnRPC rpc;
// Handle to communicate with the Resource Manager
- private AMRMClientAsync resourceManager;
+ private AMRMClientAsync<ContainerRequest> resourceManager;
// Application Attempt Id ( combination of attemptId and fail count )
private ApplicationAttemptId appAttemptID;
@@ -442,7 +442,9 @@ public class ApplicationMaster {
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
- resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
+ resourceManager = new AMRMClientAsync<ContainerRequest>(appAttemptID,
+ 1000,
+ allocListener);
resourceManager.init(conf);
resourceManager.start();
@@ -522,7 +524,8 @@ public class ApplicationMaster {
FinalApplicationStatus appStatus;
String appMessage = null;
success = true;
- if (numFailedContainers.get() == 0) {
+ if (numFailedContainers.get() == 0 &&
+ numCompletedContainers.get() == numTotalContainers) {
appStatus = FinalApplicationStatus.SUCCEEDED;
} else {
appStatus = FinalApplicationStatus.FAILED;
@@ -594,11 +597,6 @@ public class ApplicationMaster {
resourceManager.addContainerRequest(containerAsk);
}
- // set progress to deliver to RM on next heartbeat
- float progress = (float) numCompletedContainers.get()
- / numTotalContainers;
- resourceManager.setProgress(progress);
-
if (numCompletedContainers.get() == numTotalContainers) {
done = true;
}
@@ -637,6 +635,19 @@ public class ApplicationMaster {
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+
+ @Override
+ public float getProgress() {
+ // progress is reported to the RM on the next heartbeat
+ float progress = (float) numCompletedContainers.get()
+ / numTotalContainers;
+ return progress;
+ }
+
+ @Override
+ public void onError(Exception e) {
+ done = true;
+ }
}
/**
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java?rev=1488485&r1=1488484&r2=1488485&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java Sat Jun 1 08:23:30 2013
@@ -18,8 +18,9 @@
package org.apache.hadoop.yarn.client;
-
import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -32,9 +33,11 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.service.Service;
+import com.google.common.collect.ImmutableList;
+
@InterfaceAudience.Public
@InterfaceStability.Unstable
-public interface AMRMClient extends Service {
+public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Service {
/**
* Object to represent container request for resources.
@@ -43,20 +46,41 @@ public interface AMRMClient extends Serv
* Can ask for multiple containers of a given type.
*/
public static class ContainerRequest {
- Resource capability;
- String[] hosts;
- String[] racks;
- Priority priority;
- int containerCount;
+ final Resource capability;
+ final ImmutableList<String> hosts;
+ final ImmutableList<String> racks;
+ final Priority priority;
+ final int containerCount;
public ContainerRequest(Resource capability, String[] hosts,
String[] racks, Priority priority, int containerCount) {
this.capability = capability;
- this.hosts = (hosts != null ? hosts.clone() : null);
- this.racks = (racks != null ? racks.clone() : null);
+ this.hosts = (hosts != null ? ImmutableList.copyOf(hosts) : null);
+ this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
this.priority = priority;
this.containerCount = containerCount;
}
+
+ public Resource getCapability() {
+ return capability;
+ }
+
+ public ImmutableList<String> getHosts() {
+ return hosts;
+ }
+
+ public ImmutableList<String> getRacks() {
+ return racks;
+ }
+
+ public Priority getPriority() {
+ return priority;
+ }
+
+ public int getContainerCount() {
+ return containerCount;
+ }
+
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Capability[").append(capability).append("]");
@@ -65,6 +89,22 @@ public interface AMRMClient extends Serv
return sb.toString();
}
}
+
+ /**
+ * This creates a <code>ContainerRequest</code> for 1 container and the
+ * AMRMClient stores this request internally. <code>getMatchingRequests</code>
+ * can be used to retrieve these requests from AMRMClient. These requests may
+ * be matched with an allocated container to determine which request to assign
+ * the container to. <code>removeContainerRequest</code> must be called using
+ * the same assigned <code>StoredContainerRequest</code> object so that
+ * AMRMClient can remove it from its internal store.
+ */
+ public static class StoredContainerRequest extends ContainerRequest {
+ public StoredContainerRequest(Resource capability, String[] hosts,
+ String[] racks, Priority priority) {
+ super(capability, hosts, racks, priority, 1);
+ }
+ }
/**
* Register the application master. This must be called before any
@@ -117,7 +157,7 @@ public interface AMRMClient extends Serv
* Request containers for resources before calling <code>allocate</code>
* @param req Resource request
*/
- public void addContainerRequest(ContainerRequest req);
+ public void addContainerRequest(T req);
/**
* Remove previous container request. The previous container request may have
@@ -126,7 +166,7 @@ public interface AMRMClient extends Serv
* even after the remove request
* @param req Resource request
*/
- public void removeContainerRequest(ContainerRequest req);
+ public void removeContainerRequest(T req);
/**
* Release containers assigned by the Resource Manager. If the app cannot use
@@ -150,4 +190,21 @@ public interface AMRMClient extends Serv
* @return Current number of nodes in the cluster
*/
public int getClusterNodeCount();
+
+ /**
+ * Get outstanding <code>StoredContainerRequest</code>s matching the given
+ * parameters. These StoredContainerRequests should have been added via
+ * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
+ * the AMRMClient may return its internal collection directly without creating
+ * a copy. Users should not perform mutable operations on the return value.
+ * Each collection in the list contains requests with identical
+ * <code>Resource</code> size that fit in the given capability. In a
+ * collection, requests will be returned in the same order as they were added.
+ * @return List of collections of requests matching the parameters
+ */
+ public List<? extends Collection<T>> getMatchingRequests(
+ Priority priority,
+ String resourceName,
+ Resource capability);
+
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java?rev=1488485&r1=1488484&r2=1488485&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java Sat Jun 1 08:23:30 2013
@@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.client;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,7 +40,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -88,55 +92,50 @@ import com.google.common.annotations.Vis
*/
@Unstable
@Evolving
-public class AMRMClientAsync extends AbstractService {
+public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService {
private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
- private final AMRMClient client;
- private final int intervalMs;
+ private final AMRMClient<T> client;
+ private final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
private final HeartbeatThread heartbeatThread;
private final CallbackHandlerThread handlerThread;
private final CallbackHandler handler;
private final BlockingQueue<AllocateResponse> responseQueue;
+ private final Object unregisterHeartbeatLock = new Object();
+
private volatile boolean keepRunning;
private volatile float progress;
+ private volatile Exception savedException;
+
public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
CallbackHandler callbackHandler) {
- this(new AMRMClientImpl(id), intervalMs, callbackHandler);
+ this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
}
@Private
@VisibleForTesting
- AMRMClientAsync(AMRMClient client, int intervalMs,
+ public AMRMClientAsync(AMRMClient<T> client, int intervalMs,
CallbackHandler callbackHandler) {
super(AMRMClientAsync.class.getName());
this.client = client;
- this.intervalMs = intervalMs;
+ this.heartbeatIntervalMs.set(intervalMs);
handler = callbackHandler;
heartbeatThread = new HeartbeatThread();
handlerThread = new CallbackHandlerThread();
responseQueue = new LinkedBlockingQueue<AllocateResponse>();
keepRunning = true;
+ savedException = null;
}
-
- /**
- * Sets the application's current progress. It will be transmitted to the
- * resource manager on the next heartbeat.
- * @param progress
- * the application's progress so far
- */
- public void setProgress(float progress) {
- this.progress = progress;
- }
-
+
@Override
public void init(Configuration conf) {
super.init(conf);
client.init(conf);
- }
+ }
@Override
public void start() {
@@ -171,6 +170,17 @@ public class AMRMClientAsync extends Abs
super.stop();
}
+ public void setHeartbeatInterval(int interval) {
+ heartbeatIntervalMs.set(interval);
+ }
+
+ public List<? extends Collection<T>> getMatchingRequests(
+ Priority priority,
+ String resourceName,
+ Resource capability) {
+ return client.getMatchingRequests(priority, resourceName, capability);
+ }
+
/**
* Registers this application master with the resource manager. On successful
* registration, starts the heartbeating thread.
@@ -180,8 +190,8 @@ public class AMRMClientAsync extends Abs
public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl)
throws YarnRemoteException, IOException {
- RegisterApplicationMasterResponse response =
- client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
+ RegisterApplicationMasterResponse response = client
+ .registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
heartbeatThread.start();
return response;
}
@@ -195,8 +205,9 @@ public class AMRMClientAsync extends Abs
* @throws IOException
*/
public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
- String appMessage, String appTrackingUrl) throws YarnRemoteException, IOException {
- synchronized (client) {
+ String appMessage, String appTrackingUrl) throws YarnRemoteException,
+ IOException {
+ synchronized (unregisterHeartbeatLock) {
keepRunning = false;
client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
}
@@ -206,7 +217,7 @@ public class AMRMClientAsync extends Abs
* Request containers for resources before calling <code>allocate</code>
* @param req Resource request
*/
- public void addContainerRequest(AMRMClient.ContainerRequest req) {
+ public void addContainerRequest(T req) {
client.addContainerRequest(req);
}
@@ -217,7 +228,7 @@ public class AMRMClientAsync extends Abs
* even after the remove request
* @param req Resource request
*/
- public void removeContainerRequest(AMRMClient.ContainerRequest req) {
+ public void removeContainerRequest(T req) {
client.removeContainerRequest(req);
}
@@ -259,7 +270,7 @@ public class AMRMClientAsync extends Abs
while (true) {
AllocateResponse response = null;
// synchronization ensures we don't send heartbeats after unregistering
- synchronized (client) {
+ synchronized (unregisterHeartbeatLock) {
if (!keepRunning) {
break;
}
@@ -267,9 +278,17 @@ public class AMRMClientAsync extends Abs
try {
response = client.allocate(progress);
} catch (YarnRemoteException ex) {
- LOG.error("Failed to heartbeat", ex);
+ LOG.error("Yarn exception on heartbeat", ex);
+ savedException = ex;
+ // interrupt handler thread in case it is waiting on the queue
+ handlerThread.interrupt();
+ break;
} catch (IOException e) {
- LOG.error("Failed to heartbeat", e);
+ LOG.error("IO exception on heartbeat", e);
+ savedException = e;
+ // interrupt handler thread in case it is waiting on the queue
+ handlerThread.interrupt();
+ break;
}
}
if (response != null) {
@@ -278,15 +297,15 @@ public class AMRMClientAsync extends Abs
responseQueue.put(response);
break;
} catch (InterruptedException ex) {
- LOG.warn("Interrupted while waiting to put on response queue", ex);
+ LOG.info("Interrupted while waiting to put on response queue", ex);
}
}
}
try {
- Thread.sleep(intervalMs);
+ Thread.sleep(heartbeatIntervalMs.get());
} catch (InterruptedException ex) {
- LOG.warn("Heartbeater interrupted", ex);
+ LOG.info("Heartbeater interrupted", ex);
}
}
}
@@ -301,14 +320,21 @@ public class AMRMClientAsync extends Abs
while (keepRunning) {
AllocateResponse response;
try {
+ if(savedException != null) {
+ LOG.error("Stopping callback due to: ", savedException);
+ handler.onError(savedException);
+ break;
+ }
response = responseQueue.take();
} catch (InterruptedException ex) {
- LOG.info("Interrupted while waiting for queue");
+ LOG.info("Interrupted while waiting for queue", ex);
continue;
}
if (response.getReboot()) {
handler.onRebootRequest();
+ LOG.info("Reboot requested. Stopping callback.");
+ break;
}
List<NodeReport> updatedNodes = response.getUpdatedNodes();
if (!updatedNodes.isEmpty()) {
@@ -325,6 +351,8 @@ public class AMRMClientAsync extends Abs
if (!allocated.isEmpty()) {
handler.onContainersAllocated(allocated);
}
+
+ progress = handler.getProgress();
}
}
}
@@ -347,14 +375,19 @@ public class AMRMClientAsync extends Abs
/**
* Called when the ResourceManager wants the ApplicationMaster to reboot
- * for being out of sync.
+ * for being out of sync. The ApplicationMaster should not unregister with
+ * the RM unless the ApplicationMaster wants to be the last attempt.
*/
public void onRebootRequest();
/**
- * Called when nodes tracked by the ResourceManager have changed in in health,
+ * Called when nodes tracked by the ResourceManager have changed in health,
* availability etc.
*/
public void onNodesUpdated(List<NodeReport> updatedNodes);
+
+ public float getProgress();
+
+ public void onError(Exception e);
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java?rev=1488485&r1=1488484&r2=1488485&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java Sat Jun 1 08:23:30 2013
@@ -22,9 +22,15 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -47,6 +53,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -55,8 +62,11 @@ import org.apache.hadoop.yarn.ipc.YarnRP
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils;
+// TODO check inputs for null etc. YARN-654
+
@Unstable
-public class AMRMClientImpl extends AbstractService implements AMRMClient {
+public class AMRMClientImpl<T extends ContainerRequest>
+ extends AbstractService implements AMRMClient<T> {
private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
@@ -70,6 +80,57 @@ public class AMRMClientImpl extends Abst
protected Resource clusterAvailableResources;
protected int clusterNodeCount;
+ class ResourceRequestInfo {
+ ResourceRequest remoteRequest;
+ LinkedHashSet<T> containerRequests;
+
+ ResourceRequestInfo(Priority priority, String resourceName,
+ Resource capability) {
+ remoteRequest = BuilderUtils.newResourceRequest(priority, resourceName,
+ capability, 0);
+ containerRequests = new LinkedHashSet<T>();
+ }
+ }
+
+
+ /**
+ * Compares Resources by memory, then by CPU, in descending order
+ */
+ class ResourceReverseMemoryThenCpuComparator implements Comparator<Resource> {
+ @Override
+ public int compare(Resource arg0, Resource arg1) {
+ int mem0 = arg0.getMemory();
+ int mem1 = arg1.getMemory();
+ int cpu0 = arg0.getVirtualCores();
+ int cpu1 = arg1.getVirtualCores();
+ if(mem0 == mem1) {
+ if(cpu0 == cpu1) {
+ return 0;
+ }
+ if(cpu0 < cpu1) {
+ return 1;
+ }
+ return -1;
+ }
+ if(mem0 < mem1) {
+ return 1;
+ }
+ return -1;
+ }
+ }
+
+ static boolean canFit(Resource arg0, Resource arg1) {
+ int mem0 = arg0.getMemory();
+ int mem1 = arg1.getMemory();
+ int cpu0 = arg0.getVirtualCores();
+ int cpu1 = arg1.getVirtualCores();
+
+ if(mem0 <= mem1 && cpu0 <= cpu1) {
+ return true;
+ }
+ return false;
+ }
+
//Key -> Priority
//Value -> Map
//Key->ResourceName (e.g., hostname, rackname, *)
@@ -77,9 +138,9 @@ public class AMRMClientImpl extends Abst
//Key->Resource Capability
//Value->ResourceRequest
protected final
- Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+ Map<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>
remoteRequestsTable =
- new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+ new TreeMap<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>();
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
@@ -223,42 +284,47 @@ public class AMRMClientImpl extends Abst
}
@Override
- public synchronized void addContainerRequest(ContainerRequest req) {
+ public synchronized void addContainerRequest(T req) {
// Create resource requests
- if(req.hosts != null) {
+ // TODO: add check for duplicate host/rack locations
+ if (req.hosts != null) {
for (String host : req.hosts) {
- addResourceRequest(req.priority, host, req.capability, req.containerCount);
+ addResourceRequest(req.priority, host, req.capability,
+ req.containerCount, req);
}
}
- if(req.racks != null) {
+ if (req.racks != null) {
for (String rack : req.racks) {
- addResourceRequest(req.priority, rack, req.capability, req.containerCount);
+ addResourceRequest(req.priority, rack, req.capability,
+ req.containerCount, req);
}
}
// Off-switch
addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
- req.containerCount);
+ req.containerCount, req);
}
@Override
- public synchronized void removeContainerRequest(ContainerRequest req) {
+ public synchronized void removeContainerRequest(T req) {
// Update resource requests
- if(req.hosts != null) {
+ if (req.hosts != null) {
for (String hostName : req.hosts) {
- decResourceRequest(req.priority, hostName, req.capability, req.containerCount);
+ decResourceRequest(req.priority, hostName, req.capability,
+ req.containerCount, req);
}
}
-
- if(req.racks != null) {
+
+ if (req.racks != null) {
for (String rack : req.racks) {
- decResourceRequest(req.priority, rack, req.capability, req.containerCount);
+ decResourceRequest(req.priority, rack, req.capability,
+ req.containerCount, req);
}
}
-
+
decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
- req.containerCount);
+ req.containerCount, req);
}
@Override
@@ -276,6 +342,44 @@ public class AMRMClientImpl extends Abst
return clusterNodeCount;
}
+ @Override
+ public synchronized List<? extends Collection<T>> getMatchingRequests(
+ Priority priority,
+ String resourceName,
+ Resource capability) {
+ List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
+ Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
+ this.remoteRequestsTable.get(priority);
+ if (remoteRequests == null) {
+ return list;
+ }
+ TreeMap<Resource, ResourceRequestInfo> reqMap = remoteRequests
+ .get(resourceName);
+ if (reqMap == null) {
+ return list;
+ }
+
+ ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
+ if (resourceRequestInfo != null) {
+ list.add(resourceRequestInfo.containerRequests);
+ return list;
+ }
+
+ // no exact match. Container may be larger than what was requested.
+ // get all resources <= capability. map is reverse sorted.
+ SortedMap<Resource, ResourceRequestInfo> tailMap =
+ reqMap.tailMap(capability);
+ for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
+ if(canFit(entry.getKey(), capability)) {
+ // match found that fits in the larger resource
+ list.add(entry.getValue().containerRequests);
+ }
+ }
+
+ // return the fitting requests found, if any (list may be empty)
+ return list;
+ }
+
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
// This code looks weird but is needed because of the following scenario.
// A ResourceRequest is removed from the remoteRequestTable. A 0 container
@@ -294,44 +398,57 @@ public class AMRMClientImpl extends Abst
}
private void addResourceRequest(Priority priority, String resourceName,
- Resource capability, int containerCount) {
- Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+ Resource capability, int containerCount, T req) {
+ Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
- remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
+ remoteRequests =
+ new HashMap<String, TreeMap<Resource, ResourceRequestInfo>>();
this.remoteRequestsTable.put(priority, remoteRequests);
if (LOG.isDebugEnabled()) {
LOG.debug("Added priority=" + priority);
}
}
- Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+ TreeMap<Resource, ResourceRequestInfo> reqMap =
+ remoteRequests.get(resourceName);
if (reqMap == null) {
- reqMap = new HashMap<Resource, ResourceRequest>();
+ // capabilities are stored in reverse sorted order. smallest last.
+ reqMap = new TreeMap<Resource, ResourceRequestInfo>(
+ new ResourceReverseMemoryThenCpuComparator());
remoteRequests.put(resourceName, reqMap);
}
- ResourceRequest remoteRequest = reqMap.get(capability);
- if (remoteRequest == null) {
- remoteRequest = BuilderUtils.
- newResourceRequest(priority, resourceName, capability, 0);
- reqMap.put(capability, remoteRequest);
+ ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
+ if (resourceRequestInfo == null) {
+ resourceRequestInfo =
+ new ResourceRequestInfo(priority, resourceName, capability);
+ reqMap.put(capability, resourceRequestInfo);
}
- remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount);
+ resourceRequestInfo.remoteRequest.setNumContainers(
+ resourceRequestInfo.remoteRequest.getNumContainers() + containerCount);
+
+ if(req instanceof StoredContainerRequest) {
+ resourceRequestInfo.containerRequests.add(req);
+ }
// Note this down for next interaction with ResourceManager
- addResourceRequestToAsk(remoteRequest);
+ addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
if (LOG.isDebugEnabled()) {
LOG.debug("addResourceRequest:" + " applicationId="
+ appAttemptId + " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
- + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+ + resourceRequestInfo.remoteRequest.getNumContainers()
+ + " #asks=" + ask.size());
}
}
- private void decResourceRequest(Priority priority, String resourceName,
- Resource capability, int containerCount) {
- Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+ private void decResourceRequest(Priority priority,
+ String resourceName,
+ Resource capability,
+ int containerCount,
+ T req) {
+ Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority);
if(remoteRequests == null) {
@@ -342,7 +459,7 @@ public class AMRMClientImpl extends Abst
return;
}
- Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+ Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
if (reqMap == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not decrementing resource as " + resourceName
@@ -350,28 +467,34 @@ public class AMRMClientImpl extends Abst
}
return;
}
- ResourceRequest remoteRequest = reqMap.get(capability);
+ ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
if (LOG.isDebugEnabled()) {
LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+ appAttemptId + " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
- + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+ + resourceRequestInfo.remoteRequest.getNumContainers()
+ + " #asks=" + ask.size());
}
- remoteRequest.
- setNumContainers(remoteRequest.getNumContainers() - containerCount);
- if(remoteRequest.getNumContainers() < 0) {
+ resourceRequestInfo.remoteRequest.setNumContainers(
+ resourceRequestInfo.remoteRequest.getNumContainers() - containerCount);
+
+ if(req instanceof StoredContainerRequest) {
+ resourceRequestInfo.containerRequests.remove(req);
+ }
+
+ if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
// guard against spurious removals
- remoteRequest.setNumContainers(0);
+ resourceRequestInfo.remoteRequest.setNumContainers(0);
}
// send the ResourceRequest to RM even if is 0 because it needs to override
// a previously sent value. If ResourceRequest was not sent previously then
// sending 0 aught to be a no-op on RM
- addResourceRequestToAsk(remoteRequest);
+ addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
// delete entries from map if no longer needed
- if (remoteRequest.getNumContainers() == 0) {
+ if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
reqMap.remove(capability);
if (reqMap.size() == 0) {
remoteRequests.remove(resourceName);
@@ -385,7 +508,8 @@ public class AMRMClientImpl extends Abst
LOG.info("AFTER decResourceRequest:" + " applicationId="
+ appAttemptId + " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
- + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+ + resourceRequestInfo.remoteRequest.getNumContainers()
+ + " #asks=" + ask.size());
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java?rev=1488485&r1=1488484&r2=1488485&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java Sat Jun 1 08:23:30 2013
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@@ -50,27 +51,38 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestAMRMClient {
- Configuration conf = null;
- MiniYARNCluster yarnCluster = null;
- YarnClientImpl yarnClient = null;
- List<NodeReport> nodeReports = null;
- ApplicationAttemptId attemptId = null;
- int nodeCount = 3;
+ static Configuration conf = null;
+ static MiniYARNCluster yarnCluster = null;
+ static YarnClientImpl yarnClient = null;
+ static List<NodeReport> nodeReports = null;
+ static ApplicationAttemptId attemptId = null;
+ static int nodeCount = 3;
- @Before
- public void setup() throws YarnRemoteException, IOException {
+ static Resource capability;
+ static Priority priority;
+ static String node;
+ static String rack;
+ static String[] nodes;
+ static String[] racks;
+
+ @BeforeClass
+ public static void setup() throws Exception {
// start minicluster
conf = new YarnConfiguration();
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
@@ -84,7 +96,17 @@ public class TestAMRMClient {
// get node info
nodeReports = yarnClient.getNodeReports();
-
+
+ priority = BuilderUtils.newPriority(1);
+ capability = BuilderUtils.newResource(1024, 1);
+ node = nodeReports.get(0).getNodeId().getHost();
+ rack = nodeReports.get(0).getRackName();
+ nodes = new String[]{ node };
+ racks = new String[]{ rack };
+ }
+
+ @Before
+ public void startApp() throws Exception {
// submit new app
GetNewApplicationResponse newApp = yarnClient.getNewApplication();
ApplicationId appId = newApp.getApplicationId();
@@ -125,7 +147,12 @@ public class TestAMRMClient {
}
@After
- public void tearDown() {
+ public void cancelApp() {
+ attemptId = null;
+ }
+
+ @AfterClass
+ public static void tearDown() {
if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
yarnClient.stop();
}
@@ -133,13 +160,235 @@ public class TestAMRMClient {
yarnCluster.stop();
}
}
+
+ @Test (timeout=60000)
+ public void testAMRMClientMatchingFit() throws YarnRemoteException, IOException {
+ AMRMClientImpl<StoredContainerRequest> amClient = null;
+ try {
+ // start am rm client
+ amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
+ amClient.init(conf);
+ amClient.start();
+ amClient.registerApplicationMaster("Host", 10000, "");
+
+ Resource capability1 = BuilderUtils.newResource(1024, 2);
+ Resource capability2 = BuilderUtils.newResource(1024, 1);
+ Resource capability3 = BuilderUtils.newResource(1000, 2);
+ Resource capability4 = BuilderUtils.newResource(2000, 1);
+ Resource capability5 = BuilderUtils.newResource(1000, 3);
+ Resource capability6 = BuilderUtils.newResource(2000, 1);
+
+ StoredContainerRequest storedContainer1 =
+ new StoredContainerRequest(capability1, nodes, racks, priority);
+ StoredContainerRequest storedContainer2 =
+ new StoredContainerRequest(capability2, nodes, racks, priority);
+ StoredContainerRequest storedContainer3 =
+ new StoredContainerRequest(capability3, nodes, racks, priority);
+ StoredContainerRequest storedContainer4 =
+ new StoredContainerRequest(capability4, nodes, racks, priority);
+ StoredContainerRequest storedContainer5 =
+ new StoredContainerRequest(capability5, nodes, racks, priority);
+ StoredContainerRequest storedContainer6 =
+ new StoredContainerRequest(capability6, nodes, racks, priority);
+ amClient.addContainerRequest(storedContainer1);
+ amClient.addContainerRequest(storedContainer2);
+ amClient.addContainerRequest(storedContainer3);
+ amClient.addContainerRequest(storedContainer4);
+ amClient.addContainerRequest(storedContainer5);
+ amClient.addContainerRequest(storedContainer6);
+
+ // test matching of containers
+ List<? extends Collection<StoredContainerRequest>> matches;
+ StoredContainerRequest storedRequest;
+ // exact match
+ Resource testCapability1 = BuilderUtils.newResource(1024, 2);
+ matches = amClient.getMatchingRequests(priority, node, testCapability1);
+ verifyMatches(matches, 1);
+ storedRequest = matches.get(0).iterator().next();
+ assertTrue(storedContainer1 == storedRequest);
+ amClient.removeContainerRequest(storedContainer1);
+
+ // exact matching with order maintained
+ Resource testCapability2 = BuilderUtils.newResource(2000, 1);
+ matches = amClient.getMatchingRequests(priority, node, testCapability2);
+ verifyMatches(matches, 2);
+ // must be returned in the order they were made
+ int i = 0;
+ for(StoredContainerRequest storedRequest1 : matches.get(0)) {
+ if(i++ == 0) {
+ assertTrue(storedContainer4 == storedRequest1);
+ } else {
+ assertTrue(storedContainer6 == storedRequest1);
+ }
+ }
+ amClient.removeContainerRequest(storedContainer6);
+
+ // matching with larger container. all 4 remaining capabilities fit
+ Resource testCapability3 = BuilderUtils.newResource(4000, 4);
+ matches = amClient.getMatchingRequests(priority, node, testCapability3);
+ assertTrue(matches.size() == 4);
+
+ Resource testCapability4 = BuilderUtils.newResource(1024, 2);
+ matches = amClient.getMatchingRequests(priority, node, testCapability4);
+ assertTrue(matches.size() == 2);
+ // verify non-fitting containers are not returned and fitting ones are
+ for(Collection<StoredContainerRequest> testSet : matches) {
+ assertTrue(testSet.size() == 1);
+ StoredContainerRequest testRequest = testSet.iterator().next();
+ assertTrue(testRequest != storedContainer4);
+ assertTrue(testRequest != storedContainer5);
+ assertTrue(testRequest == storedContainer2 ||
+ testRequest == storedContainer3);
+ }
+
+ Resource testCapability5 = BuilderUtils.newResource(512, 4);
+ matches = amClient.getMatchingRequests(priority, node, testCapability5);
+ assertTrue(matches.size() == 0);
+
+ amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+ null, null);
+
+ } finally {
+ if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+ amClient.stop();
+ }
+ }
+ }
+
+ private void verifyMatches(
+ List<? extends Collection<StoredContainerRequest>> matches,
+ int matchSize) {
+ assertTrue(matches.size() == 1);
+ assertTrue(matches.get(0).size() == matchSize);
+ }
+
+ @Test (timeout=60000)
+ public void testAMRMClientMatchStorage() throws YarnRemoteException, IOException {
+ AMRMClientImpl<StoredContainerRequest> amClient = null;
+ try {
+ // start am rm client
+ amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
+ amClient.init(conf);
+ amClient.start();
+ amClient.registerApplicationMaster("Host", 10000, "");
+
+ Priority priority1 = Records.newRecord(Priority.class);
+ priority1.setPriority(2);
+
+ StoredContainerRequest storedContainer1 =
+ new StoredContainerRequest(capability, nodes, racks, priority);
+ StoredContainerRequest storedContainer2 =
+ new StoredContainerRequest(capability, nodes, racks, priority);
+ StoredContainerRequest storedContainer3 =
+ new StoredContainerRequest(capability, null, null, priority1);
+ amClient.addContainerRequest(storedContainer1);
+ amClient.addContainerRequest(storedContainer2);
+ amClient.addContainerRequest(storedContainer3);
+
+ // test addition and storage
+ int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
+ .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+ assertTrue(containersRequestedAny == 2);
+ containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
+ .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+ assertTrue(containersRequestedAny == 1);
+ List<? extends Collection<StoredContainerRequest>> matches =
+ amClient.getMatchingRequests(priority, node, capability);
+ verifyMatches(matches, 2);
+ matches = amClient.getMatchingRequests(priority, rack, capability);
+ verifyMatches(matches, 2);
+ matches =
+ amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
+ verifyMatches(matches, 2);
+ matches = amClient.getMatchingRequests(priority1, rack, capability);
+ assertTrue(matches.isEmpty());
+ matches =
+ amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
+ verifyMatches(matches, 1);
+
+ // test removal
+ amClient.removeContainerRequest(storedContainer3);
+ matches = amClient.getMatchingRequests(priority, node, capability);
+ verifyMatches(matches, 2);
+ amClient.removeContainerRequest(storedContainer2);
+ matches = amClient.getMatchingRequests(priority, node, capability);
+ verifyMatches(matches, 1);
+ matches = amClient.getMatchingRequests(priority, rack, capability);
+ verifyMatches(matches, 1);
+
+ // test matching of containers
+ StoredContainerRequest storedRequest = matches.get(0).iterator().next();
+ assertTrue(storedContainer1 == storedRequest);
+ amClient.removeContainerRequest(storedContainer1);
+ matches =
+ amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
+ assertTrue(matches.isEmpty());
+ matches =
+ amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
+ assertTrue(matches.isEmpty());
+ // 0 requests left. everything got cleaned up
+ assertTrue(amClient.remoteRequestsTable.isEmpty());
+
+ // go through an exemplary allocation, matching and release cycle
+ amClient.addContainerRequest(storedContainer1);
+ amClient.addContainerRequest(storedContainer3);
+ // RM should allocate both containers within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ int iterationsLeft = 2;
+ while (allocatedContainerCount < 2
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertTrue(amClient.ask.size() == 0);
+ assertTrue(amClient.release.size() == 0);
+
+ assertTrue(nodeCount == amClient.getClusterNodeCount());
+ allocatedContainerCount += allocResponse.getAllocatedContainers().size();
+ for(Container container : allocResponse.getAllocatedContainers()) {
+ ContainerRequest expectedRequest =
+ container.getPriority().equals(storedContainer1.getPriority()) ?
+ storedContainer1 : storedContainer3;
+ matches = amClient.getMatchingRequests(container.getPriority(),
+ ResourceRequest.ANY,
+ container.getResource());
+ // test correct matched container is returned
+ verifyMatches(matches, 1);
+ ContainerRequest matchedRequest = matches.get(0).iterator().next();
+ assertTrue(matchedRequest == expectedRequest);
+
+ // assign this container, use it and release it
+ amClient.releaseAssignedContainer(container.getId());
+ }
+ if(allocatedContainerCount < 2) {
+ // 2 expected (containersRequestedAny is stale here); let NMs heartbeat to RM
+ sleep(1000);
+ }
+ }
+
+ assertTrue(allocatedContainerCount == 2);
+ assertTrue(amClient.release.size() == 2);
+ assertTrue(amClient.ask.size() == 0);
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertTrue(amClient.release.size() == 0);
+ assertTrue(amClient.ask.size() == 0);
+ assertTrue(allocResponse.getAllocatedContainers().size() == 0);
+
+
+ amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+ null, null);
+
+ } finally {
+ if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+ amClient.stop();
+ }
+ }
+ }
@Test (timeout=60000)
public void testAMRMClient() throws YarnRemoteException, IOException {
- AMRMClientImpl amClient = null;
+ AMRMClientImpl<ContainerRequest> amClient = null;
try {
// start am rm client
- amClient = new AMRMClientImpl(attemptId);
+ amClient = new AMRMClientImpl<ContainerRequest>(attemptId);
amClient.init(conf);
amClient.start();
@@ -156,36 +405,27 @@ public class TestAMRMClient {
}
}
}
-
-
- private void testAllocation(final AMRMClientImpl amClient)
+
+ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
throws YarnRemoteException, IOException {
// setup container request
- final Resource capability = Records.newRecord(Resource.class);
- final Priority priority = Records.newRecord(Priority.class);
- priority.setPriority(0);
- capability.setMemory(1024);
- String node = nodeReports.get(0).getNodeId().getHost();
- String rack = nodeReports.get(0).getRackName();
- final String[] nodes = { node };
- final String[] racks = { rack };
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
- amClient.addContainerRequest(new ContainerRequest(capability, nodes,
- racks, priority, 1));
- amClient.addContainerRequest(new ContainerRequest(capability, nodes,
- racks, priority, 3));
- amClient.removeContainerRequest(new ContainerRequest(capability, nodes,
- racks, priority, 2));
+ amClient.addContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 1));
+ amClient.addContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 3));
+ amClient.removeContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 2));
int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
- .get(node).get(capability).getNumContainers();
+ .get(node).get(capability).remoteRequest.getNumContainers();
int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
- .get(rack).get(capability).getNumContainers();
+ .get(rack).get(capability).remoteRequest.getNumContainers();
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
- .get(ResourceRequest.ANY).get(capability).getNumContainers();
+ .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
assertTrue(containersRequestedNode == 2);
assertTrue(containersRequestedRack == 2);
@@ -221,8 +461,8 @@ public class TestAMRMClient {
assertTrue(amClient.ask.size() == 0);
// need to tell the AMRMClient that we dont need these resources anymore
- amClient.removeContainerRequest(new ContainerRequest(capability, nodes,
- racks, priority, 2));
+ amClient.removeContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 2));
assertTrue(amClient.ask.size() == 3);
// send 0 container count request for resources that are no longer needed
ResourceRequest snoopRequest = amClient.ask.iterator().next();
@@ -241,8 +481,9 @@ public class TestAMRMClient {
new Answer<AllocateResponse>() {
public AllocateResponse answer(InvocationOnMock invocation)
throws Exception {
- amClient.removeContainerRequest(new ContainerRequest(capability,
- nodes, racks, priority, 2));
+ amClient.removeContainerRequest(
+ new ContainerRequest(capability, nodes,
+ racks, priority, 2));
throw new Exception();
}
});
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java?rev=1488485&r1=1488484&r2=1488485&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java Sat Jun 1 08:23:30 2013
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@@ -39,6 +40,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -48,9 +52,11 @@ public class TestAMRMClientAsync {
private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
+ @SuppressWarnings("unchecked")
@Test(timeout=10000)
public void testAMRMClientAsync() throws Exception {
Configuration conf = new Configuration();
+ final AtomicBoolean heartbeatBlock = new AtomicBoolean(true);
List<ContainerStatus> completed1 = Arrays.asList(
BuilderUtils.newContainerStatus(
BuilderUtils.newContainerId(0, 0, 0, 0),
@@ -65,20 +71,38 @@ public class TestAMRMClientAsync {
new ArrayList<ContainerStatus>(), new ArrayList<Container>());
TestCallbackHandler callbackHandler = new TestCallbackHandler();
- AMRMClient client = mock(AMRMClient.class);
- final AtomicBoolean secondHeartbeatReceived = new AtomicBoolean(false);
+ final AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+ final AtomicInteger secondHeartbeatSync = new AtomicInteger(0);
when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer<AllocateResponse>() {
@Override
public AllocateResponse answer(InvocationOnMock invocation)
throws Throwable {
- secondHeartbeatReceived.set(true);
+ secondHeartbeatSync.incrementAndGet();
+ while(heartbeatBlock.get()) {
+ synchronized(heartbeatBlock) {
+ heartbeatBlock.wait();
+ }
+ }
+ secondHeartbeatSync.incrementAndGet();
return response2;
}
}).thenReturn(emptyResponse);
when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
.thenReturn(null);
+ when(client.getClusterAvailableResources()).thenAnswer(new Answer<Resource>() {
+ @Override
+ public Resource answer(InvocationOnMock invocation)
+ throws Throwable {
+ // take client lock to simulate behavior of real impl
+ synchronized (client) {
+ Thread.sleep(10);
+ }
+ return null;
+ }
+ });
- AMRMClientAsync asyncClient = new AMRMClientAsync(client, 20, callbackHandler);
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
asyncClient.registerApplicationMaster("localhost", 1234, null);
@@ -86,10 +110,21 @@ public class TestAMRMClientAsync {
// while the CallbackHandler will still only be processing the first response,
// heartbeater thread should still be sending heartbeats.
// To test this, wait for the second heartbeat to be received.
- while (!secondHeartbeatReceived.get()) {
+ while (secondHeartbeatSync.get() < 1) {
Thread.sleep(10);
}
+ // heartbeat will be blocked. make sure we can call client methods at this
+ // time. Checks that heartbeat is not holding onto client lock
+ assert(secondHeartbeatSync.get() < 2);
+ asyncClient.getClusterAvailableResources();
+ // method returned. now unblock heartbeat
+ assert(secondHeartbeatSync.get() < 2);
+ synchronized (heartbeatBlock) {
+ heartbeatBlock.set(false);
+ heartbeatBlock.notifyAll();
+ }
+
// allocated containers should come before completed containers
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
@@ -110,6 +145,73 @@ public class TestAMRMClientAsync {
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
}
+ @Test(timeout=10000)
+ public void testAMRMClientAsyncException() throws Exception {
+ Configuration conf = new Configuration();
+ TestCallbackHandler callbackHandler = new TestCallbackHandler();
+ @SuppressWarnings("unchecked")
+ AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+ String exStr = "TestException";
+ YarnRemoteException mockException = mock(YarnRemoteException.class);
+ when(mockException.getMessage()).thenReturn(exStr);
+ when(client.allocate(anyFloat())).thenThrow(mockException);
+
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
+ asyncClient.init(conf);
+ asyncClient.start();
+
+ synchronized (callbackHandler.notifier) {
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+ while(callbackHandler.savedException == null) {
+ // Let InterruptedException propagate (this method declares
+ // "throws Exception") rather than printing it and waiting
+ // again, so an interrupted test thread ends instead of
+ // blocking on the notifier indefinitely.
+ callbackHandler.notifier.wait();
+ }
+ }
+
+ Assert.assertTrue(callbackHandler.savedException.getMessage().contains(exStr));
+
+ asyncClient.stop();
+ // stopping should have joined all threads and completed all callbacks
+ Assert.assertEquals(0, callbackHandler.callbackCount);
+ }
+
+ @Test(timeout=10000)
+ public void testAMRMClientAsyncReboot() throws Exception {
+ Configuration conf = new Configuration();
+ TestCallbackHandler callbackHandler = new TestCallbackHandler();
+ @SuppressWarnings("unchecked")
+ AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+ final AllocateResponse rebootResponse = createAllocateResponse(
+ new ArrayList<ContainerStatus>(), new ArrayList<Container>());
+ rebootResponse.setReboot(true);
+ when(client.allocate(anyFloat())).thenReturn(rebootResponse);
+
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
+ asyncClient.init(conf);
+ asyncClient.start();
+
+ synchronized (callbackHandler.notifier) {
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+ while(!callbackHandler.reboot) {
+ // Let InterruptedException propagate (this method declares
+ // "throws Exception") rather than printing it and waiting
+ // again, so an interrupted test thread ends instead of
+ // blocking on the notifier indefinitely.
+ callbackHandler.notifier.wait();
+ }
+ }
+
+ asyncClient.stop();
+ // stopping should have joined all threads and completed all callbacks
+ Assert.assertEquals(0, callbackHandler.callbackCount);
+ }
+
private AllocateResponse createAllocateResponse(
List<ContainerStatus> completed, List<Container> allocated) {
AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated,
@@ -120,6 +222,11 @@ public class TestAMRMClientAsync {
private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
private volatile List<ContainerStatus> completedContainers;
private volatile List<Container> allocatedContainers;
+ Exception savedException = null;
+ boolean reboot = false;
+ Object notifier = new Object();
+
+ int callbackCount = 0;
public List<ContainerStatus> takeCompletedContainers() {
List<ContainerStatus> ret = completedContainers;
@@ -176,9 +283,28 @@ public class TestAMRMClientAsync {
}
@Override
- public void onRebootRequest() {}
+ public void onRebootRequest() {
+ reboot = true;
+ synchronized (notifier) {
+ notifier.notifyAll();
+ }
+ }
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+
+ @Override
+ public float getProgress() {
+ callbackCount++;
+ return 0.5f;
+ }
+
+ @Override
+ public void onError(Exception e) {
+ savedException = e;
+ synchronized (notifier) {
+ notifier.notifyAll();
+ }
+ }
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java?rev=1488485&r1=1488484&r2=1488485&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java Sat Jun 1 08:23:30 2013
@@ -64,7 +64,7 @@ public class TestNMClient {
Configuration conf = null;
MiniYARNCluster yarnCluster = null;
YarnClientImpl yarnClient = null;
- AMRMClientImpl rmClient = null;
+ AMRMClientImpl<ContainerRequest> rmClient = null;
NMClientImpl nmClient = null;
List<NodeReport> nodeReports = null;
ApplicationAttemptId attemptId = null;
@@ -136,7 +136,7 @@ public class TestNMClient {
}
// start am rm client
- rmClient = new AMRMClientImpl(attemptId);
+ rmClient = new AMRMClientImpl<ContainerRequest>(attemptId);
rmClient.init(conf);
rmClient.start();
assertNotNull(rmClient);
@@ -185,7 +185,8 @@ public class TestNMClient {
null, null);
}
- private Set<Container> allocateContainers(AMRMClientImpl rmClient, int num)
+ private Set<Container> allocateContainers(
+ AMRMClientImpl<ContainerRequest> rmClient, int num)
throws YarnRemoteException, IOException {
// setup container request
Resource capability = Resource.newInstance(1024, 0);
@@ -201,7 +202,8 @@ public class TestNMClient {
}
int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
- .get(ResourceRequest.ANY).get(capability).getNumContainers();
+ .get(ResourceRequest.ANY).get(capability).remoteRequest
+ .getNumContainers();
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;