You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC

svn commit: r1082677 [6/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapreduc...

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,239 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.launcher;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMasterConstants;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerManager;
+import org.apache.hadoop.yarn.ContainerToken;
+
+/**
+ * This class is responsible for launching of containers.
+ */
+public class ContainerLauncherImpl extends AbstractService implements
+    ContainerLauncher {
+
+  private static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
+
+  private AppContext context;
+  private ThreadPoolExecutor launcherPool;
+  private Thread eventHandlingThread;
+  private BlockingQueue<ContainerLauncherEvent> eventQueue =
+      new LinkedBlockingQueue<ContainerLauncherEvent>();
+
+  public ContainerLauncherImpl(AppContext context) {
+    super(ContainerLauncherImpl.class.getName());
+    this.context = context;
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    // Clone configuration for this component so that the SecurityInfo setting
+    // doesn't affect the original configuration
+    Configuration myLocalConfig = new Configuration(conf);
+    myLocalConfig.setClass(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_INFO_CLASS_NAME,
+        ContainerManagerSecurityInfo.class, SecurityInfo.class);
+    super.init(myLocalConfig);
+  }
+
+  public void start() {
+    launcherPool =
+        new ThreadPoolExecutor(getConfig().getInt(
+            MRAppMasterConstants.CONTAINERLAUNCHER_THREADPOOL_SIZE, 10),
+            Integer.MAX_VALUE, 1, TimeUnit.HOURS,
+            new LinkedBlockingQueue<Runnable>());
+    launcherPool.prestartAllCoreThreads(); // Wait for work.
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        ContainerLauncherEvent event = null;
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.error("Returning, interrupted : " + e);
+            return;
+          }
+          // the events from the queue are handled in parallel
+          // using a thread pool
+          launcherPool.execute(new EventProcessor(event));
+
+          // TODO: Group launching of multiple containers to a single
+          // NodeManager into a single connection
+        }
+      }
+    });
+    eventHandlingThread.start();
+    super.start();
+  }
+
+  public void stop() {
+    eventHandlingThread.interrupt();
+    launcherPool.shutdown();
+    super.stop();
+  }
+
+  protected ContainerManager getCMProxy(ContainerID containerID,
+      final String containerManagerBindAddr, ContainerToken containerToken)
+      throws IOException {
+
+    UserGroupInformation user = UserGroupInformation.getLoginUser();
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Token<ContainerTokenIdentifier> token =
+          new Token<ContainerTokenIdentifier>(
+              containerToken.identifier.array(),
+              containerToken.password.array(), new Text(
+                  containerToken.kind.toString()), new Text(
+                  containerToken.service.toString()));
+      user.addToken(token);
+    }
+    ContainerManager proxy =
+        user.doAs(new PrivilegedAction<ContainerManager>() {
+          @Override
+          public ContainerManager run() {
+            YarnRPC rpc = YarnRPC.create(getConfig());
+            return (ContainerManager) rpc.getProxy(ContainerManager.class,
+                NetUtils.createSocketAddr(containerManagerBindAddr),
+                getConfig());
+          }
+        });
+    return proxy;
+  }
+
+  /**
+   * Setup and start the container on remote nodemanager.
+   */
+  private class EventProcessor implements Runnable {
+    private ContainerLauncherEvent event;
+
+    EventProcessor(ContainerLauncherEvent event) {
+      this.event = event;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Processing the event " + event.toString());
+
+      // Load ContainerManager tokens before creating a connection.
+      // TODO: Do it only once per NodeManager.
+      final String containerManagerBindAddr = event.getContainerMgrAddress();
+      ContainerID containerID = event.getContainerID();
+      ContainerToken containerToken = event.getContainerToken();
+
+      switch(event.getType()) {
+
+      case CONTAINER_REMOTE_LAUNCH:
+        ContainerRemoteLaunchEvent launchEv = (ContainerRemoteLaunchEvent) event;
+
+        try {
+          
+          ContainerManager proxy = 
+            getCMProxy(containerID, containerManagerBindAddr, containerToken);
+          
+          // Construct the actual Container
+          ContainerLaunchContext containerLaunchContext =
+              launchEv.getContainer();
+
+          // TODO: Make sure that child's mapred-local-dir is set correctly.
+
+          // Now launch the actual container
+          proxy.startContainer(containerLaunchContext);
+
+          // after launching send launched event to taskattempt
+          context.getEventHandler().handle(
+              new TaskAttemptEvent(launchEv.getTaskAttemptID(),
+                  TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
+        } catch (Exception e) {
+          LOG.error("Container launch failed", e);
+          context.getEventHandler().handle(
+              new TaskAttemptEvent(launchEv.getTaskAttemptID(),
+                  TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+        }
+
+        break;
+      case CONTAINER_REMOTE_CLEANUP:
+
+        // We will have to remove the launch event if it is still in eventQueue
+        // and not yet processed
+        if (eventQueue.contains(event)) {
+          eventQueue.remove(event); // TODO: Any synchro needed?
+          // TODO: raise any event?
+        } else {
+          try {
+            ContainerManager proxy = 
+              getCMProxy(containerID, containerManagerBindAddr, containerToken);
+            // TODO:check whether container is launched
+
+            // kill the remote container if already launched
+            proxy.stopContainer(event.getContainerID());
+            proxy.cleanupContainer(event.getContainerID());
+          } catch (Exception e) {
+            //ignore the cleanup failure
+            LOG.warn("cleanup failed for container " + event.getContainerID() ,
+                e);
+          }
+
+            // after killing send killed event to taskattempt
+            context.getEventHandler().handle(
+                new TaskAttemptEvent(event.getTaskAttemptID(),
+                    TaskAttemptEventType.TA_CONTAINER_CLEANED));
+        }
+        break;
+      }
+    }
+    
+  }
+
+  @Override
+  public void handle(ContainerLauncherEvent event) {
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new YarnException(e);
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,40 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.launcher;
+
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+/**
+ * Launcher event of type CONTAINER_REMOTE_LAUNCH. Concrete subclasses supply
+ * the launch context and the task that belong to the container.
+ */
+public abstract class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
+
+  public ContainerRemoteLaunchEvent(TaskAttemptID taskAttemptID,
+      ContainerID containerID, String containerMgrAddress,
+      ContainerToken containerToken) {
+    super(taskAttemptID, containerID, containerMgrAddress, containerToken,
+        ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
+  }
+
+  /** Returns the launch context of the container to be started. */
+  public abstract ContainerLaunchContext getContainer();
+
+  /** Returns the {@link Task} associated with this launch request. */
+  public abstract Task getRemoteTask();
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,31 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+public interface ContainerAllocator extends EventHandler<ContainerAllocatorEvent>{
+
+  enum EventType {
+
+    CONTAINER_REQ,
+    CONTAINER_DEALLOCATE
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+/**
+ * Event sent to a {@link ContainerAllocator}; carries the id of the task
+ * attempt it concerns.
+ */
+public class ContainerAllocatorEvent extends 
+    AbstractEvent<ContainerAllocator.EventType> {
+  
+  // Set once in the constructor; there is no setter
+  private final TaskAttemptID attemptID;
+
+  /**
+   * @param attemptID task attempt the event refers to
+   * @param type kind of allocator event
+   */
+  public ContainerAllocatorEvent(TaskAttemptID attemptID,
+      ContainerAllocator.EventType type) {
+    super(type);
+    this.attemptID = attemptID;
+  }
+
+  /** Returns the task attempt id given at construction. */
+  public TaskAttemptID getAttemptID() {
+    return attemptID;
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+/**
+ * Allocator event of type CONTAINER_REQ: a request for one container with a
+ * given capability and priority, plus the host and rack names passed by the
+ * requester.
+ */
+public class ContainerRequestEvent extends ContainerAllocatorEvent {
+  
+  // All fields are assigned once in the constructor
+  private final Priority priority;
+  private final Resource capability;
+  private final String[] hosts;
+  private final String[] racks;
+
+  /**
+   * @param attemptID task attempt that needs the container
+   * @param capability resource capability requested
+   * @param priority numeric priority; wrapped in a new {@link Priority}
+   * @param hosts host names for the request; stored without copying
+   * @param racks rack names for the request; stored without copying
+   */
+  public ContainerRequestEvent(TaskAttemptID attemptID, 
+      Resource capability, int priority,
+      String[] hosts, String[] racks) {
+    super(attemptID, ContainerAllocator.EventType.CONTAINER_REQ);
+    this.capability = capability;
+    this.priority = new Priority();
+    this.priority.priority = priority;
+    this.hosts = hosts;
+    this.racks = racks;
+  }
+
+  public Resource getCapability() {
+    return capability;
+  }
+
+  public Priority getPriority() {
+    return priority;
+  }
+
+  /** Returns the array passed to the constructor (not a copy). */
+  public String[] getHosts() {
+    return hosts;
+  }
+  
+  /** Returns the array passed to the constructor (not a copy). */
+  public String[] getRacks() {
+    return racks;
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,471 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.AMRMProtocol;
+import org.apache.hadoop.yarn.AMResponse;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerState;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+/**
+ * Allocates the container from the ResourceManager scheduler.
+ */
+public class RMContainerAllocator extends AbstractService 
+implements ContainerAllocator {
+  private static final Log LOG = 
+    LogFactory.getLog(RMContainerAllocator.class);
+  private static final String ANY = "*";
+  private static int rmPollInterval;//millis
+  private ApplicationID applicationId;
+  private EventHandler eventHandler;
+  private volatile boolean stopped;
+  protected Thread allocatorThread;
+  private ApplicationMaster applicationMaster;
+  private AMRMProtocol scheduler;
+  private final ClientService clientService;
+  private int lastResponseID = 0;
+
+  //mapping for assigned containers
+  private final Map<ContainerID, TaskAttemptID> assignedMap = 
+    new HashMap<ContainerID, TaskAttemptID>();
+
+  private final Map<Priority, 
+  Map<Resource,LinkedList<ContainerRequestEvent>>> localRequestsQueue = 
+    new HashMap<Priority, Map<Resource,LinkedList<ContainerRequestEvent>>>();
+
+  //Key -> Priority
+  //Value -> Map
+  //Key->ResourceName (eg. hostname, rackname, *)
+  //Value->Map
+  //Key->Resource Capability
+  //Value->ResourceRequest
+  private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>> 
+  remoteRequestsTable = 
+    new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+
+
+  private final Set<ResourceRequest> ask =new TreeSet<ResourceRequest>();
+  private final Set<Container> release = new TreeSet<Container>();
+
+  /**
+   * @param clientService source of the bind address and HTTP port used when
+   *          registering
+   * @param context supplies the application id and the event handler
+   */
+  public RMContainerAllocator(ClientService clientService, AppContext context) {
+    super("RMContainerAllocator");
+    this.applicationMaster = new ApplicationMaster();
+    this.applicationId = context.getApplicationID();
+    this.eventHandler = context.getEventHandler();
+    this.clientService = clientService;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    // Poll interval is one third of the configured AM expiry interval
+    // (10000 when the key is not set).
+    final int amExpiryInterval =
+      conf.getInt(YarnConfiguration.AM_EXPIRY_INTERVAL, 10000);
+    rmPollInterval = amExpiryInterval / 3;
+  }
+
+  /**
+   * Creates the scheduler proxy, registers the application master and then
+   * starts the allocator thread.
+   */
+  @Override
+  public void start() {
+    this.scheduler = createSchedulerProxy();
+    register();
+    startAllocatorThread();
+    super.start();
+  }
+
+  /**
+   * Fills in the ApplicationMaster record (address and ports taken from the
+   * client service, state RUNNING, progress 0) and registers it with the
+   * scheduler.
+   *
+   * @throws YarnException wrapping any failure of the registration call
+   */
+  protected void register() {
+    //Register
+    applicationMaster.applicationId = applicationId;
+    applicationMaster.host =
+      clientService.getBindAddress().getAddress().getHostAddress();
+    applicationMaster.rpcPort = clientService.getBindAddress().getPort();
+    applicationMaster.state = ApplicationState.RUNNING;
+    applicationMaster.httpPort = clientService.getHttpPort();
+    applicationMaster.status = new ApplicationStatus();
+    applicationMaster.status.applicationId = applicationId;
+    applicationMaster.status.progress = 0.0f;
+    try {
+      scheduler.registerApplicationMaster(applicationMaster);
+    } catch(Exception are) {
+      // The failure is rethrown and aborts start(), so log it as an error
+      LOG.error("Exception while registering", are);
+      throw new YarnException(are);
+    }
+  }
+
+  /**
+   * Marks the application master COMPLETED and reports that to the
+   * scheduler. Failures are logged and otherwise ignored.
+   */
+  protected void unregister() {
+    try {
+      applicationMaster.state = ApplicationState.COMPLETED;
+      scheduler.finishApplicationMaster(applicationMaster);
+    } catch (Exception e) {
+      LOG.info("Error while unregistering ", e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+    allocatorThread.interrupt();
+    try {
+      allocatorThread.join();
+    } catch (InterruptedException ie) {
+      LOG.info("Interruped Exception while stopping", ie);
+    }
+    unregister();
+    super.stop();
+  }
+
+  protected void startAllocatorThread() {
+    allocatorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
+          try {
+            Thread.sleep(rmPollInterval);
+            try {
+              allocate();
+            } catch (Exception e) {
+              LOG.error("ERROR IN CONTACTING RM.", e);
+            }
+          } catch (InterruptedException e) {
+            LOG.info("Allocated thread interrupted. Returning");
+            return;
+          }
+        }
+      }
+    });
+    allocatorThread.start();
+  }
+
+  /**
+   * Creates the AMRMProtocol proxy for the configured scheduler address.
+   *
+   * When security is enabled, the application master token is read from the
+   * environment, added to the current user, and the proxy is created as that
+   * user with SchedulerSecurityInfo set on a private copy of the
+   * configuration.
+   *
+   * @return the scheduler proxy
+   * @throws YarnException if the current user cannot be determined, or if
+   *           the token is missing from the environment or cannot be decoded
+   */
+  protected AMRMProtocol createSchedulerProxy() {
+    final YarnRPC rpc = YarnRPC.create(getConfig());
+    final Configuration conf = new Configuration(getConfig());
+    final String serviceAddr = conf.get(
+        YarnConfiguration.SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
+
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+          SchedulerSecurityInfo.class, SecurityInfo.class);
+
+      String tokenURLEncodedStr =
+        System.getenv().get(
+            YarnConfiguration.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      if (tokenURLEncodedStr == null) {
+        // Fail with a clear cause instead of passing null to the decoder
+        throw new YarnException(new IOException(
+            "Environment variable "
+            + YarnConfiguration.APPLICATION_MASTER_TOKEN_ENV_NAME
+            + " is not set"));
+      }
+      // The token is a credential, so its value is never written to the log
+      LOG.debug("AppMasterToken read from environment variable "
+          + YarnConfiguration.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+
+      currentUser.addToken(token);
+    }
+
+    return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+      @Override
+      public AMRMProtocol run() {
+        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+            NetUtils.createSocketAddr(serviceAddr), conf);
+      }
+    });       
+  }
+
+  /**
+   * Runs one allocation round: fetches containers via getResources() and
+   * passes them to assign().
+   */
+  // TODO: Need finer synchronization.
+  protected synchronized void allocate() throws Exception {
+    List<Container> containers = getResources();
+    assign(containers);
+  }
+
+  /**
+   * Handles an allocator event. CONTAINER_REQ events are passed to
+   * requestContainer(); CONTAINER_DEALLOCATE is not handled yet.
+   */
+  @Override
+  public synchronized void handle(ContainerAllocatorEvent event) {
+    LOG.info("Processing the event " + event.toString());
+    //TODO: can be replaced by switch instead of if-else
+    if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
+      requestContainer((ContainerRequestEvent) event);
+      return;
+    }
+    if (event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
+      //TODO: handle deallocation
+    }
+  }
+
+  protected synchronized void requestContainer(ContainerRequestEvent event) {
+    //add to the localRequestsQueue
+    //localRequests Queue is hashed by Resource and Priority for easy lookups
+    Map<Resource, LinkedList<ContainerRequestEvent>> eventMap =
+      this.localRequestsQueue.get(event.getPriority());
+    if (eventMap == null) {
+      eventMap = new HashMap<Resource, LinkedList<ContainerRequestEvent>>();
+      this.localRequestsQueue.put(event.getPriority(), eventMap);
+    }
+
+    LinkedList<ContainerRequestEvent> eventList =
+      eventMap.get(event.getCapability());
+    if (eventList == null) {
+      eventList = new LinkedList<ContainerRequestEvent>();
+      eventMap.put(event.getCapability(), eventList);
+    }
+    eventList.add(event);
+
+    // Create resource requests
+    for (String host : event.getHosts()) {
+      // Data-local
+      addResourceRequest(event.getPriority(), host, event.getCapability());
+    }
+
+    // Nothing Rack-local for now
+    for (String rack : event.getRacks()) {
+      addResourceRequest(event.getPriority(), rack, event.getCapability());
+    }
+
+    // Off-switch
+    addResourceRequest(event.getPriority(), ANY, event.getCapability());
+
+  }
+
+  /**
+   * Increments the container count of the request stored in
+   * remoteRequestsTable under (priority, resourceName, capability), creating
+   * the table entries on first use, and adds the request to the ask set.
+   */
+  private void addResourceRequest(Priority priority, String resourceName,
+      Resource capability) {
+    Map<String, Map<Resource, ResourceRequest>> byName =
+      this.remoteRequestsTable.get(priority);
+    if (byName == null) {
+      byName = new HashMap<String, Map<Resource, ResourceRequest>>();
+      this.remoteRequestsTable.put(priority, byName);
+      LOG.info("Added priority=" + priority);
+    }
+
+    Map<Resource, ResourceRequest> byCapability = byName.get(resourceName);
+    if (byCapability == null) {
+      byCapability = new HashMap<Resource, ResourceRequest>();
+      byName.put(resourceName, byCapability);
+    }
+
+    ResourceRequest request = byCapability.get(capability);
+    if (request == null) {
+      // First request for this key: start counting from zero
+      request = new ResourceRequest();
+      request.priority = priority;
+      request.hostName = resourceName;
+      request.capability = capability;
+      request.numContainers = 0;
+      byCapability.put(capability, request);
+    }
+    request.numContainers++;
+
+    // Note this down for next interaction with ResourceManager
+    ask.add(request);
+    LOG.info("addResourceRequest:" + " applicationId=" + applicationId.id
+        + " priority=" + priority.priority + " resourceName=" + resourceName
+        + " numContainers=" + request.numContainers + " #asks="
+        + ask.size());
+  }
+
+  /**
+   * Decrements the container count of the request stored under
+   * (priority, resourceName, capability). When the count reaches zero the
+   * request, and any table level left empty by that, is removed, and the
+   * request is dropped from the ask set; otherwise the request is added to
+   * the ask set. If no request is stored under the key, a warning is logged
+   * and nothing is changed.
+   */
+  private void decResourceRequest(Priority priority, String resourceName,
+      Resource capability) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests = 
+      this.remoteRequestsTable.get(priority);
+    Map<Resource, ResourceRequest> reqMap =
+      (remoteRequests == null) ? null : remoteRequests.get(resourceName);
+    ResourceRequest remoteRequest =
+      (reqMap == null) ? null : reqMap.get(capability);
+    if (remoteRequest == null) {
+      // Nothing was registered for this key; avoid a NullPointerException
+      LOG.warn("decResourceRequest: no request found for"
+          + " applicationId=" + applicationId.id
+          + " priority=" + priority.priority
+          + " resourceName=" + resourceName);
+      return;
+    }
+
+    LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.id
+        + " priority=" + priority.priority + " resourceName=" + resourceName
+        + " numContainers=" + remoteRequest.numContainers + " #asks="
+        + ask.size());
+
+    remoteRequest.numContainers--;
+    if (remoteRequest.numContainers == 0) {
+      reqMap.remove(capability);
+      if (reqMap.size() == 0) {
+        remoteRequests.remove(resourceName);
+      }
+      if (remoteRequests.size() == 0) {
+        remoteRequestsTable.remove(priority);
+      }
+      //remove from ask if it may have
+      ask.remove(remoteRequest); 
+    } else {
+      // Make sure the updated request is part of the next ask. add() leaves
+      // the set unchanged when the request is already in it.
+      ask.add(remoteRequest);
+    }
+
+    LOG.info("AFTER decResourceRequest:" + " applicationId=" + applicationId.id
+        + " priority=" + priority.priority + " resourceName=" + resourceName
+        + " numContainers=" + remoteRequest.numContainers + " #asks="
+        + ask.size());
+  }
+
+  /**
+   * Sends the pending asks and releases to the scheduler and sorts the
+   * containers it returns.
+   *
+   * Containers in state COMPLETE are looked up in {@code assignedMap} and a
+   * TA_CONTAINER_COMPLETED event is sent to the owning task attempt; all
+   * other containers are returned to the caller.
+   *
+   * @return the containers from the response that are not COMPLETE
+   * @throws Exception whatever the scheduler call throws
+   */
+  private List<Container> getResources() throws Exception {
+    ApplicationStatus status = new ApplicationStatus();
+    status.applicationId = applicationId;
+    status.responseID = lastResponseID;
+
+    // Capture the sizes now: both collections are cleared right after the
+    // call, so reading them later would always report zero.
+    int numAsks = ask.size();
+    int numReleases = release.size();
+
+    AMResponse response = 
+      scheduler.allocate(status, 
+          new ArrayList(ask), new ArrayList(release));
+    lastResponseID = response.responseId;
+    List<Container> allContainers = response.containers;
+    ask.clear();
+    release.clear();
+
+    LOG.info("getResources() for " + applicationId + ":" +
+        " ask=" + numAsks + 
+        " release= "+ numReleases + 
+        " recieved=" + allContainers.size());
+    List<Container> allocatedContainers = new ArrayList<Container>();
+    for (Container cont : allContainers) {
+      if (cont.state != ContainerState.COMPLETE) {
+        allocatedContainers.add(cont);
+        LOG.debug("Received Container :" + cont);
+      } else {
+        LOG.info("Received completed container " + cont);
+        TaskAttemptID attemptID = assignedMap.remove(cont.id);
+        if (attemptID == null) {
+          LOG.error("Container complete event for unknown container id " + 
+              cont.id);
+        } else {
+          //send the container completed event to Task attempt
+          eventHandler.handle(new TaskAttemptEvent(attemptID, 
+              TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+        }
+      }
+    }
+    return allocatedContainers;
+  }
+
+  /**
+   * Offers the allocated containers to each priority in the iteration order
+   * of {@code localRequestsQueue}, stopping as soon as none are left.
+   * Containers matched to a request are removed from the list by the
+   * per-priority assign.
+   */
+  private void assign(List<Container> allocatedContainers) {
+    // Schedule in priority order
+    Iterator<Priority> priorities = localRequestsQueue.keySet().iterator();
+    while (priorities.hasNext()) {
+      Priority current = priorities.next();
+      LOG.info("Assigning for priority " + current); 
+      assign(current, allocatedContainers);
+      if (allocatedContainers.isEmpty()) { 
+        break;
+      }
+    }
+
+    //TODO
+    //Containers may still be left over after assignment. This can happen if
+    //container requests are cancelled by the AM (not supported currently).
+    //Should the unassigned containers be added to the release list?
+  }
+
+  /**
+   * Matches allocated containers against the queued requests of one
+   * priority.
+   *
+   * For each container, the queued requests with the same capability are
+   * searched for one that lists the container's host; if none does, the
+   * oldest queued request is taken instead. A matched container is removed
+   * from {@code allocatedContainers}, the resource requests that were filed
+   * for the chosen event (its hosts, its racks and ANY) are decremented, and
+   * a container-assigned event is sent to the task attempt.
+   *
+   * A container whose capability has no queued request at this priority is
+   * left in the list and the remaining containers are still considered.
+   */
+  private void assign(Priority priority, List<Container> allocatedContainers) {
+    for (Iterator<Container> i=allocatedContainers.iterator(); i.hasNext();) {
+      Container allocatedContainer = i.next();
+      String host = allocatedContainer.hostName.toString();
+      Resource capability = allocatedContainer.resource;
+
+      LinkedList<ContainerRequestEvent> requestList = 
+        localRequestsQueue.get(priority).get(capability);
+
+      if (requestList == null) {
+        LOG.info("No request match at priority " + priority);
+        // Only this container is unmatched; later containers may have a
+        // different capability that does match at this priority.
+        continue;
+      }
+
+      ContainerRequestEvent assigned = null;
+      //walk thru the requestList to see if in any host matches
+      Iterator<ContainerRequestEvent> it = requestList.iterator();
+      while (it.hasNext()) {
+        ContainerRequestEvent event = it.next();
+        if (Arrays.asList(event.getHosts()).contains(host)) { // TODO: Fix
+          assigned = event;
+          it.remove();
+          break;
+        }
+      }
+      if (assigned == null && requestList.size() > 0) {
+        //host didn't match: choose the first one in queue
+        assigned = requestList.remove();
+      }
+      if (assigned == null) {
+        continue;
+      }
+
+      i.remove(); // Remove from allocated Containers list also.
+
+      // Update resource requests. One request was added per host, per rack
+      // and for ANY when the event was queued, so take exactly those back
+      // regardless of which path picked the event.
+      for (String hostName : assigned.getHosts()) {
+        decResourceRequest(priority, hostName, capability);
+      }
+      for (String rack : assigned.getRacks()) {
+        decResourceRequest(priority, rack, capability);
+      }
+      decResourceRequest(priority, ANY, capability);
+
+      //send the container assigned event to Task attempt
+      eventHandler.handle(new TaskAttemptContainerAssignedEvent(assigned
+          .getAttemptID(), allocatedContainer.id,
+          allocatedContainer.hostName.toString(),
+          allocatedContainer.containerToken));
+
+      assignedMap.put(allocatedContainer.id, assigned.getAttemptID());
+
+      LOG.info("Assigned container (" + allocatedContainer + ") " +
+          " to task " + assigned.getAttemptID() + " at priority " + priority + 
+          " on node " + allocatedContainer.hostName.toString());
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,157 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.ContainerID;
+
+/**
+ * Reads the static list of NodeManagers from the job configuration file and
+ * allocates containers on them in round-robin order.
+ *
+ * Requests are queued by {@link #handle} and served by a single allocator
+ * thread that is started in {@link #start()} and stopped in {@link #stop()}.
+ */
+public class StaticContainerAllocator extends AbstractService 
+    implements ContainerAllocator {
+
+  private static final Log LOG = 
+    LogFactory.getLog(StaticContainerAllocator.class);
+
+  private final AppContext context;
+  private volatile boolean stopped;
+  private final BlockingQueue<ContainerAllocatorEvent> eventQueue =
+      new LinkedBlockingQueue<ContainerAllocatorEvent>();
+  private Thread allocatorThread;
+
+  // Sequence number for generated container ids; used only by the allocator
+  // thread.
+  private int containerCount;
+
+  private List<String> containerMgrList;
+  // Index of the container manager that gets the next container; used only
+  // by the allocator thread.
+  private int nextIndex;
+
+  public StaticContainerAllocator(AppContext context) {
+    super("StaticContainerAllocator");
+    this.context = context;
+  }
+
+  /**
+   * Loads the job configuration file from the job submit directory into
+   * {@code conf} and returns the configured container-manager hosts.
+   *
+   * @return the configured hosts; an empty list if the key is not set
+   */
+  protected List<String> getContainerMgrList(Configuration conf)
+      throws IOException {
+    Path jobSubmitDir = FileContext.getLocalFSFileContext().makeQualified(
+        new Path(new File(YARNApplicationConstants.JOB_SUBMIT_DIR).getAbsolutePath()));
+    Path jobConfFile = new Path(jobSubmitDir, YARNApplicationConstants.JOB_CONF_FILE);
+    conf.addResource(jobConfFile);
+    String[] containerMgrHosts = 
+      conf.getStrings(YARNApplicationConstants.NM_HOSTS_CONF_KEY);
+    if (containerMgrHosts == null) {
+      // Key not set: report "no hosts" so init() fails with a clear message
+      // instead of a NullPointerException from Arrays.asList.
+      containerMgrHosts = new String[0];
+    }
+    return Arrays.asList(containerMgrHosts);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    try {
+      containerMgrList = getContainerMgrList(conf);
+    } catch (IOException e) {
+      throw new YarnException("Cannot get container-managers list ", e);
+    }
+
+    if (containerMgrList.size() == 0) {
+      throw new YarnException("No of Container Managers are zero.");
+    }
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    allocatorThread = new Thread(new Allocator());
+    allocatorThread.start();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+    // stop() may be called without start() having run.
+    if (allocatorThread != null) {
+      allocatorThread.interrupt();
+      try {
+        allocatorThread.join();
+      } catch (InterruptedException ie) {
+        LOG.debug("Interruped Exception while stopping", ie);
+        // Keep the interrupt visible to the caller.
+        Thread.currentThread().interrupt();
+      }
+    }
+    super.stop();
+  }
+
+  @Override
+  public void handle(ContainerAllocatorEvent event) {
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new YarnException(e);
+    }
+  }
+
+  /** Takes events off the queue one at a time until stopped or interrupted. */
+  private class Allocator implements Runnable {
+    @Override
+    public void run() {
+      ContainerAllocatorEvent event = null;
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        try {
+          event = eventQueue.take();
+          LOG.info("Processing the event " + event.toString());
+          allocate(event);
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    }
+
+    /**
+     * Serves a CONTAINER_REQ event by assigning a freshly generated
+     * container id on the next container manager in round-robin order.
+     * Other event types are ignored.
+     */
+    private void allocate(ContainerAllocatorEvent event) {
+      if (event.getType() != ContainerAllocator.EventType.CONTAINER_REQ) {
+        return;
+      }
+      if (containerMgrList.isEmpty()) {
+        LOG.warn("No container managers available for event " + event);
+        return;
+      }
+
+      String containerMgr = containerMgrList.get(nextIndex);
+      // Advance and wrap so successive requests rotate over all managers.
+      nextIndex = (nextIndex + 1) % containerMgrList.size();
+      ContainerID containerID = generateContainerID();
+
+      context.getEventHandler().handle(
+          new TaskAttemptContainerAssignedEvent(
+              event.getAttemptID(),
+              containerID, containerMgr, null));
+    }
+
+    private ContainerID generateContainerID() {
+      ContainerID cId = new ContainerID();
+      cId.appID = context.getApplicationID();
+      cId.id = containerCount++;
+      return cId;
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,82 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+/**
+ * Running count, sum and sum of squares of a set of numbers, from which the
+ * mean, variance and standard deviation are derived. All public methods are
+ * synchronized on the instance.
+ */
+public class DataStatistics {
+  private int count = 0;
+  private double sum = 0;
+  private double sumSquares = 0;
+
+  public DataStatistics() {
+  }
+
+  public DataStatistics(double initNum) {
+    this.count = 1;
+    this.sum = initNum;
+    this.sumSquares = initNum * initNum;
+  }
+
+  /** Adds one value to the statistics. */
+  public synchronized void add(double newNum) {
+    this.count++;
+    this.sum += newNum;
+    this.sumSquares += newNum * newNum;
+  }
+
+  /**
+   * Replaces a previously added value with a new one. Synchronized so that
+   * readers never observe the state between the removal and the addition.
+   */
+  public synchronized void updateStatistics(double old, double update) {
+    sub(old);
+    add(update);
+  }
+
+  /** Removes one value; sum and sumSquares are clamped at zero. */
+  private synchronized void sub(double oldNum) {
+    this.count--;
+    this.sum = Math.max(this.sum - oldNum, 0.0d);
+    this.sumSquares = Math.max(this.sumSquares - oldNum * oldNum, 0.0d);
+  }
+
+  /** @return the mean, or 0.0 when no values are recorded */
+  public synchronized double mean() {
+    return count == 0 ? 0.0 : sum/count;
+  }
+
+  /** @return the variance, or 0.0 when no values are recorded */
+  public synchronized double var() {
+    if (count == 0) {
+      // sumSquares/count would be 0/0 = NaN, and Math.max keeps NaN.
+      return 0.0;
+    }
+    // E(X^2) - E(X)^2
+    double mean = mean();
+    return Math.max((sumSquares/count) - mean * mean, 0.0d);
+  }
+
+  /** @return the standard deviation, or 0.0 when no values are recorded */
+  public synchronized double std() {
+    return Math.sqrt(this.var());
+  }
+
+  /**
+   * @param sigma number of standard deviations above the mean
+   * @return mean + sigma * std, or 0.0 when no values are recorded
+   */
+  public synchronized double outlier(float sigma) {
+    if (count != 0) {
+      return mean() + std() * sigma;
+    }
+
+    return 0.0;
+  }
+
+  public synchronized double count() {
+    return count;
+  }
+
+  @Override
+  public synchronized String toString() {
+    return "DataStatistics: count is " + count + ", sum is " + sum +
+    ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,503 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.Clock;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+public class DefaultSpeculator extends AbstractService implements
+    Speculator {
+
+  // Sentinel results of speculationValue() for tasks that must not be
+  //  speculated.  All of them are negative.
+  private static final long ON_SCHEDULE = Long.MIN_VALUE;
+  private static final long ALREADY_SPECULATING = Long.MIN_VALUE + 1;
+  private static final long TOO_NEW = Long.MIN_VALUE + 2;
+  private static final long PROGRESS_IS_GOOD = Long.MIN_VALUE + 3;
+  private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
+  private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
+
+  // Minimum wait, in milliseconds, between two background scans, depending
+  //  on whether the last scan launched a speculation
+  private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
+  private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
+
+  // Limits on how many tasks of a job may be speculating at once
+  private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
+  private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
+  private static final int  MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
+
+  private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
+
+  private final ConcurrentMap<TaskID, Boolean> runningTasks
+      = new ConcurrentHashMap<TaskID, Boolean>();
+
+  private final Map<Task, AtomicBoolean> pendingSpeculations
+      = new ConcurrentHashMap<Task, AtomicBoolean>();
+
+  // These are the current needs, not the initial needs.  For each job, these
+  //  record the number of attempts that exist and that are actively
+  //  waiting for a container [as opposed to running or finished]
+  private final ConcurrentMap<JobID, AtomicInteger> mapContainerNeeds
+      = new ConcurrentHashMap<JobID, AtomicInteger>();
+  private final ConcurrentMap<JobID, AtomicInteger> reduceContainerNeeds
+      = new ConcurrentHashMap<JobID, AtomicInteger>();
+
+  // Tasks for which addSpeculativeAttempt() has been called
+  private final Set<TaskID> mayHaveSpeculated = new HashSet<TaskID>();
+
+  private final Configuration conf;
+  private AppContext context;
+  private Thread speculationBackgroundThread = null;
+  private BlockingQueue<SpeculatorEvent> eventQueue
+      = new LinkedBlockingQueue<SpeculatorEvent>();
+  private TaskRuntimeEstimator estimator;
+
+  // Adding an object wakes the background thread for an immediate scan
+  private BlockingQueue<Object> scanControl = new LinkedBlockingQueue<Object>();
+
+  private final Clock clock;
+
+  private final EventHandler<TaskEvent> eventHandler;
+
+  /**
+   * Creates a speculator that uses a newly constructed {@link Clock} and an
+   * estimator chosen from the configuration.
+   */
+  public DefaultSpeculator(Configuration conf, AppContext context) {
+    this(conf, context, new Clock());
+  }
+
+  /**
+   * Creates a speculator that uses the given clock and an estimator chosen
+   * from the configuration by {@code getEstimator}.
+   */
+  public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) {
+    this(conf, context, getEstimator(conf, context), clock);
+  }
+  
+  /**
+   * Instantiates the configured {@link TaskRuntimeEstimator} through its
+   * no-argument constructor and contextualizes it.
+   * {@link LegacyTaskRuntimeEstimator} is used when the configuration does
+   * not name a class.
+   *
+   * @throws YarnException if the estimator cannot be instantiated
+   */
+  static private TaskRuntimeEstimator getEstimator
+      (Configuration conf, AppContext context) {
+    // "yarn.mapreduce.job.task.runtime.estimator.class"
+    Class<? extends TaskRuntimeEstimator> estimatorClass
+        = conf.getClass(YarnMRJobConfig.TASK_RUNTIME_ESTIMATOR_CLASS,
+                        LegacyTaskRuntimeEstimator.class,
+                        TaskRuntimeEstimator.class);
+
+    TaskRuntimeEstimator estimator;
+
+    try {
+      Constructor<? extends TaskRuntimeEstimator> estimatorConstructor
+          = estimatorClass.getConstructor();
+
+      estimator = estimatorConstructor.newInstance();
+    } catch (InstantiationException ex) {
+      throw estimatorFailure(ex);
+    } catch (IllegalAccessException ex) {
+      throw estimatorFailure(ex);
+    } catch (InvocationTargetException ex) {
+      throw estimatorFailure(ex);
+    } catch (NoSuchMethodException ex) {
+      throw estimatorFailure(ex);
+    }
+
+    estimator.contextualize(conf, context);
+
+    return estimator;
+  }
+
+  // Logs a reflection failure with its stack trace and wraps it for rethrow.
+  static private YarnException estimatorFailure(Exception ex) {
+    LOG.error("Can't make a speculation runtime estimator", ex);
+    return new YarnException(ex);
+  }
+
+  /**
+   * Full constructor that the other constructors delegate to.  It is public
+   * because the test cases call it directly; normally the speculator works
+   * out its own estimator.
+   *
+   * @param conf configuration kept by this speculator
+   * @param context application context; also supplies the event handler
+   * @param estimator runtime estimator to consult
+   * @param clock time source
+   */
+  public DefaultSpeculator
+      (Configuration conf, AppContext context,
+       TaskRuntimeEstimator estimator, Clock clock) {
+    super(DefaultSpeculator.class.getName());
+
+    this.clock = clock;
+    this.estimator = estimator;
+    this.conf = conf;
+    this.context = context;
+    this.eventHandler = this.context.getEventHandler();
+  }
+
+/*   *************************************************************    */
+
+  // This is the task-mongering that creates the two new threads -- one for
+  //  processing events from the event queue and one for periodically
+  //  looking for speculation opportunities
+
+  /**
+   * Starts the background thread that repeatedly looks for speculation
+   * opportunities.  After each scan the thread waits on {@code scanControl}
+   * for at least SOONEST_RETRY_AFTER_SPECULATE (if something was launched)
+   * or SOONEST_RETRY_AFTER_NO_SPECULATE milliseconds, or for as long as the
+   * scan itself took if that was longer; an object added to
+   * {@code scanControl} ends the wait early.  The thread exits when
+   * interrupted.
+   */
+  @Override
+  public void start() {
+    Runnable speculationBackgroundCore
+        = new Runnable() {
+            @Override
+            public void run() {
+              while (!Thread.currentThread().isInterrupted()) {
+                long backgroundRunStartTime = clock.getTime();
+                try {
+                  int speculations = computeSpeculations();
+                  long mininumRecomp
+                      = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
+                                         : SOONEST_RETRY_AFTER_NO_SPECULATE;
+
+                  long wait = Math.max(mininumRecomp,
+                        clock.getTime() - backgroundRunStartTime);
+
+                  if (speculations > 0) {
+                    LOG.info("We launched " + speculations
+                        + " speculations.  Sleeping " + wait + " milliseconds.");
+                  }
+
+                  // Only the wake-up matters; the polled object is unused.
+                  scanControl.poll(wait, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                  LOG.error("Background thread returning, interrupted", e);
+                  // Restore the flag that the exception cleared.
+                  Thread.currentThread().interrupt();
+                  return;
+                }
+              }
+            }
+          };
+    speculationBackgroundThread = new Thread
+        (speculationBackgroundCore, "DefaultSpeculator background processing");
+    speculationBackgroundThread.start();
+
+    super.start();
+  }
+
+  /**
+   * Interrupts the background thread, if one was ever started, and then
+   * stops the service.
+   */
+  @Override
+  public void stop() {
+    // stop() can arrive before start() has created the thread
+    Thread background = speculationBackgroundThread;
+    if (background != null) {
+      background.interrupt();
+    }
+    super.stop();
+  }
+
+  /** Folds in the given status, stamped with the current clock time. */
+  @Override
+  public void handleAttempt(TaskAttemptStatus status) {
+    statusUpdate(status, clock.getTime());
+  }
+
+  /**
+   * Reports whether the event queue is empty.  Not part of the Speculator
+   * interface; used only for testing.
+   */
+  public boolean eventQueueEmpty() {
+    boolean nothingQueued = eventQueue.isEmpty();
+    return nothingQueued;
+  }
+
+  /**
+   * Wakes the background thread so that it scans for speculations without
+   * waiting out its current delay.  Intended to be used only by test cases.
+   */
+  public void scanForSpeculations() {
+    LOG.info("We got asked to run a debug speculation scan.");
+    LOG.info("There are " + scanControl.size()
+        + " events stacked already.");
+    scanControl.add(new Object());
+    // Give the background thread a chance to pick the request up.
+    Thread.yield();
+  }
+
+
+/*   *************************************************************    */
+
+  // This section contains the code that gets run for a SpeculatorEvent
+
+  /**
+   * Returns the counter of containers needed for the job and task type of
+   * the given task, creating a zero counter the first time it is asked for.
+   */
+  private AtomicInteger containerNeed(TaskID taskID) {
+    ConcurrentMap<JobID, AtomicInteger> needsForType;
+    if (taskID.taskType == TaskType.MAP) {
+      needsForType = mapContainerNeeds;
+    } else {
+      needsForType = reduceContainerNeeds;
+    }
+
+    JobID jobID = taskID.jobID;
+    AtomicInteger existing = needsForType.get(jobID);
+    if (existing != null) {
+      return existing;
+    }
+
+    // Another thread may register a counter first; use whichever one ends
+    //  up in the map.
+    AtomicInteger fresh = new AtomicInteger(0);
+    AtomicInteger winner = needsForType.putIfAbsent(jobID, fresh);
+    return winner == null ? fresh : winner;
+  }
+
+  /**
+   * Dispatches one speculator event: a status update is folded in, a
+   * container-need update adjusts the per-job counter, and an attempt start
+   * enrolls the attempt with the estimator.
+   */
+  private synchronized void processSpeculatorEvent(SpeculatorEvent event) {
+    switch (event.getType()) {
+      case ATTEMPT_STATUS_UPDATE:
+      {
+        statusUpdate(event.getReportedStatus(), event.getTimestamp());
+        break;
+      }
+
+      case TASK_CONTAINER_NEED_UPDATE:
+      {
+        containerNeed(event.getTaskID())
+            .addAndGet(event.containersNeededChange());
+        break;
+      }
+
+      case ATTEMPT_START:
+      {
+        estimator.enrollAttempt
+            (event.getReportedStatus(), event.getTimestamp());
+        break;
+      }
+    }
+  }
+
+  /**
+   * Takes in a single TaskAttemptStatus.
+   *
+   * The report is passed to the estimator and, unless the task already has
+   * a pending speculation, the task is added to or removed from the set of
+   * running tasks according to the reported state.  Reports for a job or
+   * task that the context does not know are ignored.
+   *
+   * @param reportedStatus the status report received from a task attempt,
+   *        to be folded into the speculation data for its job
+   * @param timestamp the time the status corresponds to; relevant because
+   *        statuses carry progress
+   */
+  protected void statusUpdate(TaskAttemptStatus reportedStatus, long timestamp) {
+    String stateString = reportedStatus.stateString.toString();
+    TaskAttemptID attemptID = reportedStatus.id;
+    TaskID taskID = attemptID.taskID;
+
+    Job job = context.getJob(taskID.jobID);
+    Task task = (job == null) ? null : job.getTask(taskID);
+    if (task == null) {
+      return;
+    }
+
+    estimator.updateAttempt(reportedStatus, timestamp);
+
+    // A task already marked for speculation needs no further bookkeeping
+    AtomicBoolean pending = pendingSpeculations.get(task);
+    if (pending != null && pending.get()) {
+      return;
+    }
+
+    boolean isRunning = stateString.equals(TaskAttemptState.RUNNING.name());
+    if (!isRunning) {
+      runningTasks.remove(taskID, Boolean.TRUE);
+    } else {
+      runningTasks.putIfAbsent(taskID, Boolean.TRUE);
+    }
+  }
+
+/*   *************************************************************    */
+
+// This is the code section that runs periodically and adds speculations for
+//  those jobs that need them.
+
+
+  // Rates how worthwhile it is to speculate the given task at time `now`.
+  //
+  // This can return a few magic values for tasks that shouldn't speculate:
+  //  returns ON_SCHEDULE if thresholdRuntime(taskID) says that we should not
+  //     considering speculating this task
+  //  returns ALREADY_SPECULATING if that is true.  This has priority.
+  //  returns TOO_NEW if our companion task hasn't gotten any information
+  //  returns PROGRESS_IS_GOOD if the task is sailing through
+  //  returns TOO_LATE_TO_SPECULATE if a new attempt is not estimated to
+  //     finish before the running one
+  //  returns NOT_RUNNING if the task is not running
+  //
+  // All of these values are negative.  Any value that should be allowed to
+  //  speculate is 0 or positive.
+  //
+  // NOTE(review): job and task are dereferenced without the null checks
+  //  that statusUpdate() performs -- confirm they cannot be null here.
+  private long speculationValue(TaskID taskID, long now) {
+    Job job = context.getJob(taskID.jobID);
+    Task task = job.getTask(taskID);
+    Map<TaskAttemptID, TaskAttempt> attempts = task.getAttempts();
+    long acceptableRuntime = Long.MIN_VALUE;
+    long result = Long.MIN_VALUE;
+
+    // For tasks we have not speculated before, consult the threshold up
+    //  front; Long.MAX_VALUE from the estimator means "do not speculate".
+    if (!mayHaveSpeculated.contains(taskID)) {
+      acceptableRuntime = estimator.thresholdRuntime(taskID);
+      if (acceptableRuntime == Long.MAX_VALUE) {
+        return ON_SCHEDULE;
+      }
+    }
+
+    TaskAttemptID runningTaskAttemptID = null;
+
+    int numberRunningAttempts = 0;
+
+    for (TaskAttempt taskAttempt : attempts.values()) {
+      // ASSIGNED attempts are counted together with RUNNING ones.
+      if (taskAttempt.getState() == TaskAttemptState.RUNNING
+          || taskAttempt.getState() == TaskAttemptState.ASSIGNED) {
+        if (++numberRunningAttempts > 1) {
+          return ALREADY_SPECULATING;
+        }
+        runningTaskAttemptID = taskAttempt.getID();
+
+        long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
+
+        long taskAttemptStartTime
+            = estimator.attemptEnrolledTime(runningTaskAttemptID);
+
+        if (taskAttemptStartTime > now) {
+          // This background process ran before we could process the task
+          //  attempt status change that chronicles the attempt start
+          return TOO_NEW;
+        }
+
+        long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
+
+        long estimatedReplacementEndTime
+            = now + estimator.estimatedNewAttemptRuntime(taskID);
+
+        // The estimated end of the running attempt is already in the past.
+        if (estimatedEndTime < now) {
+          return PROGRESS_IS_GOOD;
+        }
+
+        // A replacement started now would not finish any sooner.
+        if (estimatedReplacementEndTime >= estimatedEndTime) {
+          return TOO_LATE_TO_SPECULATE;
+        }
+
+        // The time a replacement attempt is estimated to save.
+        result = estimatedEndTime - estimatedReplacementEndTime;
+      }
+    }
+
+    // If we are here, there's at most one task attempt.
+    if (numberRunningAttempts == 0) {
+      return NOT_RUNNING;
+    }
+
+
+
+    // The threshold was not consulted above (the task is in
+    //  mayHaveSpeculated), so consult it now.
+    if (acceptableRuntime == Long.MIN_VALUE) {
+      acceptableRuntime = estimator.thresholdRuntime(taskID);
+      if (acceptableRuntime == Long.MAX_VALUE) {
+        return ON_SCHEDULE;
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Asks the given task to add a speculative attempt and remembers that the
+   * task may have been speculated.
+   */
+  protected void addSpeculativeAttempt(TaskID taskID) {
+    LOG.info
+        ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
+    eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT));
+    mayHaveSpeculated.add(taskID);
+  }
+
+  /** Processes the event on the calling thread. */
+  @Override
+  public void handle(SpeculatorEvent event) {
+    this.processSpeculatorEvent(event);
+  }
+
+
+  /** @return the number of map speculations launched by this scan */
+  private int maybeScheduleAMapSpeculation() {
+    int launched = maybeScheduleASpeculation(TaskType.MAP);
+    return launched;
+  }
+
+  /** @return the number of reduce speculations launched by this scan */
+  private int maybeScheduleAReduceSpeculation() {
+    int launched = maybeScheduleASpeculation(TaskType.REDUCE);
+    return launched;
+  }
+
+  /**
+   * Tries to launch at most one speculative attempt of the given task type
+   * for every job that has a container-need counter.
+   *
+   * A job is skipped while any of its attempts of this type are still
+   * waiting for a container, and when the context no longer knows the job.
+   * Otherwise the task with the highest non-negative speculation value is
+   * speculated, provided the job is below its allowance of concurrently
+   * speculating tasks.
+   *
+   * @param type the task type to consider
+   * @return the number of speculative attempts launched
+   */
+  private int maybeScheduleASpeculation(TaskType type) {
+    int successes = 0;
+
+    long now = clock.getTime();
+
+    ConcurrentMap<JobID, AtomicInteger> containerNeeds
+        = type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
+
+    for (Map.Entry<JobID, AtomicInteger> jobEntry : containerNeeds.entrySet()) {
+      // This race conditon is okay.  If we skip a speculation attempt we
+      //  should have tried because the event that lowers the number of
+      //  containers needed to zero hasn't come through, it will next time.
+      // Also, if we miss the fact that the number of containers needed was
+      //  zero but increased due to a failure it's not too bad to launch one
+      //  container prematurely.
+      if (jobEntry.getValue().get() > 0) {
+        // Skip only this job; the remaining jobs are still examined.
+        continue;
+      }
+
+      int numberSpeculationsAlready = 0;
+      int numberRunningTasks = 0;
+
+      // loop through the tasks of the kind
+      Job job = context.getJob(jobEntry.getKey());
+
+      if (job == null) {
+        // The context does not know this job; nothing to speculate.
+        continue;
+      }
+
+      Map<TaskID, Task> tasks = job.getTasks(type);
+
+      int numberAllowedSpeculativeTasks
+          = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
+                           PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
+
+      TaskID bestTaskID = null;
+      // Sentinel values are all negative, so only real candidates beat -1.
+      long bestSpeculationValue = -1L;
+
+      // this loop is potentially pricey.
+      // TODO track the tasks that are potentially worth looking at
+      for (Map.Entry<TaskID, Task> taskEntry : tasks.entrySet()) {
+        long mySpeculationValue = speculationValue(taskEntry.getKey(), now);
+
+        if (mySpeculationValue == ALREADY_SPECULATING) {
+          ++numberSpeculationsAlready;
+        }
+
+        if (mySpeculationValue != NOT_RUNNING) {
+          ++numberRunningTasks;
+        }
+
+        if (mySpeculationValue > bestSpeculationValue) {
+          bestTaskID = taskEntry.getKey();
+          bestSpeculationValue = mySpeculationValue;
+        }
+      }
+
+      numberAllowedSpeculativeTasks
+          = (int) Math.max(numberAllowedSpeculativeTasks,
+                           PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
+
+      // If we found a speculation target, fire it off
+      if (bestTaskID != null
+          && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
+        addSpeculativeAttempt(bestTaskID);
+        ++successes;
+      }
+    }
+
+    return successes;
+  }
+
+  /**
+   * Runs one scan: tries to issue one map and one reduce speculation per
+   * job.
+   *
+   * @return the total number of speculations launched
+   */
+  private int computeSpeculations() {
+    int mapSpeculations = maybeScheduleAMapSpeculation();
+    int reduceSpeculations = maybeScheduleAReduceSpeculation();
+    return mapSpeculations + reduceSpeculations;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,194 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+
+/**
+ * Task runtime estimator that keeps, for every task attempt, an
+ * exponentially smoothed reading of progress vs. wallclock time.
+ *
+ * Depending on the configured {@link SmoothedValue}, the smoothed quantity
+ * is either the rate of progress (progress per unit of time) or its
+ * reciprocal (time per unit of progress).
+ *
+ * Per-attempt state lives in immutable {@code EstimateVector}s that are
+ * swapped with compare-and-set on an {@link AtomicReference}.
+ */
+public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase {
+
+  /** Value returned by the estimate methods when no estimate is available. */
+  private static final long NO_ESTIMATE = -1L;
+
+  /** Smoothing time constant used when the configuration does not set one. */
+  private static final long DEFAULT_EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS
+      = 1000L * 60;
+
+  // Latest smoothed estimate for each attempt that has reported a reading.
+  private final ConcurrentMap<TaskAttemptID, AtomicReference<EstimateVector>> estimates
+      = new ConcurrentHashMap<TaskAttemptID, AtomicReference<EstimateVector>>();
+
+  // Which quantity is smoothed; set by the constructor or by contextualize().
+  private SmoothedValue smoothedValue;
+
+  // Smoothing time constant; the elapsed time between two readings is divided
+  // by it, so it must be in the same unit as the reading timestamps
+  // (presumably milliseconds, going by the configuration key -- TODO confirm).
+  private long lambda;
+
+  /** The quantity the exponential smoothing is applied to. */
+  public enum SmoothedValue {
+    RATE, TIME_PER_UNIT_PROGRESS
+  }
+
+  /**
+   * Creates an estimator with explicit smoothing parameters.
+   *
+   * @param lambda smoothing time constant
+   * @param smoothedValue the quantity to smooth
+   */
+  ExponentiallySmoothedTaskRuntimeEstimator
+      (long lambda, SmoothedValue smoothedValue) {
+    super();
+    this.smoothedValue = smoothedValue;
+    this.lambda = lambda;
+  }
+
+  /**
+   * Creates an estimator whose smoothing parameters are filled in later by
+   * {@link #contextualize(Configuration, AppContext)}.
+   */
+  public ExponentiallySmoothedTaskRuntimeEstimator() {
+    super();
+  }
+
+  /**
+   * Immutable snapshot of the smoothed value for one attempt, together with
+   * the progress and timestamp of the reading it was last updated from.
+   *
+   * A negative {@code value} means "no usable value yet"; an {@code atTime}
+   * of {@code Long.MIN_VALUE} additionally means "no reading recorded yet".
+   */
+  private class EstimateVector {
+    final double value;
+    final float basedOnProgress;
+    final long atTime;
+
+    EstimateVector
+        (double value, float basedOnProgress, long atTime) {
+      this.value = value;
+      this.basedOnProgress = basedOnProgress;
+      this.atTime = atTime;
+    }
+
+    /**
+     * Folds a new progress reading into this vector.
+     *
+     * @param newProgress progress reported by the attempt
+     * @param newAtTime timestamp of the reading
+     * @return this vector if the reading is not newer or moves progress
+     *         backwards, otherwise a new vector reflecting the reading
+     */
+    EstimateVector incorporate(float newProgress, long newAtTime) {
+      if (newAtTime <= atTime || newProgress < basedOnProgress) {
+        return this;
+      }
+
+      if (atTime == Long.MIN_VALUE) {
+        // Placeholder vector: there is no earlier reading to difference
+        //  against, and newAtTime - Long.MIN_VALUE would overflow.  Record
+        //  this reading as the baseline and keep the value marked as unset.
+        return new EstimateVector(-1.0, newProgress, newAtTime);
+      }
+
+      double elapsed = (double) (newAtTime - atTime);
+
+      // The old value decays with elapsed time, so the exponent must be
+      //  negative to keep the weight within [0, 1].  A negative old value
+      //  carries no information and gets weight zero.
+      double oldWeighting
+          = value < 0.0 ? 0.0 : Math.exp(-elapsed / lambda);
+
+      double newRead = (newProgress - basedOnProgress) / elapsed;
+
+      if (smoothedValue == SmoothedValue.TIME_PER_UNIT_PROGRESS) {
+        // NOTE(review): an interval without progress makes this infinite,
+        //  and the infinity then persists in the smoothed value.
+        newRead = 1.0 / newRead;
+      }
+
+      return new EstimateVector
+          (value * oldWeighting + newRead * (1.0 - oldWeighting),
+           newProgress, newAtTime);
+    }
+  }
+
+  /**
+   * Records a progress reading for an attempt.
+   *
+   * The first reading seen for an attempt only installs a placeholder vector
+   * and is otherwise not used; later readings are folded in with a
+   * compare-and-set retry loop.
+   *
+   * @param attemptID the attempt the reading belongs to
+   * @param newProgress progress reported by the attempt
+   * @param newTime timestamp of the reading
+   */
+  private void incorporateReading
+      (TaskAttemptID attemptID, float newProgress, long newTime) {
+    AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
+
+    if (vectorRef == null) {
+      AtomicReference<EstimateVector> freshRef
+          = new AtomicReference<EstimateVector>(null);
+      AtomicReference<EstimateVector> racedRef
+          = estimates.putIfAbsent(attemptID, freshRef);
+
+      // putIfAbsent returns null when our reference was the one installed.
+      vectorRef = racedRef == null ? freshRef : racedRef;
+    }
+
+    EstimateVector oldVector = vectorRef.get();
+
+    if (oldVector == null) {
+      if (vectorRef.compareAndSet(null,
+             new EstimateVector(-1.0, 0.0F, Long.MIN_VALUE))) {
+        return;
+      }
+
+      // Another thread installed a vector first; it is never reset to null,
+      //  so this re-read yields a vector we can fold the reading into.
+      oldVector = vectorRef.get();
+    }
+
+    while (!vectorRef.compareAndSet
+            (oldVector, oldVector.incorporate(newProgress, newTime))) {
+      oldVector = vectorRef.get();
+    }
+  }
+
+  /**
+   * @param attemptID the attempt to look up
+   * @return the current vector for the attempt, or null if none is recorded
+   */
+  private EstimateVector getEstimateVector(TaskAttemptID attemptID) {
+    AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
+
+    if (vectorRef == null) {
+      return null;
+    }
+
+    return vectorRef.get();
+  }
+
+  /**
+   * Reads the smoothing time constant and the smoothed quantity from the
+   * configuration, after letting the superclass contextualize itself.
+   */
+  @Override
+  public void contextualize(Configuration conf, AppContext context) {
+    super.contextualize(conf, context);
+
+    lambda
+        = conf.getLong(YarnMRJobConfig.EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS,
+            DEFAULT_EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS);
+    smoothedValue
+        = conf.getBoolean(YarnMRJobConfig.EXPONENTIAL_SMOOTHING_SMOOTH_RATE, true)
+            ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
+  }
+
+  /**
+   * Estimates the total runtime of an attempt as the time already spent up
+   * to the last reading plus the remaining progress divided by the smoothed
+   * rate.
+   *
+   * @param id the attempt to estimate
+   * @return the estimated runtime, or -1 if the attempt has no start time,
+   *         no readings, or no positive smoothed value yet
+   */
+  @Override
+  public long estimatedRuntime(TaskAttemptID id) {
+    Long startTime = startTimes.get(id);
+
+    if (startTime == null) {
+      return NO_ESTIMATE;
+    }
+
+    EstimateVector vector = getEstimateVector(id);
+
+    if (vector == null) {
+      return NO_ESTIMATE;
+    }
+
+    double value = vector.value;
+    float progress = vector.basedOnProgress;
+
+    // A negative value marks "nothing smoothed yet" and zero cannot be
+    //  inverted; neither yields a meaningful estimate.
+    if (Double.isNaN(value) || value <= 0.0) {
+      return NO_ESTIMATE;
+    }
+
+    double rate = smoothedValue == SmoothedValue.RATE ? value : 1.0 / value;
+
+    if (rate == 0.0) {
+      return NO_ESTIMATE;
+    }
+
+    long sunkTime = vector.atTime - startTime;
+    double remainingTime = (1.0 - progress) / rate;
+
+    return sunkTime + (long)remainingTime;
+  }
+
+  /**
+   * This estimator does not compute a variance.
+   *
+   * @return always -1
+   */
+  @Override
+  public long runtimeEstimateVariance(TaskAttemptID id) {
+    return NO_ESTIMATE;
+  }
+
+  /**
+   * Feeds the progress carried by a status update into the smoothed estimate
+   * of the reporting attempt.
+   *
+   * NOTE(review): this override does not call super.updateAttempt(); confirm
+   * that skipping the superclass bookkeeping is intended.
+   */
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    TaskAttemptID attemptID = status.id;
+
+    float progress = status.progress;
+
+    incorporateReading(attemptID, progress, timestamp);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,156 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+
+
+
+/**
+ * Runtime estimator that extrapolates an attempt's total runtime linearly
+ * from the time elapsed since its start and the progress it reports.
+ */
+public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
+
+  /** Value returned when nothing is stored for an attempt. */
+  private static final long NO_VALUE = -1L;
+
+  // Latest runtime estimate per running attempt.
+  private final Map<TaskAttempt, AtomicLong> attemptRuntimeEstimates
+      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+  // Latest variance figure per running attempt.
+  private final Map<TaskAttempt, AtomicLong> attemptRuntimeEstimateVariances
+      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+
+
+  /**
+   * Passes the update to the superclass and then, for an attempt whose state
+   * string is RUNNING, refreshes its stored runtime estimate and variance.
+   *
+   * @param status the status reported by the attempt
+   * @param timestamp the time of the report
+   */
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    // Per the original author's note, completions are accounted for by the
+    //  superclass; this class only learns about attempts still running.
+    super.updateAttempt(status, timestamp);
+
+    String stateString = status.stateString.toString();
+
+    TaskAttemptID attemptID = status.id;
+    TaskAttempt taskAttempt = findAttempt(attemptID);
+
+    if (taskAttempt == null) {
+      return;
+    }
+
+    if (!stateString.equals(TaskAttemptState.RUNNING.name())) {
+      return;
+    }
+
+    Long boxedStart = startTimes.get(attemptID);
+    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
+
+    AtomicLong estimateContainer
+        = containerFor(attemptRuntimeEstimates, taskAttempt);
+    AtomicLong estimateVarianceContainer
+        = containerFor(attemptRuntimeEstimateVariances, taskAttempt);
+
+    long estimate = NO_VALUE;
+    long varianceEstimate = NO_VALUE;
+
+    // This code assumes that we'll never consider starting a third
+    //  speculative task attempt if two are already running for this task
+    if (start > 0 && timestamp > start) {
+      // The floor on progress keeps the division finite for attempts that
+      //  have not reported any progress yet.
+      estimate = (long) ((timestamp - start) / Math.max(0.0001, status.progress));
+      varianceEstimate = (long) (estimate * status.progress / 10);
+    }
+
+    estimateContainer.set(estimate);
+    estimateVarianceContainer.set(varianceEstimate);
+  }
+
+  /**
+   * Resolves an attempt ID to its TaskAttempt through the job and task
+   * registered in the application context.
+   *
+   * @param attemptID the attempt to resolve
+   * @return the attempt, or null if the job, task or attempt is unknown
+   */
+  private TaskAttempt findAttempt(TaskAttemptID attemptID) {
+    TaskID taskID = attemptID.taskID;
+    JobID jobID = taskID.jobID;
+    Job job = context.getJob(jobID);
+
+    if (job == null) {
+      return null;
+    }
+
+    Task task = job.getTask(taskID);
+
+    if (task == null) {
+      return null;
+    }
+
+    return task.getAttempt(attemptID);
+  }
+
+  /**
+   * Returns the container registered for an attempt, creating and
+   * registering one first if the registry has none.
+   *
+   * @param registry the map to look in
+   * @param taskAttempt the attempt whose container is wanted
+   * @return the registered container, never null
+   */
+  private static AtomicLong containerFor
+      (Map<TaskAttempt, AtomicLong> registry, TaskAttempt taskAttempt) {
+    AtomicLong container = registry.get(taskAttempt);
+
+    if (container == null) {
+      // Check again under the registry's lock so that concurrent callers
+      //  end up sharing a single container per attempt.
+      synchronized (registry) {
+        container = registry.get(taskAttempt);
+
+        if (container == null) {
+          container = new AtomicLong();
+          registry.put(taskAttempt, container);
+        }
+      }
+    }
+
+    return container;
+  }
+
+  /**
+   * Reads the value stored for an attempt in one of the per-attempt maps.
+   *
+   * @param data the map to read from
+   * @param attemptID the attempt whose value is wanted
+   * @return the stored value, or -1 if the job, task or attempt is unknown
+   *         or nothing has been stored for the attempt
+   */
+  private long storedPerAttemptValue
+       (Map<TaskAttempt, AtomicLong> data, TaskAttemptID attemptID) {
+    TaskAttempt taskAttempt = findAttempt(attemptID);
+
+    if (taskAttempt == null) {
+      return NO_VALUE;
+    }
+
+    AtomicLong estimate = data.get(taskAttempt);
+
+    return estimate == null ? NO_VALUE : estimate.get();
+  }
+
+  /**
+   * @return the last runtime estimate stored for the attempt, or -1
+   */
+  @Override
+  public long estimatedRuntime(TaskAttemptID attemptID) {
+    return storedPerAttemptValue(attemptRuntimeEstimates, attemptID);
+  }
+
+  /**
+   * @return the last variance figure stored for the attempt, or -1
+   */
+  @Override
+  public long runtimeEstimateVariance(TaskAttemptID attemptID) {
+    return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,72 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+
+/**
+ * A {@link TaskRuntimeEstimator} that records nothing and computes nothing.
+ *
+ * It is provided solely as an exemplar of the return values that mean
+ * "nothing needs to be computed".  It's not currently used.
+ */
+public class NullTaskRuntimesEngine implements TaskRuntimeEstimator {
+
+  /** Answer given by every estimate-style query. */
+  private static final long NO_ESTIMATE = -1L;
+
+  /** Answer given by the enrolled-time and threshold queries. */
+  private static final long UNBOUNDED = Long.MAX_VALUE;
+
+  /** Ignores the enrollment. */
+  @Override
+  public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
+    // intentionally empty
+  }
+
+  /** @return always {@code Long.MAX_VALUE} */
+  @Override
+  public long attemptEnrolledTime(TaskAttemptID attemptID) {
+    return UNBOUNDED;
+  }
+
+  /** Ignores the status update. */
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    // intentionally empty
+  }
+
+  /** Ignores the configuration and the context. */
+  @Override
+  public void contextualize(Configuration conf, AppContext context) {
+    // intentionally empty
+  }
+
+  /** @return always {@code Long.MAX_VALUE} */
+  @Override
+  public long thresholdRuntime(TaskID id) {
+    return UNBOUNDED;
+  }
+
+  /** @return always -1 */
+  @Override
+  public long estimatedRuntime(TaskAttemptID id) {
+    return NO_ESTIMATE;
+  }
+
+  /** @return always -1 */
+  @Override
+  public long estimatedNewAttemptRuntime(TaskID id) {
+    return NO_ESTIMATE;
+  }
+
+  /** @return always -1 */
+  @Override
+  public long runtimeEstimateVariance(TaskAttemptID id) {
+    return NO_ESTIMATE;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/Speculator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/Speculator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/Speculator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/Speculator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,44 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * Speculator component.  Status updates of task attempts are delivered to
+ * it as {@link SpeculatorEvent}s.  A concrete implementation runs the
+ * speculation algorithm and sends the TaskEventType.T_ADD_ATTEMPT.
+ *
+ * An implementation is also responsible for arranging that the jobs are
+ * scanned from time to time, so that speculations get launched.
+ */
+public interface Speculator extends EventHandler<SpeculatorEvent> {
+
+  /** The kinds of {@link SpeculatorEvent} a speculator receives. */
+  enum EventType {
+    ATTEMPT_STATUS_UPDATE,
+    ATTEMPT_START,
+    TASK_CONTAINER_NEED_UPDATE
+  }
+
+  /**
+   * Hands an attempt status directly to the speculator.
+   *
+   * This will be implemented if we go to a model where the events are
+   * processed within the code of the TaskAttempts' state transitions.
+   *
+   * @param status the status reported by a task attempt
+   */
+  void handleAttempt(TaskAttemptStatus status);
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,74 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+
+/**
+ * Event delivered to a {@link Speculator}.  Which of the accessors carry
+ * meaningful data depends on the event type chosen by the constructor used.
+ */
+public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
+
+  // Set for ATTEMPT_STATUS_UPDATE and ATTEMPT_START events; null otherwise.
+  private final TaskAttemptStatus reportedStatus;
+
+  // Set for TASK_CONTAINER_NEED_UPDATE events; null and 0 otherwise.
+  private final TaskID taskID;
+  private final int containersNeededChange;
+
+  /**
+   * Creates an ATTEMPT_STATUS_UPDATE event.
+   *
+   * @param reportedStatus the status reported by the attempt
+   * @param timestamp the timestamp handed to the event superclass
+   */
+  public SpeculatorEvent(TaskAttemptStatus reportedStatus, long timestamp) {
+    super(Speculator.EventType.ATTEMPT_STATUS_UPDATE, timestamp);
+    this.reportedStatus = reportedStatus;
+    this.taskID = null;
+    this.containersNeededChange = 0;
+  }
+
+  /**
+   * Creates an ATTEMPT_START event whose reported status is a fresh
+   * TaskAttemptStatus carrying only the attempt ID.
+   *
+   * @param attemptID the attempt that started
+   * @param flag not read by this constructor
+   * @param timestamp the timestamp handed to the event superclass
+   */
+  public SpeculatorEvent(TaskAttemptID attemptID, boolean flag, long timestamp) {
+    super(Speculator.EventType.ATTEMPT_START, timestamp);
+
+    TaskAttemptStatus startStatus = new TaskAttemptStatus();
+    startStatus.id = attemptID;
+
+    this.reportedStatus = startStatus;
+    this.taskID = null;
+    this.containersNeededChange = 0;
+  }
+
+  /**
+   * Creates a TASK_CONTAINER_NEED_UPDATE event.
+   *
+   * We send a +1 event when a task enters a state where it wants a
+   * container, and a -1 event when it either gets one or withdraws the
+   * request.  The per-job sum of all these events is the number of
+   * containers requested but not granted.  The intent is that we only do
+   * speculations when the speculation wouldn't compete for containers with
+   * tasks which need to be run.
+   *
+   * @param taskID the task whose container need changed
+   * @param containersNeededChange the change in containers needed
+   */
+  public SpeculatorEvent(TaskID taskID, int containersNeededChange) {
+    super(Speculator.EventType.TASK_CONTAINER_NEED_UPDATE);
+    this.reportedStatus = null;
+    this.taskID = taskID;
+    this.containersNeededChange = containersNeededChange;
+  }
+
+  /** @return the reported status, or null for container-need events */
+  public TaskAttemptStatus getReportedStatus() {
+    return this.reportedStatus;
+  }
+
+  /** @return the change in containers needed, or 0 for attempt events */
+  public int containersNeededChange() {
+    return this.containersNeededChange;
+  }
+
+  /** @return the task ID, or null for attempt events */
+  public TaskID getTaskID() {
+    return this.taskID;
+  }
+}
\ No newline at end of file