You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC
svn commit: r1082677 [6/38] - in /hadoop/mapreduce/branches/MR-279: ./
assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/
mr-client/hadoop-mapreduce-client-app/src/
mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapreduc...
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,239 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.launcher;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMasterConstants;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerManager;
+import org.apache.hadoop.yarn.ContainerToken;
+
+/**
+ * This class is responsible for launching of containers.
+ *
+ * Launch/cleanup requests arrive as ContainerLauncherEvents via handle() and
+ * are buffered on an internal queue; a single dispatcher thread drains the
+ * queue and hands each event to a thread pool, so the (blocking) RPCs to
+ * remote NodeManagers run in parallel.
+ */
+public class ContainerLauncherImpl extends AbstractService implements
+    ContainerLauncher {
+
+  private static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
+
+  private AppContext context;
+  // Pool that executes EventProcessor tasks (one per event); core size is
+  // configurable, spare threads expire after 1 hour.
+  private ThreadPoolExecutor launcherPool;
+  // Single thread moving events from eventQueue into launcherPool.
+  private Thread eventHandlingThread;
+  private BlockingQueue<ContainerLauncherEvent> eventQueue =
+      new LinkedBlockingQueue<ContainerLauncherEvent>();
+
+  public ContainerLauncherImpl(AppContext context) {
+    super(ContainerLauncherImpl.class.getName());
+    this.context = context;
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    // Clone configuration for this component so that the SecurityInfo setting
+    // doesn't affect the original configuration
+    Configuration myLocalConfig = new Configuration(conf);
+    myLocalConfig.setClass(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_INFO_CLASS_NAME,
+        ContainerManagerSecurityInfo.class, SecurityInfo.class);
+    super.init(myLocalConfig);
+  }
+
+  // NOTE(review): missing @Override (AbstractService.start()) — consider adding.
+  public void start() {
+    launcherPool =
+        new ThreadPoolExecutor(getConfig().getInt(
+            MRAppMasterConstants.CONTAINERLAUNCHER_THREADPOOL_SIZE, 10),
+            Integer.MAX_VALUE, 1, TimeUnit.HOURS,
+            new LinkedBlockingQueue<Runnable>());
+    launcherPool.prestartAllCoreThreads(); // Wait for work.
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        ContainerLauncherEvent event = null;
+        // Loop until stop() interrupts this thread.
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            // Expected path during shutdown; exit the dispatcher.
+            LOG.error("Returning, interrupted : " + e);
+            return;
+          }
+          // the events from the queue are handled in parallel
+          // using a thread pool
+          launcherPool.execute(new EventProcessor(event));
+
+          // TODO: Group launching of multiple containers to a single
+          // NodeManager into a single connection
+        }
+      }
+    });
+    eventHandlingThread.start();
+    super.start();
+  }
+
+  // NOTE(review): missing @Override; launcherPool.shutdown() lets queued
+  // EventProcessors finish rather than cancelling them — confirm intended.
+  public void stop() {
+    eventHandlingThread.interrupt();
+    launcherPool.shutdown();
+    super.stop();
+  }
+
+  /**
+   * Creates a ContainerManager RPC proxy to the given NodeManager address.
+   * In secure mode the per-container token is first materialized into the
+   * login user's credentials so the connection can authenticate.
+   */
+  protected ContainerManager getCMProxy(ContainerID containerID,
+      final String containerManagerBindAddr, ContainerToken containerToken)
+      throws IOException {
+
+    UserGroupInformation user = UserGroupInformation.getLoginUser();
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Token<ContainerTokenIdentifier> token =
+          new Token<ContainerTokenIdentifier>(
+              containerToken.identifier.array(),
+              containerToken.password.array(), new Text(
+                  containerToken.kind.toString()), new Text(
+                  containerToken.service.toString()));
+      user.addToken(token);
+    }
+    // Proxy creation runs as the (token-carrying) login user.
+    ContainerManager proxy =
+        user.doAs(new PrivilegedAction<ContainerManager>() {
+          @Override
+          public ContainerManager run() {
+            YarnRPC rpc = YarnRPC.create(getConfig());
+            return (ContainerManager) rpc.getProxy(ContainerManager.class,
+                NetUtils.createSocketAddr(containerManagerBindAddr),
+                getConfig());
+          }
+        });
+    return proxy;
+  }
+
+  /**
+   * Setup and start the container on remote nodemanager.
+   */
+  private class EventProcessor implements Runnable {
+    private ContainerLauncherEvent event;
+
+    EventProcessor(ContainerLauncherEvent event) {
+      this.event = event;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Processing the event " + event.toString());
+
+      // Load ContainerManager tokens before creating a connection.
+      // TODO: Do it only once per NodeManager.
+      final String containerManagerBindAddr = event.getContainerMgrAddress();
+      ContainerID containerID = event.getContainerID();
+      ContainerToken containerToken = event.getContainerToken();
+
+      switch(event.getType()) {
+
+      // Launch: start the container on the NM and notify the task attempt of
+      // success (TA_CONTAINER_LAUNCHED) or failure (TA_CONTAINER_LAUNCH_FAILED).
+      case CONTAINER_REMOTE_LAUNCH:
+        ContainerRemoteLaunchEvent launchEv = (ContainerRemoteLaunchEvent) event;
+
+        try {
+
+          ContainerManager proxy =
+              getCMProxy(containerID, containerManagerBindAddr, containerToken);
+
+          // Construct the actual Container
+          ContainerLaunchContext containerLaunchContext =
+              launchEv.getContainer();
+
+          // TODO: Make sure that child's mapred-local-dir is set correctly.
+
+          // Now launch the actual container
+          proxy.startContainer(containerLaunchContext);
+
+          // after launching send launched event to taskattempt
+          context.getEventHandler().handle(
+              new TaskAttemptEvent(launchEv.getTaskAttemptID(),
+                  TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
+        } catch (Exception e) {
+          LOG.error("Container launch failed", e);
+          context.getEventHandler().handle(
+              new TaskAttemptEvent(launchEv.getTaskAttemptID(),
+                  TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+        }
+
+        break;
+      // Cleanup: either cancel a not-yet-processed launch, or stop/cleanup the
+      // remote container and report TA_CONTAINER_CLEANED.
+      case CONTAINER_REMOTE_CLEANUP:
+
+        // We will have to remove the launch event if it is still in eventQueue
+        // and not yet processed
+        if (eventQueue.contains(event)) {
+          // NOTE(review): contains/remove relies on ContainerLauncherEvent
+          // equals() matching the queued launch event — confirm semantics.
+          eventQueue.remove(event); // TODO: Any synchro needed?
+          // TODO: raise any event?
+        } else {
+          try {
+            ContainerManager proxy =
+                getCMProxy(containerID, containerManagerBindAddr, containerToken);
+            // TODO:check whether container is launched
+
+            // kill the remote container if already launched
+            proxy.stopContainer(event.getContainerID());
+            proxy.cleanupContainer(event.getContainerID());
+          } catch (Exception e) {
+            //ignore the cleanup failure
+            LOG.warn("cleanup failed for container " + event.getContainerID() ,
+                e);
+          }
+
+          // after killing send killed event to taskattempt
+          context.getEventHandler().handle(
+              new TaskAttemptEvent(event.getTaskAttemptID(),
+                  TaskAttemptEventType.TA_CONTAINER_CLEANED));
+        }
+        break;
+      }
+    }
+
+  }
+
+  /**
+   * Queues an event for asynchronous processing. Wraps the (unlikely)
+   * InterruptedException from the unbounded queue as a YarnException.
+   */
+  @Override
+  public void handle(ContainerLauncherEvent event) {
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      // NOTE(review): interrupt status is not restored here — consider
+      // Thread.currentThread().interrupt() before rethrowing.
+      throw new YarnException(e);
+    }
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,40 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.launcher;
+
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+/**
+ * Event asking the ContainerLauncher to launch a container for a task
+ * attempt on a remote NodeManager. Concrete subclasses supply the fully
+ * populated launch context and the remote task to run.
+ */
+public abstract class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
+
+  public ContainerRemoteLaunchEvent(TaskAttemptID taskAttemptID,
+      ContainerID containerID, String containerMgrAddress,
+      ContainerToken containerToken) {
+    // Always a CONTAINER_REMOTE_LAUNCH-typed launcher event.
+    super(taskAttemptID, containerID, containerMgrAddress,
+        containerToken,
+        ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
+  }
+
+  /** @return the launch context used to start the container on the NM. */
+  public abstract ContainerLaunchContext getContainer();
+
+  /** @return the MapReduce task to execute in the launched container. */
+  public abstract Task getRemoteTask();
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,31 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * Component that obtains containers for task attempts. Implementations
+ * consume ContainerAllocatorEvents: a request for a new container
+ * (CONTAINER_REQ) or the release of one (CONTAINER_DEALLOCATE).
+ */
+public interface ContainerAllocator extends EventHandler<ContainerAllocatorEvent>{
+
+  enum EventType {
+
+    // Request a container for a task attempt.
+    CONTAINER_REQ,
+    // Release/cancel a container for a task attempt.
+    CONTAINER_DEALLOCATE
+  }
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+/**
+ * Base event handled by a {@link ContainerAllocator}. Carries the id of the
+ * task attempt on whose behalf the allocator action (request/deallocate)
+ * is being taken.
+ */
+public class ContainerAllocatorEvent extends
+    AbstractEvent<ContainerAllocator.EventType> {
+
+  private final TaskAttemptID taskAttemptID;
+
+  public ContainerAllocatorEvent(TaskAttemptID attemptID,
+      ContainerAllocator.EventType type) {
+    super(type);
+    this.taskAttemptID = attemptID;
+  }
+
+  /** @return the task attempt this allocator event refers to. */
+  public TaskAttemptID getAttemptID() {
+    return this.taskAttemptID;
+  }
+}
\ No newline at end of file
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+/**
+ * Allocator event requesting a container for a task attempt, carrying the
+ * desired resource capability, the scheduling priority and data-locality
+ * hints (candidate hosts and racks).
+ */
+public class ContainerRequestEvent extends ContainerAllocatorEvent {
+
+  private final Priority priority;
+  private final Resource capability;
+  private final String[] hosts;
+  private final String[] racks;
+
+  public ContainerRequestEvent(TaskAttemptID attemptID,
+      Resource capability, int priority,
+      String[] hosts, String[] racks) {
+    super(attemptID, ContainerAllocator.EventType.CONTAINER_REQ);
+    this.capability = capability;
+    // Wrap the raw int in a Priority record.
+    Priority prio = new Priority();
+    prio.priority = priority;
+    this.priority = prio;
+    this.hosts = hosts;
+    this.racks = racks;
+  }
+
+  /** @return the resource capability requested for the container. */
+  public Resource getCapability() {
+    return this.capability;
+  }
+
+  /** @return the scheduling priority of this request. */
+  public Priority getPriority() {
+    return this.priority;
+  }
+
+  /** @return candidate hosts for data-local placement. */
+  public String[] getHosts() {
+    return this.hosts;
+  }
+
+  /** @return candidate racks for rack-local placement. */
+  public String[] getRacks() {
+    return this.racks;
+  }
+}
\ No newline at end of file
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,471 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.AMRMProtocol;
+import org.apache.hadoop.yarn.AMResponse;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerState;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+/**
+ * Allocates the container from the ResourceManager scheduler.
+ *
+ * Container requests arriving via handle() are recorded in local tables; a
+ * background thread periodically calls allocate(), which sends the
+ * accumulated asks/releases to the RM and matches any returned containers
+ * against outstanding requests (host-local first, then first-come).
+ */
+public class RMContainerAllocator extends AbstractService
+implements ContainerAllocator {
+  private static final Log LOG =
+      LogFactory.getLog(RMContainerAllocator.class);
+  // Wildcard resource name for off-switch requests.
+  private static final String ANY = "*";
+  // NOTE(review): static but assigned per-instance in init() — confirm
+  // intended; an instance field would be safer.
+  private static int rmPollInterval;//millis
+  private ApplicationID applicationId;
+  private EventHandler eventHandler;
+  private volatile boolean stopped;
+  protected Thread allocatorThread;
+  // Registration record reported to the RM (host, ports, state, progress).
+  private ApplicationMaster applicationMaster;
+  private AMRMProtocol scheduler;
+  private final ClientService clientService;
+  // Echoed back to the RM on each heartbeat for duplicate detection.
+  private int lastResponseID = 0;
+
+  //mapping for assigned containers
+  private final Map<ContainerID, TaskAttemptID> assignedMap =
+      new HashMap<ContainerID, TaskAttemptID>();
+
+  // Outstanding local requests, hashed by priority then capability.
+  private final Map<Priority,
+      Map<Resource,LinkedList<ContainerRequestEvent>>> localRequestsQueue =
+      new HashMap<Priority, Map<Resource,LinkedList<ContainerRequestEvent>>>();
+
+  //Key -> Priority
+  //Value -> Map
+  //Key->ResourceName (eg. hostname, rackname, *)
+  //Value->Map
+  //Key->Resource Capability
+  //Value->ResourceReqeust
+  private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+      remoteRequestsTable =
+      new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+
+  // Pending asks/releases for the next RM heartbeat; cleared after each one.
+  private final Set<ResourceRequest> ask =new TreeSet<ResourceRequest>();
+  private final Set<Container> release = new TreeSet<Container>();
+
+  public RMContainerAllocator(ClientService clientService, AppContext context) {
+    super("RMContainerAllocator");
+    this.clientService = clientService;
+    this.applicationId = context.getApplicationID();
+    this.eventHandler = context.getEventHandler();
+    this.applicationMaster = new ApplicationMaster();
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    // Poll at a third of the AM expiry interval so heartbeats keep the AM alive.
+    rmPollInterval = conf.getInt(YarnConfiguration.AM_EXPIRY_INTERVAL, 10000)/3;
+  }
+
+  @Override
+  public void start() {
+    scheduler= createSchedulerProxy();
+    //LOG.info("Scheduler is " + scheduler);
+    register();
+    startAllocatorThread();
+    super.start();
+  }
+
+  /** Registers this ApplicationMaster with the RM scheduler. */
+  protected void register() {
+    //Register
+    applicationMaster.applicationId = applicationId;
+    applicationMaster.host =
+        clientService.getBindAddress().getAddress().getHostAddress();
+    applicationMaster.rpcPort = clientService.getBindAddress().getPort();
+    applicationMaster.state = ApplicationState.RUNNING;
+    applicationMaster.httpPort = clientService.getHttpPort();
+    applicationMaster.status = new ApplicationStatus();
+    applicationMaster.status.applicationId = applicationId;
+    applicationMaster.status.progress = 0.0f;
+    try {
+      scheduler.registerApplicationMaster(applicationMaster);
+    } catch(Exception are) {
+      // Registration failure is fatal for the AM.
+      LOG.info("Exception while registering", are);
+      throw new YarnException(are);
+    }
+  }
+
+  /** Unregisters from the RM; failures are logged and swallowed (best effort). */
+  protected void unregister() {
+    try {
+      applicationMaster.state = ApplicationState.COMPLETED;
+      scheduler.finishApplicationMaster(applicationMaster);
+    } catch(Exception are) {
+      LOG.info("Error while unregistering ", are);
+    }
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+    allocatorThread.interrupt();
+    try {
+      allocatorThread.join();
+    } catch (InterruptedException ie) {
+      LOG.info("Interruped Exception while stopping", ie);
+    }
+    unregister();
+    super.stop();
+  }
+
+  /** Starts the heartbeat loop: sleep, then allocate(), until stopped. */
+  protected void startAllocatorThread() {
+    allocatorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
+          try {
+            Thread.sleep(rmPollInterval);
+            try {
+              allocate();
+            } catch (Exception e) {
+              // Keep heart-beating even if one RM call fails.
+              LOG.error("ERROR IN CONTACTING RM.", e);
+            }
+          } catch (InterruptedException e) {
+            LOG.info("Allocated thread interrupted. Returning");
+            return;
+          }
+        }
+      }
+    });
+    allocatorThread.start();
+  }
+
+  /**
+   * Builds the AMRMProtocol proxy to the RM scheduler. In secure mode the
+   * AM token from the environment is decoded and added to the current user
+   * before the proxy is created under doAs.
+   */
+  protected AMRMProtocol createSchedulerProxy() {
+    final YarnRPC rpc = YarnRPC.create(getConfig());
+    // Cloned so the SecurityInfo class override stays local to this proxy.
+    final Configuration conf = new Configuration(getConfig());
+    final String serviceAddr = conf.get(
+        YarnConfiguration.SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
+
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+          SchedulerSecurityInfo.class, SecurityInfo.class);
+
+      String tokenURLEncodedStr =
+          System.getenv().get(
+              YarnConfiguration.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+
+      currentUser.addToken(token);
+    }
+
+    return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+      @Override
+      public AMRMProtocol run() {
+        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+            NetUtils.createSocketAddr(serviceAddr), conf);
+      }
+    });
+  }
+
+  // TODO: Need finer synchronization.
+  // One heartbeat: send asks/releases to RM, then match returned containers.
+  protected synchronized void allocate() throws Exception {
+    assign(getResources());
+  }
+
+  // Synchronized against allocate(), so request bookkeeping never races a
+  // heartbeat.
+  @Override
+  public synchronized void handle(ContainerAllocatorEvent event) {
+    LOG.info("Processing the event " + event.toString());
+    //TODO: can be replaced by switch instead of if-else
+    if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
+      requestContainer((ContainerRequestEvent) event);
+    } else if (
+        event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
+      //TODO: handle deallocation
+    }
+  }
+
+  /**
+   * Records a container request locally and creates/increments the matching
+   * ResourceRequests (per host, per rack, and off-switch) for the next
+   * heartbeat.
+   */
+  protected synchronized void requestContainer(ContainerRequestEvent event) {
+    //add to the localRequestsQueue
+    //localRequests Queue is hashed by Resource and Priority for easy lookups
+    Map<Resource, LinkedList<ContainerRequestEvent>> eventMap =
+        this.localRequestsQueue.get(event.getPriority());
+    if (eventMap == null) {
+      eventMap = new HashMap<Resource, LinkedList<ContainerRequestEvent>>();
+      this.localRequestsQueue.put(event.getPriority(), eventMap);
+    }
+
+    LinkedList<ContainerRequestEvent> eventList =
+        eventMap.get(event.getCapability());
+    if (eventList == null) {
+      eventList = new LinkedList<ContainerRequestEvent>();
+      eventMap.put(event.getCapability(), eventList);
+    }
+    eventList.add(event);
+
+    // Create resource requests
+    for (String host : event.getHosts()) {
+      // Data-local
+      addResourceRequest(event.getPriority(), host, event.getCapability());
+    }
+
+    // Nothing Rack-local for now
+    for (String rack : event.getRacks()) {
+      addResourceRequest(event.getPriority(), rack, event.getCapability());
+    }
+
+    // Off-switch
+    addResourceRequest(event.getPriority(), ANY, event.getCapability());
+
+  }
+
+  /**
+   * Increments (creating if absent) the ResourceRequest for
+   * (priority, resourceName, capability) and queues it in 'ask' for the
+   * next heartbeat.
+   */
+  private void addResourceRequest(Priority priority, String resourceName,
+      Resource capability) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+        this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
+      this.remoteRequestsTable.put(priority, remoteRequests);
+      LOG.info("Added priority=" + priority);
+    }
+    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      reqMap = new HashMap<Resource, ResourceRequest>();
+      remoteRequests.put(resourceName, reqMap);
+    }
+    ResourceRequest remoteRequest = reqMap.get(capability);
+    if (remoteRequest == null) {
+      remoteRequest = new ResourceRequest();
+      remoteRequest.priority = priority;
+      remoteRequest.hostName = resourceName;
+      remoteRequest.capability = capability;
+      remoteRequest.numContainers = 0;
+      reqMap.put(capability, remoteRequest);
+    }
+    remoteRequest.numContainers++;
+
+    // Note this down for next interaction with ResourceManager
+    ask.add(remoteRequest);
+    LOG.info("addResourceRequest:" + " applicationId=" + applicationId.id
+        + " priority=" + priority.priority + " resourceName=" + resourceName
+        + " numContainers=" + remoteRequest.numContainers + " #asks="
+        + ask.size());
+  }
+
+  /**
+   * Decrements the ResourceRequest for (priority, resourceName, capability),
+   * pruning empty table entries when the count reaches zero and keeping
+   * 'ask' in sync.
+   * NOTE(review): assumes the table entries exist — an unmatched decrement
+   * would NPE; confirm callers only decrement previously-added requests.
+   */
+  private void decResourceRequest(Priority priority, String resourceName,
+      Resource capability) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+        this.remoteRequestsTable.get(priority);
+    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    ResourceRequest remoteRequest = reqMap.get(capability);
+
+    LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.id
+        + " priority=" + priority.priority + " resourceName=" + resourceName
+        + " numContainers=" + remoteRequest.numContainers + " #asks="
+        + ask.size());
+
+    remoteRequest.numContainers--;
+    if (remoteRequest.numContainers == 0) {
+      reqMap.remove(capability);
+      if (reqMap.size() == 0) {
+        remoteRequests.remove(resourceName);
+      }
+      if (remoteRequests.size() == 0) {
+        remoteRequestsTable.remove(priority);
+      }
+      //remove from ask if it may have
+      ask.remove(remoteRequest);
+    } else {
+      ask.add(remoteRequest);//this will override the request if ask doesn't
+      //already have it.
+    }
+
+    LOG.info("AFTER decResourceRequest:" + " applicationId=" + applicationId.id
+        + " priority=" + priority.priority + " resourceName=" + resourceName
+        + " numContainers=" + remoteRequest.numContainers + " #asks="
+        + ask.size());
+  }
+
+  /**
+   * Performs one allocate() RPC to the RM: sends the accumulated asks and
+   * releases, routes completed containers to their task attempts, and
+   * returns the newly allocated (non-complete) containers.
+   */
+  private List<Container> getResources() throws Exception {
+    ApplicationStatus status = new ApplicationStatus();
+    status.applicationId = applicationId;
+    status.responseID = lastResponseID;
+    // NOTE(review): raw ArrayList — unchecked; consider
+    // new ArrayList<ResourceRequest>(ask) / new ArrayList<Container>(release).
+    AMResponse response =
+        scheduler.allocate(status,
+            new ArrayList(ask), new ArrayList(release));
+    lastResponseID = response.responseId;
+    List<Container> allContainers = response.containers;
+    ask.clear();
+    release.clear();
+
+    // NOTE(review): logged after clear(), so ask/release always print 0 —
+    // move the log above the clears if the sent sizes are wanted.
+    LOG.info("getResources() for " + applicationId + ":" +
+        " ask=" + ask.size() +
+        " release= "+ release.size() +
+        " recieved=" + allContainers.size());
+    List<Container> allocatedContainers = new ArrayList<Container>();
+    for (Container cont : allContainers) {
+      if (cont.state != ContainerState.COMPLETE) {
+        allocatedContainers.add(cont);
+        LOG.debug("Received Container :" + cont);
+      } else {
+        LOG.info("Received completed container " + cont);
+        TaskAttemptID attemptID = assignedMap.remove(cont.id);
+        if (attemptID == null) {
+          LOG.error("Container complete event for unknown container id " +
+              cont.id);
+        } else {
+          //send the container completed event to Task attempt
+          eventHandler.handle(new TaskAttemptEvent(attemptID,
+              TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+        }
+      }
+      LOG.debug("Received Container :" + cont);
+    }
+    return allocatedContainers;
+  }
+
+  /**
+   * Matches allocated containers to outstanding requests, one priority at a
+   * time (iteration order of localRequestsQueue's keys).
+   */
+  private void assign(List<Container> allocatedContainers) {
+    // Schedule in priority order
+    // NOTE(review): localRequestsQueue is a HashMap, so key iteration order
+    // is not priority order — confirm whether a sorted map was intended.
+    for (Priority priority : localRequestsQueue.keySet()) {
+      LOG.info("Assigning for priority " + priority);
+      assign(priority, allocatedContainers);
+      if (allocatedContainers.isEmpty()) {
+        break;
+      }
+    }
+
+    if (!allocatedContainers.isEmpty()) {
+      //TODO
+      //after the assigment, still containers are left
+      //This can happen if container requests are cancelled by AM, currently
+      //not there. release the unassigned containers??
+
+      //LOG.info("Releasing container " + allocatedContainer);
+      //release.add(allocatedContainer);
+    }
+  }
+
+  /**
+   * Assigns containers at one priority: prefer a request whose host list
+   * contains the container's host, else the oldest request of matching
+   * capability. Assigned containers are removed from the input list, the
+   * request tables are decremented, and the task attempt is notified.
+   */
+  private void assign(Priority priority, List<Container> allocatedContainers) {
+    for (Iterator<Container> i=allocatedContainers.iterator(); i.hasNext();) {
+      Container allocatedContainer = i.next();
+      String host = allocatedContainer.hostName.toString();
+      Resource capability = allocatedContainer.resource;
+
+      LinkedList<ContainerRequestEvent> requestList =
+          localRequestsQueue.get(priority).get(capability);
+
+      if (requestList == null) {
+        // NOTE(review): 'return' abandons the remaining containers at this
+        // priority instead of 'continue'-ing to the next one — confirm.
+        LOG.info("No request match at priority " + priority);
+        return;
+      }
+
+      ContainerRequestEvent assigned = null;
+      //walk thru the requestList to see if in any host matches
+      Iterator<ContainerRequestEvent> it = requestList.iterator();
+      while (it.hasNext()) {
+        ContainerRequestEvent event = it.next();
+        if (Arrays.asList(event.getHosts()).contains(host)) { // TODO: Fix
+          assigned = event;
+          it.remove();
+          // Update resource requests
+          for (String hostName : event.getHosts()) {
+            decResourceRequest(priority, hostName, capability);
+          }
+          break;
+        }
+      }
+      if (assigned == null) {//host didn't match
+        if (requestList.size() > 0) {
+          //choose the first one in queue
+          assigned = requestList.remove();
+        }
+      }
+
+      if (assigned != null) {
+
+        i.remove(); // Remove from allocated Containers list also.
+
+        // Update resource requests
+        decResourceRequest(priority, ANY, capability);
+
+        //send the container assigned event to Task attempt
+        eventHandler.handle(new TaskAttemptContainerAssignedEvent(assigned
+            .getAttemptID(), allocatedContainer.id,
+            allocatedContainer.hostName.toString(),
+            allocatedContainer.containerToken));
+
+        assignedMap.put(allocatedContainer.id, assigned.getAttemptID());
+
+        LOG.info("Assigned container (" + allocatedContainer + ") " +
+            " to task " + assigned.getAttemptID() + " at priority " + priority +
+            " on node " + allocatedContainer.hostName.toString());
+      }
+    }
+  }
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,157 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.ContainerID;
+
+/**
+ * Reads the static list of NodeManagers from the job configuration and
+ * assigns containers to task attempts round-robin over that list.
+ */
+public class StaticContainerAllocator extends AbstractService
+    implements ContainerAllocator {
+
+  private static final Log LOG =
+      LogFactory.getLog(StaticContainerAllocator.class);
+
+  private AppContext context;
+  private volatile boolean stopped;
+  private BlockingQueue<ContainerAllocatorEvent> eventQueue =
+      new LinkedBlockingQueue<ContainerAllocatorEvent>();
+  private Thread allocatorThread;
+
+  // Monotonically increasing counter used to mint container ids for this app.
+  private int containerCount;
+
+  private List<String> containerMgrList;
+  // Index of the next container-manager to use.  Advanced (modulo the list
+  // size) on every allocation; previously it was never updated, so every
+  // container landed on the first node and allocation was not round-robin.
+  private int nextIndex;
+
+  public StaticContainerAllocator(AppContext context) {
+    super("StaticContainerAllocator");
+    this.context = context;
+  }
+
+  /**
+   * Loads the job configuration from the submit directory and returns the
+   * configured NodeManager host list.  Never returns null; an unset key
+   * yields an empty list so init() can fail with a clear message.
+   */
+  protected List<String> getContainerMgrList(Configuration conf)
+      throws IOException {
+    Path jobSubmitDir = FileContext.getLocalFSFileContext().makeQualified(
+        new Path(new File(YARNApplicationConstants.JOB_SUBMIT_DIR)
+            .getAbsolutePath()));
+    Path jobConfFile =
+        new Path(jobSubmitDir, YARNApplicationConstants.JOB_CONF_FILE);
+    conf.addResource(jobConfFile);
+    String[] containerMgrHosts =
+        conf.getStrings(YARNApplicationConstants.NM_HOSTS_CONF_KEY);
+    if (containerMgrHosts == null) {
+      // Configuration.getStrings() returns null for an unset key; guard
+      // against the NullPointerException Arrays.asList(null) would throw.
+      containerMgrHosts = new String[0];
+    }
+    return Arrays.asList(containerMgrHosts);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    try {
+      containerMgrList = getContainerMgrList(conf);
+    } catch (IOException e) {
+      throw new YarnException("Cannot get container-managers list ", e);
+    }
+
+    if (containerMgrList.isEmpty()) {
+      throw new YarnException("No of Container Managers are zero.");
+    }
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    allocatorThread = new Thread(new Allocator());
+    allocatorThread.start();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+    allocatorThread.interrupt();
+    try {
+      allocatorThread.join();
+    } catch (InterruptedException ie) {
+      LOG.debug("Interruped Exception while stopping", ie);
+      // Preserve the interrupt status for our caller.
+      Thread.currentThread().interrupt();
+    }
+    super.stop();
+  }
+
+  @Override
+  public void handle(ContainerAllocatorEvent event) {
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  /** Background thread that drains the event queue and allocates. */
+  private class Allocator implements Runnable {
+    @Override
+    public void run() {
+      ContainerAllocatorEvent event = null;
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        try {
+          event = eventQueue.take();
+          LOG.info("Processing the event " + event.toString());
+          allocate(event);
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    }
+
+    /**
+     * Assigns the next container-manager in the static list to the
+     * requesting task attempt, advancing round-robin over the list.
+     */
+    private void allocate(ContainerAllocatorEvent event) {
+      if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
+        String containerMgr = containerMgrList.get(nextIndex);
+        // Advance and wrap so the next request goes to the next node.
+        // The original code never incremented nextIndex, defeating the
+        // round-robin placement the class documents.
+        nextIndex = (nextIndex + 1) % containerMgrList.size();
+
+        ContainerID containerID = generateContainerID();
+
+        context.getEventHandler().handle(
+            new TaskAttemptContainerAssignedEvent(
+                event.getAttemptID(),
+                containerID, containerMgr, null));
+      }
+    }
+
+    /** Mints a new ContainerID scoped to this application. */
+    private ContainerID generateContainerID() {
+      ContainerID cId = new ContainerID();
+      cId.appID = context.getApplicationID();
+      cId.id = containerCount++;
+      return cId;
+    }
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,82 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+/**
+ * Thread-safe running statistics (count, mean, variance, std deviation)
+ * over a stream of doubles, with support for replacing a previously added
+ * sample via {@link #updateStatistics(double, double)}.
+ */
+public class DataStatistics {
+  // Number of samples currently folded into the statistics.
+  private int count = 0;
+  private double sum = 0;
+  private double sumSquares = 0;
+
+  public DataStatistics() {
+  }
+
+  /** Starts the statistics off with a single initial sample. */
+  public DataStatistics(double initNum) {
+    this.count = 1;
+    this.sum = initNum;
+    this.sumSquares = initNum * initNum;
+  }
+
+  /** Adds one sample. */
+  public synchronized void add(double newNum) {
+    this.count++;
+    this.sum += newNum;
+    this.sumSquares += newNum * newNum;
+  }
+
+  /**
+   * Replaces a previously added sample {@code old} with {@code update}.
+   * Synchronized so the remove/add pair is atomic with respect to readers;
+   * the original version was unsynchronized, letting a concurrent mean()
+   * or var() observe the state between sub() and add().
+   */
+  public synchronized void updateStatistics(double old, double update) {
+    sub(old);
+    add(update);
+  }
+
+  private synchronized void sub(double oldNum) {
+    this.count--;
+    // Clamp at zero so floating-point residue cannot drive the aggregates
+    // negative.  The original wrote "this.sum = Math.max(this.sum -= oldNum,
+    // 0.0d)" -- a double mutation that obscured intent.
+    this.sum = Math.max(this.sum - oldNum, 0.0d);
+    this.sumSquares = Math.max(this.sumSquares - oldNum * oldNum, 0.0d);
+  }
+
+  /** @return the mean of the samples, or 0 if there are none. */
+  public synchronized double mean() {
+    return count == 0 ? 0.0 : sum/count;
+  }
+
+  /** @return the (population) variance, or 0 if there are no samples. */
+  public synchronized double var() {
+    if (count == 0) {
+      // Guard division by zero; the original returned NaN here because
+      // sumSquares/count was evaluated without a count check.
+      return 0.0d;
+    }
+    // E(X^2) - E(X)^2, clamped against floating-point residue.
+    double mean = mean();
+    return Math.max((sumSquares/count) - mean * mean, 0.0d);
+  }
+
+  /** @return the standard deviation of the samples. */
+  public synchronized double std() {
+    return Math.sqrt(this.var());
+  }
+
+  /**
+   * @return the value {@code sigma} standard deviations above the mean,
+   *         or 0 if there are no samples.
+   */
+  public synchronized double outlier(float sigma) {
+    if (count != 0.0) {
+      return mean() + std() * sigma;
+    }
+
+    return 0.0;
+  }
+
+  /** @return the sample count (as a double, for historical reasons). */
+  public synchronized double count() {
+    return count;
+  }
+
+  public String toString() {
+    return "DataStatistics: count is " + count + ", sum is " + sum +
+    ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,503 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.Clock;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+public class DefaultSpeculator extends AbstractService implements
+ Speculator {
+
+ private static final long ON_SCHEDULE = Long.MIN_VALUE;
+ private static final long ALREADY_SPECULATING = Long.MIN_VALUE + 1;
+ private static final long TOO_NEW = Long.MIN_VALUE + 2;
+ private static final long PROGRESS_IS_GOOD = Long.MIN_VALUE + 3;
+ private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
+ private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
+
+ private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
+ private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
+
+ private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
+ private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
+ private static final int MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
+
+ private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
+
+ private final ConcurrentMap<TaskID, Boolean> runningTasks
+ = new ConcurrentHashMap<TaskID, Boolean>();
+
+ private final Map<Task, AtomicBoolean> pendingSpeculations
+ = new ConcurrentHashMap<Task, AtomicBoolean>();
+
+ // These are the current needs, not the initial needs. For each job, these
+ // record the number of attempts that exist and that are actively
+ // waiting for a container [as opposed to running or finished]
+ private final ConcurrentMap<JobID, AtomicInteger> mapContainerNeeds
+ = new ConcurrentHashMap<JobID, AtomicInteger>();
+ private final ConcurrentMap<JobID, AtomicInteger> reduceContainerNeeds
+ = new ConcurrentHashMap<JobID, AtomicInteger>();
+
+ private final Set<TaskID> mayHaveSpeculated = new HashSet();
+
+ private final Configuration conf;
+ private AppContext context;
+ private Thread speculationBackgroundThread = null;
+ private BlockingQueue<SpeculatorEvent> eventQueue
+ = new LinkedBlockingQueue<SpeculatorEvent>();
+ private TaskRuntimeEstimator estimator;
+
+ private BlockingQueue<Object> scanControl = new LinkedBlockingQueue<Object>();
+
+ private final Clock clock;
+
+ private final EventHandler<TaskEvent> eventHandler;
+
+ public DefaultSpeculator(Configuration conf, AppContext context) {
+ this(conf, context, new Clock());
+ }
+
+ public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) {
+ this(conf, context, getEstimator(conf, context), clock);
+ }
+
+ static private TaskRuntimeEstimator getEstimator
+ (Configuration conf, AppContext context) {
+ TaskRuntimeEstimator estimator;
+
+ try {
+ // "yarn.mapreduce.job.task.runtime.estimator.class"
+ Class<? extends TaskRuntimeEstimator> estimatorClass
+ = conf.getClass(YarnMRJobConfig.TASK_RUNTIME_ESTIMATOR_CLASS,
+ LegacyTaskRuntimeEstimator.class,
+ TaskRuntimeEstimator.class);
+
+ Constructor<? extends TaskRuntimeEstimator> estimatorConstructor
+ = estimatorClass.getConstructor();
+
+ estimator = estimatorConstructor.newInstance();
+
+ estimator.contextualize(conf, context);
+ } catch (InstantiationException ex) {
+ LOG.error("Can't make a speculation runtime extimator" + ex);
+ throw new YarnException(ex);
+ } catch (IllegalAccessException ex) {
+ LOG.error("Can't make a speculation runtime extimator" + ex);
+ throw new YarnException(ex);
+ } catch (InvocationTargetException ex) {
+ LOG.error("Can't make a speculation runtime extimator" + ex);
+ throw new YarnException(ex);
+ } catch (NoSuchMethodException ex) {
+ LOG.error("Can't make a speculation runtime extimator" + ex);
+ throw new YarnException(ex);
+ }
+
+ return estimator;
+ }
+
+ // This constructor is designed to be called by other constructors.
+ // However, it's public because we do use it in the test cases.
+ // Normally we figure out our own estimator.
+ public DefaultSpeculator
+ (Configuration conf, AppContext context,
+ TaskRuntimeEstimator estimator, Clock clock) {
+ super(DefaultSpeculator.class.getName());
+
+ this.conf = conf;
+ this.context = context;
+ this.estimator = estimator;
+ this.clock = clock;
+ this.eventHandler = context.getEventHandler();
+ }
+
+/* ************************************************************* */
+
+ // This is the task-mongering that creates the two new threads -- one for
+ // processing events from the event queue and one for periodically
+ // looking for speculation opportunities
+
+ @Override
+ public void start() {
+ Runnable speculationBackgroundCore
+ = new Runnable() {
+ @Override
+ public void run() {
+ while (!Thread.currentThread().isInterrupted()) {
+ long backgroundRunStartTime = clock.getTime();
+ try {
+ int speculations = computeSpeculations();
+ long mininumRecomp
+ = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
+ : SOONEST_RETRY_AFTER_NO_SPECULATE;
+
+ long wait = Math.max(mininumRecomp,
+ clock.getTime() - backgroundRunStartTime);
+
+ if (speculations > 0) {
+ LOG.info("We launched " + speculations
+ + " speculations. Sleeping " + wait + " milliseconds.");
+ }
+
+ Object pollResult
+ = scanControl.poll(wait, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.error("Background thread returning, interrupted : " + e);
+ e.printStackTrace(System.out);
+ return;
+ }
+ }
+ }
+ };
+ speculationBackgroundThread = new Thread
+ (speculationBackgroundCore, "DefaultSpeculator background processing");
+ speculationBackgroundThread.start();
+
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ // this could be called before background thread is established
+ if (speculationBackgroundThread != null) {
+ speculationBackgroundThread.interrupt();
+ }
+ super.stop();
+ }
+
+ @Override
+ public void handleAttempt(TaskAttemptStatus status) {
+ long timestamp = clock.getTime();
+ statusUpdate(status, timestamp);
+ }
+
+ // This section is not part of the Speculator interface; it's used only for
+ // testing
+ public boolean eventQueueEmpty() {
+ return eventQueue.isEmpty();
+ }
+
+ // This interface is intended to be used only for test cases.
+ public void scanForSpeculations() {
+ LOG.info("We got asked to run a debug speculation scan.");
+ // debug
+ System.out.println("We got asked to run a debug speculation scan.");
+ System.out.println("There are " + scanControl.size()
+ + " events stacked already.");
+ scanControl.add(new Object());
+ Thread.yield();
+ }
+
+
+/* ************************************************************* */
+
+ // This section contains the code that gets run for a SpeculatorEvent
+
+ private AtomicInteger containerNeed(TaskID taskID) {
+ JobID jobID = taskID.jobID;
+ TaskType taskType = taskID.taskType;
+
+ ConcurrentMap<JobID, AtomicInteger> relevantMap
+ = taskType == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
+
+ AtomicInteger result = relevantMap.get(jobID);
+
+ if (result == null) {
+ relevantMap.putIfAbsent(jobID, new AtomicInteger(0));
+ result = relevantMap.get(jobID);
+ }
+
+ return result;
+ }
+
+ private synchronized void processSpeculatorEvent(SpeculatorEvent event) {
+ switch (event.getType()) {
+ case ATTEMPT_STATUS_UPDATE:
+ statusUpdate(event.getReportedStatus(), event.getTimestamp());
+ break;
+
+ case TASK_CONTAINER_NEED_UPDATE:
+ {
+ AtomicInteger need = containerNeed(event.getTaskID());
+ int newNeed = need.addAndGet(event.containersNeededChange());
+ break;
+ }
+
+ case ATTEMPT_START:
+ {
+ estimator.enrollAttempt
+ (event.getReportedStatus(), event.getTimestamp());
+ }
+ }
+ }
+
+ /**
+ * Absorbs one TaskAttemptStatus
+ *
+ * @param reportedStatus the status report that we got from a task attempt
+ * that we want to fold into the speculation data for this job
+ * @param timestamp the time this status corresponds to. This matters
+ * because statuses contain progress.
+ */
+ protected void statusUpdate(TaskAttemptStatus reportedStatus, long timestamp) {
+
+ String stateString = reportedStatus.stateString.toString();
+
+ TaskAttemptID attemptID = reportedStatus.id;
+ TaskID taskID = attemptID.taskID;
+ Job job = context.getJob(taskID.jobID);
+
+ if (job == null) {
+ return;
+ }
+
+ Task task = job.getTask(taskID);
+
+ if (task == null) {
+ return;
+ }
+
+ estimator.updateAttempt(reportedStatus, timestamp);
+
+ // If the task is already known to be speculation-bait, don't do anything
+ if (pendingSpeculations.get(task) != null) {
+ if (pendingSpeculations.get(task).get()) {
+ return;
+ }
+ }
+
+ if (stateString.equals(TaskAttemptState.RUNNING.name())) {
+ runningTasks.putIfAbsent(taskID, Boolean.TRUE);
+ } else {
+ runningTasks.remove(taskID, Boolean.TRUE);
+ }
+ }
+
+/* ************************************************************* */
+
+// This is the code section that runs periodically and adds speculations for
+// those jobs that need them.
+
+
+ // This can return a few magic values for tasks that shouldn't speculate:
+ // returns ON_SCHEDULE if thresholdRuntime(taskID) says that we should not
+ // considering speculating this task
+ // returns ALREADY_SPECULATING if that is true. This has priority.
+ // returns TOO_NEW if our companion task hasn't gotten any information
+ // returns PROGRESS_IS_GOOD if the task is sailing through
+ // returns NOT_RUNNING if the task is not running
+ //
+ // All of these values are negative. Any value that should be allowed to
+ // speculate is 0 or positive.
+ private long speculationValue(TaskID taskID, long now) {
+ Job job = context.getJob(taskID.jobID);
+ Task task = job.getTask(taskID);
+ Map<TaskAttemptID, TaskAttempt> attempts = task.getAttempts();
+ long acceptableRuntime = Long.MIN_VALUE;
+ long result = Long.MIN_VALUE;
+
+ if (!mayHaveSpeculated.contains(taskID)) {
+ acceptableRuntime = estimator.thresholdRuntime(taskID);
+ if (acceptableRuntime == Long.MAX_VALUE) {
+ return ON_SCHEDULE;
+ }
+ }
+
+ TaskAttemptID runningTaskAttemptID = null;
+
+ int numberRunningAttempts = 0;
+
+ for (TaskAttempt taskAttempt : attempts.values()) {
+ if (taskAttempt.getState() == TaskAttemptState.RUNNING
+ || taskAttempt.getState() == TaskAttemptState.ASSIGNED) {
+ if (++numberRunningAttempts > 1) {
+ return ALREADY_SPECULATING;
+ }
+ runningTaskAttemptID = taskAttempt.getID();
+
+ long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
+
+ long taskAttemptStartTime
+ = estimator.attemptEnrolledTime(runningTaskAttemptID);
+
+ if (taskAttemptStartTime > now) {
+ // This background process ran before we could process the task
+ // attempt status change that chronicles the attempt start
+ return TOO_NEW;
+ }
+
+ long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
+
+ long estimatedReplacementEndTime
+ = now + estimator.estimatedNewAttemptRuntime(taskID);
+
+ if (estimatedEndTime < now) {
+ return PROGRESS_IS_GOOD;
+ }
+
+ if (estimatedReplacementEndTime >= estimatedEndTime) {
+ return TOO_LATE_TO_SPECULATE;
+ }
+
+ result = estimatedEndTime - estimatedReplacementEndTime;
+ }
+ }
+
+ // If we are here, there's at most one task attempt.
+ if (numberRunningAttempts == 0) {
+ return NOT_RUNNING;
+ }
+
+
+
+ if (acceptableRuntime == Long.MIN_VALUE) {
+ acceptableRuntime = estimator.thresholdRuntime(taskID);
+ if (acceptableRuntime == Long.MAX_VALUE) {
+ return ON_SCHEDULE;
+ }
+ }
+
+ return result;
+ }
+
+ //Add attempt to a given Task.
+ protected void addSpeculativeAttempt(TaskID taskID) {
+ System.out.println
+ ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
+ eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT));
+ mayHaveSpeculated.add(taskID);
+ }
+
+ @Override
+ public void handle(SpeculatorEvent event) {
+ processSpeculatorEvent(event);
+ }
+
+
+ private int maybeScheduleAMapSpeculation() {
+ return maybeScheduleASpeculation(TaskType.MAP);
+ }
+
+ private int maybeScheduleAReduceSpeculation() {
+ return maybeScheduleASpeculation(TaskType.REDUCE);
+ }
+
+ private int maybeScheduleASpeculation(TaskType type) {
+ int successes = 0;
+
+ long now = clock.getTime();
+
+ ConcurrentMap<JobID, AtomicInteger> containerNeeds
+ = type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
+
+ for (ConcurrentMap.Entry<JobID, AtomicInteger> jobEntry : containerNeeds.entrySet()) {
+ // This race conditon is okay. If we skip a speculation attempt we
+ // should have tried because the event that lowers the number of
+ // containers needed to zero hasn't come through, it will next time.
+ // Also, if we miss the fact that the number of containers needed was
+ // zero but increased due to a failure it's not too bad to launch one
+ // container prematurely.
+ if (jobEntry.getValue().get() > 0) {
+ break;
+ }
+
+ int numberSpeculationsAlready = 0;
+ int numberRunningTasks = 0;
+
+ // loop through the tasks of the kind
+ Job job = context.getJob(jobEntry.getKey());
+
+ Map<TaskID, Task> tasks = job.getTasks(type);
+
+ int numberAllowedSpeculativeTasks
+ = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
+ PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
+
+ TaskID bestTaskID = null;
+ long bestSpeculationValue = -1L;
+
+ // this loop is potentially pricey.
+ // TODO track the tasks that are potentially worth looking at
+ for (Map.Entry<TaskID, Task> taskEntry : tasks.entrySet()) {
+ long mySpeculationValue = speculationValue(taskEntry.getKey(), now);
+
+ if (mySpeculationValue == ALREADY_SPECULATING) {
+ ++numberSpeculationsAlready;
+ }
+
+ if (mySpeculationValue != NOT_RUNNING) {
+ ++numberRunningTasks;
+ }
+
+ if (mySpeculationValue > bestSpeculationValue) {
+ bestTaskID = taskEntry.getKey();
+ bestSpeculationValue = mySpeculationValue;
+ }
+ }
+
+ numberAllowedSpeculativeTasks
+ = (int) Math.max(numberAllowedSpeculativeTasks,
+ PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
+
+ // If we found a speculation target, fire it off
+ if (bestTaskID != null
+ && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
+ addSpeculativeAttempt(bestTaskID);
+ ++successes;
+ }
+ }
+
+ return successes;
+ }
+
+ private int computeSpeculations() {
+ // We'll try to issue one map and one reduce speculation per job per run
+ return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation();
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,194 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+
+/*
+ * This estimator exponentially smooths the rate of progress vrs. wallclock
+ * time.  Conceivably we could write an estimator that smooths time per
+ * unit progress, and get different results.
+ */
+public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase {
+
+  private final ConcurrentMap<TaskAttemptID, AtomicReference<EstimateVector>> estimates
+      = new ConcurrentHashMap<TaskAttemptID, AtomicReference<EstimateVector>>();
+
+  private SmoothedValue smoothedValue;
+
+  // Smoothing time constant, in milliseconds.
+  private long lambda;
+
+  public enum SmoothedValue {
+    RATE, TIME_PER_UNIT_PROGRESS
+  }
+
+  ExponentiallySmoothedTaskRuntimeEstimator
+      (long lambda, SmoothedValue smoothedValue) {
+    super();
+    this.smoothedValue = smoothedValue;
+    this.lambda = lambda;
+  }
+
+  public ExponentiallySmoothedTaskRuntimeEstimator() {
+    super();
+  }
+
+  // immutable
+  private class EstimateVector {
+    final double value;
+    final float basedOnProgress;
+    final long atTime;
+
+    EstimateVector
+        (double value, float basedOnProgress, long atTime) {
+      this.value = value;
+      this.basedOnProgress = basedOnProgress;
+      this.atTime = atTime;
+    }
+
+    /**
+     * Folds one new (progress, time) reading into the smoothed value,
+     * returning a new immutable vector.  Out-of-order or regressing
+     * readings are ignored.
+     */
+    EstimateVector incorporate(float newProgress, long newAtTime) {
+      if (newAtTime <= atTime || newProgress < basedOnProgress) {
+        return this;
+      }
+
+      // Weight of the previously smoothed value.  For exponential
+      // smoothing this must decay toward 0 as the time gap grows:
+      // exp(-delta/lambda) lies in (0, 1].  The original code computed
+      // exp(+delta/lambda), which exceeds 1 and gives the new reading a
+      // negative weight.  (value < 0.0 marks the sentinel initial vector,
+      // which contributes nothing.)
+      double oldWeighting
+          = value < 0.0
+              ? 0.0 : Math.exp(-((double) (newAtTime - atTime)) / lambda);
+
+      double newRead = (newProgress - basedOnProgress) / (newAtTime - atTime);
+
+      if (smoothedValue == SmoothedValue.TIME_PER_UNIT_PROGRESS) {
+        newRead = 1.0 / newRead;
+      }
+
+      return new EstimateVector
+          (value * oldWeighting + newRead * (1.0 - oldWeighting),
+           newProgress, newAtTime);
+    }
+  }
+
+  /**
+   * Lock-free fold of one reading into the per-attempt estimate.  Retries
+   * via recursion when another thread wins a race to create the slot, and
+   * via the CAS loop when another thread wins an update race.
+   */
+  private void incorporateReading
+      (TaskAttemptID attemptID, float newProgress, long newTime) {
+    AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
+
+    if (vectorRef == null) {
+      estimates.putIfAbsent(attemptID, new AtomicReference<EstimateVector>(null));
+      incorporateReading(attemptID, newProgress, newTime);
+      // This return was missing in the original: execution fell through and
+      // dereferenced the still-null local vectorRef below, throwing a
+      // NullPointerException on the first reading for every attempt.
+      return;
+    }
+
+    EstimateVector oldVector = vectorRef.get();
+
+    if (oldVector == null) {
+      if (vectorRef.compareAndSet(null,
+             new EstimateVector(-1.0, 0.0F, Long.MIN_VALUE))) {
+        return;
+      }
+
+      incorporateReading(attemptID, newProgress, newTime);
+      // Also missing originally: without it we entered the CAS loop below
+      // with oldVector == null and crashed in oldVector.incorporate().
+      return;
+    }
+
+    while (!vectorRef.compareAndSet
+            (oldVector, oldVector.incorporate(newProgress, newTime))) {
+      oldVector = vectorRef.get();
+    }
+  }
+
+  private EstimateVector getEstimateVector(TaskAttemptID attemptID) {
+    AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
+
+    if (vectorRef == null) {
+      return null;
+    }
+
+    return vectorRef.get();
+  }
+
+  private static final long DEFAULT_EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS
+      = 1000L * 60;
+
+  @Override
+  public void contextualize(Configuration conf, AppContext context) {
+    super.contextualize(conf, context);
+
+    lambda
+        = conf.getLong(YarnMRJobConfig.EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS,
+            DEFAULT_EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS);
+    smoothedValue
+        = conf.getBoolean(YarnMRJobConfig.EXPONENTIAL_SMOOTHING_SMOOTH_RATE, true)
+            ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
+  }
+
+  /**
+   * Projects the attempt's total runtime (time already sunk plus time
+   * remaining at the smoothed rate), or -1 if no estimate is available yet.
+   */
+  @Override
+  public long estimatedRuntime(TaskAttemptID id) {
+    Long startTime = startTimes.get(id);
+
+    if (startTime == null) {
+      return -1L;
+    }
+
+    EstimateVector vector = getEstimateVector(id);
+
+    if (vector == null) {
+      return -1L;
+    }
+
+    long sunkTime = vector.atTime - startTime;
+
+    double value = vector.value;
+    float progress = vector.basedOnProgress;
+
+    if (value == 0) {
+      return -1L;
+    }
+
+    double rate = smoothedValue == SmoothedValue.RATE ? value : 1.0 / value;
+
+    if (rate == 0.0) {
+      return -1L;
+    }
+
+    double remainingTime = (1.0 - progress) / rate;
+
+    return sunkTime + (long)remainingTime;
+  }
+
+  @Override
+  public long runtimeEstimateVariance(TaskAttemptID id) {
+    // This estimator does not track a variance.
+    return -1L;
+  }
+
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    TaskAttemptID attemptID = status.id;
+
+    float progress = status.progress;
+
+    incorporateReading(attemptID, progress, timestamp);
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,156 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+
+
+
+public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
+
+ private final Map<TaskAttempt, AtomicLong> attemptRuntimeEstimates
+ = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+ private final Map<TaskAttempt, AtomicLong> attemptRuntimeEstimateVariances
+ = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+
+
+ @Override
+ public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+ super.updateAttempt(status, timestamp);
+
+ String stateString = status.stateString.toString();
+
+ TaskAttemptID attemptID = status.id;
+ TaskID taskID = attemptID.taskID;
+ JobID jobID = taskID.jobID;
+ Job job = context.getJob(jobID);
+
+ if (job == null) {
+ return;
+ }
+
+ Task task = job.getTask(taskID);
+
+ if (task == null) {
+ return;
+ }
+
+ TaskAttempt taskAttempt = task.getAttempt(attemptID);
+
+ if (taskAttempt == null) {
+ return;
+ }
+
+ Long boxedStart = startTimes.get(attemptID);
+ long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
+
+ // We need to do two things.
+ // 1: If this is a completion, we accumulate statistics in the superclass
+ // 2: If this is not a completion, we learn more about it.
+
+ // This is not a completion, but we're cooking.
+ //
+ if (stateString.equals(TaskAttemptState.RUNNING.name())) {
+ // See if this task is already in the registry
+ AtomicLong estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
+ AtomicLong estimateVarianceContainer
+ = attemptRuntimeEstimateVariances.get(taskAttempt);
+
+ if (estimateContainer == null) {
+ synchronized (attemptRuntimeEstimates) {
+ if (attemptRuntimeEstimates.get(taskAttempt) == null) {
+ attemptRuntimeEstimates.put(taskAttempt, new AtomicLong());
+ }
+
+ estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
+ }
+ }
+
+ if (estimateVarianceContainer == null) {
+ synchronized (attemptRuntimeEstimateVariances) {
+ if (attemptRuntimeEstimateVariances.get(taskAttempt) == null) {
+ attemptRuntimeEstimateVariances.put(taskAttempt, new AtomicLong());
+ }
+
+ estimateVarianceContainer
+ = attemptRuntimeEstimateVariances.get(taskAttempt);
+ }
+ }
+
+ long estimate = -1;
+ long varianceEstimate = -1;
+
+ // This code assumes that we'll never consider starting a third
+ // speculative task attempt if two are already running for this task
+ if (start > 0 && timestamp > start) {
+ estimate = (long) ((timestamp - start) / Math.max(0.0001, status.progress));
+ varianceEstimate = (long) (estimate * status.progress / 10);
+ }
+
+ estimateContainer.set(estimate);
+ estimateVarianceContainer.set(varianceEstimate);
+ }
+ }
+
+ private long storedPerAttemptValue
+ (Map<TaskAttempt, AtomicLong> data, TaskAttemptID attemptID) {
+ TaskID taskID = attemptID.taskID;
+ JobID jobID = taskID.jobID;
+ Job job = context.getJob(jobID);
+
+ Task task = job.getTask(taskID);
+
+ if (task == null) {
+ return -1L;
+ }
+
+ TaskAttempt taskAttempt = task.getAttempt(attemptID);
+
+ if (taskAttempt == null) {
+ return -1L;
+ }
+
+ AtomicLong estimate = data.get(taskAttempt);
+
+ return estimate == null ? -1L : estimate.get();
+
+ }
+
+ @Override
+ public long estimatedRuntime(TaskAttemptID attemptID) {
+ return storedPerAttemptValue(attemptRuntimeEstimates, attemptID);
+ }
+
+ @Override
+ public long runtimeEstimateVariance(TaskAttemptID attemptID) {
+ return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID);
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,72 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+
+/*
+ * This class is provided solely as an exemplae of the values that mean
+ * that nothing needs to be computed. It's not currently used.
+ */
+public class NullTaskRuntimesEngine implements TaskRuntimeEstimator {
+ @Override
+ public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
+ // no code
+ }
+
+ @Override
+ public long attemptEnrolledTime(TaskAttemptID attemptID) {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+ // no code
+ }
+
+ @Override
+ public void contextualize(Configuration conf, AppContext context) {
+ // no code
+ }
+
+ @Override
+ public long thresholdRuntime(TaskID id) {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public long estimatedRuntime(TaskAttemptID id) {
+ return -1L;
+ }
+ @Override
+ public long estimatedNewAttemptRuntime(TaskID id) {
+ return -1L;
+ }
+
+ @Override
+ public long runtimeEstimateVariance(TaskAttemptID id) {
+ return -1L;
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/Speculator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/Speculator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/Speculator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/Speculator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,44 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * Speculator component. Task Attempts' status updates are sent to this
+ * component. Concrete implementation runs the speculative algorithm and
+ * sends the TaskEventType.T_ADD_ATTEMPT.
+ *
+ * An implementation also has to arrange for the jobs to be scanned from
+ * time to time, to launch the speculations.
+ */
+public interface Speculator
+ extends EventHandler<SpeculatorEvent> {
+
+ enum EventType {
+ ATTEMPT_STATUS_UPDATE,
+ ATTEMPT_START,
+ TASK_CONTAINER_NEED_UPDATE
+ }
+
+ // This will be implemented if we go to a model where the events are
+ // processed within the TaskAttempts' state transitions' code.
+ public void handleAttempt(TaskAttemptStatus status);
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,74 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+
+public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
+
+ // valid for ATTEMPT_STATUS_UPDATE
+ private TaskAttemptStatus reportedStatus;
+
+ // valid for TASK_CONTAINER_NEED_UPDATE
+ private TaskID taskID;
+ private int containersNeededChange;
+
+
+
+ public SpeculatorEvent(TaskAttemptStatus reportedStatus, long timestamp) {
+ super(Speculator.EventType.ATTEMPT_STATUS_UPDATE, timestamp);
+ this.reportedStatus = reportedStatus;
+ }
+
+ public SpeculatorEvent(TaskAttemptID attemptID, boolean flag, long timestamp) {
+ super(Speculator.EventType.ATTEMPT_START, timestamp);
+ this.reportedStatus = new TaskAttemptStatus();
+ this.reportedStatus.id = attemptID;
+ }
+
+ /*
+ * This c'tor creates a TASK_CONTAINER_NEED_UPDATE event .
+ * We send a +1 event when a task enters a state where it wants a container,
+ * and a -1 event when it either gets one or withdraws the request.
+ * The per job sum of all these events is the number of containers requested
+ * but not granted. The intent is that we only do speculations when the
+ * speculation wouldn't compete for containers with tasks which need
+ * to be run.
+ */
+ public SpeculatorEvent(TaskID taskID, int containersNeededChange) {
+ super(Speculator.EventType.TASK_CONTAINER_NEED_UPDATE);
+ this.taskID = taskID;
+ this.containersNeededChange = containersNeededChange;
+ }
+
+ public TaskAttemptStatus getReportedStatus() {
+ return reportedStatus;
+ }
+
+ public int containersNeededChange() {
+ return containersNeededChange;
+ }
+
+ public TaskID getTaskID() {
+ return taskID;
+ }
+}
\ No newline at end of file