You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC

svn commit: r1469642 [19/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,363 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.node;
+
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklisted;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+
+public class AMNodeImpl implements AMNode {
+
+  private static final Log LOG = LogFactory.getLog(AMNodeImpl.class);
+
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+  private final NodeId nodeId;
+  private final AppContext appContext;
+  private final int maxTaskFailuresPerNode;
+  private int numFailedTAs = 0;
+  private int numSuccessfulTAs = 0;
+  private boolean blacklistingEnabled;
+  private boolean ignoreBlacklisting = false;
+  
+  @SuppressWarnings("rawtypes")
+  protected EventHandler eventHandler;
+
+  private final List<ContainerId> containers = new LinkedList<ContainerId>();
+  
+  // Book-keeping only: containers moved out of 'containers' when the node turns healthy again.
+  private final List<ContainerId> pastContainers = new LinkedList<ContainerId>();
+
+
+
+  private final StateMachine<AMNodeState, AMNodeEventType, AMNodeEvent> stateMachine;
+
+  private static StateMachineFactory
+  <AMNodeImpl, AMNodeState, AMNodeEventType, AMNodeEvent> 
+  stateMachineFactory = 
+  new StateMachineFactory<AMNodeImpl, AMNodeState, AMNodeEventType, AMNodeEvent>(
+  AMNodeState.ACTIVE)
+        // Transitions from ACTIVE state.
+    .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedTransition())
+    .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
+    .addTransition(AMNodeState.ACTIVE, EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED), AMNodeEventType.N_TA_ENDED, new TaskAttemptFailedTransition())
+    .addTransition(AMNodeState.ACTIVE, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new NodeTurnedUnhealthyTransition())
+    .addTransition(AMNodeState.ACTIVE, EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED), AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingDisabledTransition())
+    .addTransition(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, new IgnoreBlacklistingStateChangeTransition(true))
+    .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, AMNodeEventType.N_TURNED_HEALTHY)
+
+        // Transitions from BLACKLISTED state.
+    .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedWhileBlacklistedTransition())
+    .addTransition(AMNodeState.BLACKLISTED, EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE), AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededWhileBlacklistedTransition())
+    .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
+    .addTransition(AMNodeState.BLACKLISTED, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new NodeTurnedUnhealthyTransition())
+    .addTransition(AMNodeState.BLACKLISTED, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, new IgnoreBlacklistingStateChangeTransition(true))
+    .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED), new GenericErrorTransition())
+
+        //Transitions from FORCED_ACTIVE state.
+    .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedTransition())
+    .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
+    .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
+    .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new NodeTurnedUnhealthyTransition())
+    .addTransition(AMNodeState.FORCED_ACTIVE, EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE), AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingDisabledTransition())
+    .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED), new GenericErrorTransition())
+            
+        // Transitions from UNHEALTHY state.
+    .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedWhileUnhealthyTransition())
+    .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, EnumSet.of(AMNodeEventType.N_TA_SUCCEEDED, AMNodeEventType.N_TA_ENDED))
+    .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingStateChangeTransition(false))
+    .addTransition(AMNodeState.UNHEALTHY,  AMNodeState.UNHEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, new IgnoreBlacklistingStateChangeTransition(true))
+    .addTransition(AMNodeState.UNHEALTHY, EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE), AMNodeEventType.N_TURNED_HEALTHY, new NodeTurnedHealthyTransition())
+    .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new GenericErrorTransition())
+
+        .installTopology();
+  
+
+  /**
+   * Creates the AM-side representation of a cluster node. The node starts in
+   * the initial state of the state machine built by the static factory.
+   *
+   * @param nodeId identifier of the node being tracked
+   * @param maxTaskFailuresPerNode number of failed task attempts at which the
+   *          node becomes eligible for blacklisting
+   * @param eventHandler handler used to dispatch events raised by this node
+   * @param blacklistingEnabled whether this node is allowed to blacklist itself
+   * @param appContext application context
+   */
+  @SuppressWarnings("rawtypes")
+  public AMNodeImpl(NodeId nodeId, int maxTaskFailuresPerNode,
+      EventHandler eventHandler, boolean blacklistingEnabled,
+      AppContext appContext) {
+    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+    this.readLock = rwLock.readLock();
+    this.writeLock = rwLock.writeLock();
+    this.nodeId = nodeId;
+    this.appContext = appContext;
+    this.eventHandler = eventHandler;
+    this.maxTaskFailuresPerNode = maxTaskFailuresPerNode;
+    // This assignment was missing: the field stayed at its default (false),
+    // so shouldBlacklistNode() could never return true.
+    this.blacklistingEnabled = blacklistingEnabled;
+    this.stateMachine = stateMachineFactory.make(this);
+    // TODO Handle the case where a node is created due to the RM reporting its
+    // state as UNHEALTHY
+  }
+
+  @Override
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
+
+  @Override
+  public AMNodeState getState() {
+    this.readLock.lock();
+    try {
+      return this.stateMachine.getCurrentState();
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  public List<ContainerId> getContainers() {
+    this.readLock.lock();
+    try {
+      List<ContainerId> cIds = new LinkedList<ContainerId>(this.containers);
+      return cIds;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * Runs the given event through this node's state machine while holding the
+   * write lock. Invalid transitions are logged and otherwise ignored.
+   *
+   * @param event the node event to process
+   */
+  @Override
+  public void handle(AMNodeEvent event) {
+    this.writeLock.lock();
+    try {
+      // Everything after lock() lives inside the try so that a failure while
+      // building the log message cannot leave the write lock held.
+      final AMNodeState oldState = getState();
+      LOG.info("DEBUG: Processing AMNodeEvent " + event.getNodeId()
+          + " of type " + event.getType() + " while in state: " + oldState
+          + ". Event: " + event);
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle event " + event.getType()
+            + " at current state " + oldState + " for NodeId " + this.nodeId, e);
+        // TODO Should this fail the job ?
+      }
+      final AMNodeState newState = getState();
+      if (oldState != newState) {
+        LOG.info("AMNode " + this.nodeId + " transitioned from " + oldState
+            + " to " + newState);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  
+  protected boolean shouldBlacklistNode() {
+    return blacklistingEnabled && (numFailedTAs >= maxTaskFailuresPerNode);
+  }
+
+  protected void blacklistSelf() {
+    sendEvent(new AMNodeEvent(getNodeId(),
+        AMNodeEventType.N_NODE_WAS_BLACKLISTED));
+    sendEvent(new AMSchedulerEventNodeBlacklisted(getNodeId()));
+  }
+
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    this.eventHandler.handle(event);
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  //                   Start of Transition Classes                            //
+  //////////////////////////////////////////////////////////////////////////////
+  
+  protected static class ContainerAllocatedTransition implements
+      SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+    @Override
+    public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+      AMNodeEventContainerAllocated event = (AMNodeEventContainerAllocated) nEvent;
+      node.containers.add(event.getContainerId());
+    }
+  }
+
+  protected static class TaskAttemptSucceededTransition implements
+      SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+    @Override
+    public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+      node.numSuccessfulTAs++;
+    }
+  }
+
+  /**
+   * Handles the end of a task attempt on a node. A failed attempt bumps the
+   * failure counter; if the node then qualifies for blacklisting it
+   * blacklists itself and moves to BLACKLISTED, otherwise it stays ACTIVE.
+   */
+  protected static class TaskAttemptFailedTransition implements
+      MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
+    @Override
+    public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
+      AMNodeEventTaskAttemptEnded endedEvent =
+          (AMNodeEventTaskAttemptEnded) nEvent;
+      if (!endedEvent.failed()) {
+        // Attempts that ended without failing are not counted.
+        return AMNodeState.ACTIVE;
+      }
+      node.numFailedTAs++;
+      if (!node.shouldBlacklistNode()) {
+        return AMNodeState.ACTIVE;
+      }
+      node.blacklistSelf();
+      return AMNodeState.BLACKLISTED;
+    }
+  }
+
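+  // Forgets past errors by resetting the counters. When the node turns healthy again it resumes as ACTIVE (or FORCED_ACTIVE if blacklisting is being ignored), never BLACKLISTED.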
+  protected static class NodeTurnedUnhealthyTransition implements
+      SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+    @Override
+    public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+      for (ContainerId c : node.containers) {
+        node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+      }
+      // Resetting counters.
+      node.numFailedTAs = 0;
+      node.numSuccessfulTAs = 0;
+    }
+  }
+
+  protected static class IgnoreBlacklistingDisabledTransition implements
+      MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
+
+    @Override
+    public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
+      node.ignoreBlacklisting = false;
+      boolean shouldBlacklist = node.shouldBlacklistNode();
+      if (shouldBlacklist) {
+        node.blacklistSelf();
+        return AMNodeState.BLACKLISTED;
+      }
+      return AMNodeState.ACTIVE;
+    }
+  }
+
+  protected static class IgnoreBlacklistingStateChangeTransition implements
+      SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+
+    private boolean ignore;
+
+    public IgnoreBlacklistingStateChangeTransition(boolean ignore) {
+      this.ignore = ignore;
+    }
+
+    @Override
+    public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+      node.ignoreBlacklisting = ignore;
+    }
+  }
+
+  protected static class ContainerAllocatedWhileBlacklistedTransition implements
+      SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+    @Override
+    public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+      AMNodeEventContainerAllocated event = (AMNodeEventContainerAllocated) nEvent;
+      node.sendEvent(new AMContainerEvent(event.getContainerId(),
+          AMContainerEventType.C_STOP_REQUEST));
+      // ZZZ CReuse: Should the scheduler check node state before scheduling a
+      // container on it ?
+    }
+  }
+
+  /**
+   * Counts a successful task attempt reported while the node is blacklisted.
+   */
+  protected static class TaskAttemptSucceededWhileBlacklistedTransition
+      implements MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
+    @Override
+    public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
+      node.numSuccessfulTAs += 1;
+      // For now, always blacklisted. May change at a later point to re-enable
+      // the node.
+      final AMNodeState nextState = AMNodeState.BLACKLISTED;
+      return nextState;
+    }
+  }
+
+  /**
+   * Records a failed task attempt without changing the node's state.
+   * Attempts that ended without failing leave the counter untouched.
+   */
+  protected static class CountFailedTaskAttemptTransition implements
+      SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+    @Override
+    public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+      final boolean attemptFailed =
+          ((AMNodeEventTaskAttemptEnded) nEvent).failed();
+      if (attemptFailed) {
+        node.numFailedTAs += 1;
+      }
+    }
+  }
+
+  protected static class GenericErrorTransition implements
+      SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+
+    @Override
+    public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+      LOG.warn("Invalid event: " + nEvent.getType() + " while in state: "
+          + node.getState() + ". Ignoring." + " Event: " + nEvent);
+    }
+  }
+
+  protected static class ContainerAllocatedWhileUnhealthyTransition implements
+      SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+    @Override
+    public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+      AMNodeEventContainerAllocated event = (AMNodeEventContainerAllocated) nEvent;
+      LOG.info("Node: " + node.getNodeId()
+          + " got allocated a contaienr with id: " + event.getContainerId()
+          + " while in UNHEALTHY state. Releasing it.");
+      node.sendEvent(new AMContainerEventNodeFailed(event.getContainerId(),
+          "new container assigned on failed node " + node.getNodeId()));
+    }
+  }
+
+  protected static class NodeTurnedHealthyTransition implements
+      MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
+    @Override
+    public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
+      node.pastContainers.addAll(node.containers);
+      node.containers.clear();
+      if (node.ignoreBlacklisting) {
+        return AMNodeState.FORCED_ACTIVE;
+      } else {
+        return AMNodeState.ACTIVE;
+      }
+    }
+  }
+
+  @Override
+  public boolean isUnhealthy() {
+    this.readLock.lock();
+    try {
+      return getState() == AMNodeState.UNHEALTHY;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public boolean isBlacklisted() {
+    this.readLock.lock();
+    try {
+      return getState() == AMNodeState.BLACKLISTED;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,211 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.node;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class AMNodeMap extends AbstractService implements
+    EventHandler<AMNodeEvent> {
+  
+  static final Log LOG = LogFactory.getLog(AMNodeMap.class);
+  
+  private final ConcurrentHashMap<NodeId, AMNode> nodeMap;
+  private final ConcurrentHashMap<String, Set<NodeId>> blacklistMap;
+  private final EventHandler eventHandler;
+  private final AppContext appContext;
+  private int numClusterNodes;
+  private boolean ignoreBlacklisting = false;
+  private int maxTaskFailuresPerNode;
+  private boolean nodeBlacklistingEnabled;
+  private int blacklistDisablePercent;
+  
+  
+  // TODO XXX Ensure there's a test for IgnoreBlacklisting in
+  // TestRMContainerAllocator. Otherwise add one.
+  public AMNodeMap(EventHandler eventHandler, AppContext appContext) {
+    super("AMNodeMap");
+    this.nodeMap = new ConcurrentHashMap<NodeId, AMNode>();
+    this.blacklistMap = new ConcurrentHashMap<String, Set<NodeId>>();
+    this.eventHandler = eventHandler;
+    this.appContext = appContext;
+  }
+  
+  @Override
+  public synchronized void init(Configuration conf) {
+    this.maxTaskFailuresPerNode = conf.getInt(
+        MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
+    this.nodeBlacklistingEnabled = conf.getBoolean(
+        MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    this.blacklistDisablePercent = conf.getInt(
+          MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
+          MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
+
+    LOG.info("blacklistDisablePercent is " + blacklistDisablePercent +
+        ", blacklistingEnabled: " + nodeBlacklistingEnabled + 
+        ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode);
+
+    if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
+      throw new YarnException("Invalid blacklistDisablePercent: "
+          + blacklistDisablePercent
+          + ". Should be an integer between 0 and 100 or -1 to disabled");
+    }    
+    super.init(conf);
+  }
+  
+  public void nodeSeen(NodeId nodeId) {
+    nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, maxTaskFailuresPerNode,
+        eventHandler, nodeBlacklistingEnabled, appContext));
+  }
+
+  // Interface for the scheduler to check about a specific host.
+  public boolean isHostBlackListed(String hostname) {
+    if (!nodeBlacklistingEnabled || ignoreBlacklisting) {
+      return false;
+    }
+    return blacklistMap.containsKey(hostname);
+  }
+
+  /**
+   * Records the given node as blacklisted under its host name.
+   *
+   * @param nodeId the node to add to the per-host blacklist
+   */
+  private void addToBlackList(NodeId nodeId) {
+    String host = nodeId.getHost();
+    Set<NodeId> nodes = blacklistMap.get(host);
+    if (nodes == null) {
+      // putIfAbsent avoids the containsKey/put/get check-then-act sequence:
+      // if another set was registered in the meantime, use that one.
+      Set<NodeId> created = new HashSet<NodeId>();
+      Set<NodeId> existing = blacklistMap.putIfAbsent(host, created);
+      nodes = (existing != null) ? existing : created;
+    }
+    // Set.add is already a no-op for an element that is present.
+    nodes.add(nodeId);
+  }
+  
+  // TODO: Currently, un-blacklisting feature is not supported.
+  /*
+  private void removeFromBlackList(NodeId nodeId) {
+    String host = nodeId.getHost();
+    if (blacklistMap.containsKey(host)) {
+      ArrayList<NodeId> nodes = blacklistMap.get(host);
+      nodes.remove(nodeId);
+    }
+  }
+  */
+
+  /**
+   * Dispatches a node event. Blacklist notifications and node-count updates
+   * are handled here; every other event type is forwarded to the AMNode
+   * registered for the event's node id.
+   *
+   * @param rEvent the event to process
+   */
+  @Override
+  public void handle(AMNodeEvent rEvent) {
+    // No synchronization required until there's multiple dispatchers.
+    NodeId nodeId = rEvent.getNodeId();
+    switch (rEvent.getType()) {
+    case N_NODE_WAS_BLACKLISTED:
+      // When moving away from IGNORE_BLACKLISTING state, nodes will send out
+      // blacklisted events. These need to be ignored.
+      // NOTE(review): no such filtering happens here yet; every blacklisted
+      // event is recorded. TODO confirm the intended behavior.
+      addToBlackList(nodeId);
+      computeIgnoreBlacklisting();
+      break;
+    case N_NODE_COUNT_UPDATED:
+      AMNodeEventNodeCountUpdated event = (AMNodeEventNodeCountUpdated) rEvent;
+      numClusterNodes = event.getNodeCount();
+      computeIgnoreBlacklisting();
+      break;
+    default:
+      AMNode amNode = nodeMap.get(nodeId);
+      if (amNode == null) {
+        // The node was never registered via nodeSeen(); log and drop the
+        // event instead of failing with a NullPointerException.
+        LOG.error("Ignoring event of type " + rEvent.getType()
+            + " for unknown node " + nodeId);
+        break;
+      }
+      amNode.handle(rEvent);
+    }
+  }
+
+  /**
+   * Re-evaluates whether blacklisting should be ignored, based on the
+   * percentage of blacklisted hosts relative to the known cluster size, and
+   * notifies all nodes when the decision flips.
+   *
+   * May be incorrect if there's multiple NodeManagers running on a single
+   * host: knownNodeCount is based on node managers, not hosts, while
+   * blacklisting is currently based on hosts.
+   */
+  protected void computeIgnoreBlacklisting() {
+    if (!nodeBlacklistingEnabled) {
+      return;
+    }
+    if (blacklistDisablePercent == -1) {
+      // -1 turns the "ignore blacklisting" feature off entirely.
+      return;
+    }
+    if (numClusterNodes == 0) {
+      LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting");
+      return;
+    }
+
+    int blacklistedPercent =
+        (int) ((float) blacklistMap.size() / numClusterNodes * 100);
+    boolean shouldIgnore = blacklistedPercent >= blacklistDisablePercent;
+    if (shouldIgnore == ignoreBlacklisting) {
+      // No change in state, nothing to broadcast.
+      return;
+    }
+
+    ignoreBlacklisting = shouldIgnore;
+    if (shouldIgnore) {
+      LOG.info("Ignore Blacklisting set to true. Known: " + numClusterNodes
+          + ", Blacklisted: " + blacklistMap.size());
+    } else {
+      LOG.info("Ignore blacklisting set to false. Known: "
+          + numClusterNodes + ", Blacklisted: " + blacklistMap.size());
+    }
+    sendIngoreBlacklistingStateToNodes();
+  }
+
+  private void sendIngoreBlacklistingStateToNodes() {
+    AMNodeEventType eventType =
+        ignoreBlacklisting ? AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED
+        : AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED;
+    for (NodeId nodeId : nodeMap.keySet()) {
+      sendEvent(new AMNodeEvent(nodeId, eventType));
+    }
+  }
+
+  public AMNode get(NodeId nodeId) {
+    return nodeMap.get(nodeId);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    this.eventHandler.handle(event);
+  }
+
+  public int size() {
+    return nodeMap.size();
+  }
+
+  @Private
+  @VisibleForTesting
+  public boolean isBlacklistingIgnored() {
+    return this.ignoreBlacklisting;
+  }
+}
\ No newline at end of file

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeState.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeState.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeState.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeState.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+public enum AMNodeState { // States of a node as tracked by the AMNodeImpl state machine.
+  ACTIVE, // Initial state of the state machine.
+  FORCED_ACTIVE, // Entered on N_IGNORE_BLACKLISTING_ENABLED; failures are counted but do not blacklist the node.
+  BLACKLISTED, // Entered when the node blacklists itself after reaching the failed-attempt limit.
+  UNHEALTHY, // Entered on N_TURNED_UNHEALTHY; left again on N_TURNED_HEALTHY.
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/package-info.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/package-info.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/package-info.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.tez.dag.app.rm;
+import org.apache.hadoop.classification.InterfaceAudience;

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/ClientHSPolicyProvider.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/ClientHSPolicyProvider.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/ClientHSPolicyProvider.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/ClientHSPolicyProvider.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app.security.authorize;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.v2.api.HSClientProtocolPB;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+
+/**
+ * {@link PolicyProvider} exposing the service-level authorization entry for
+ * the {@link HSClientProtocolPB} protocol.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ClientHSPolicyProvider extends PolicyProvider {
+
+  /** The single service entry: authorization key paired with its protocol. */
+  private static final Service HS_CLIENT_SERVICE = new Service(
+      JHAdminConfig.MR_HS_SECURITY_SERVICE_AUTHORIZATION,
+      HSClientProtocolPB.class);
+
+  /** Shared array handed back by {@link #getServices()}. */
+  private static final Service[] HS_SERVICES =
+      new Service[] { HS_CLIENT_SERVICE };
+
+  /**
+   * @return the services covered by this policy provider
+   */
+  @Override
+  public Service[] getServices() {
+    return HS_SERVICES;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/ClientHSPolicyProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/MRAMPolicyProvider.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/MRAMPolicyProvider.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/MRAMPolicyProvider.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/MRAMPolicyProvider.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app.security.authorize;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * {@link PolicyProvider} that publishes the authorization entries for the
+ * {@link TaskUmbilicalProtocol} and {@link MRClientProtocolPB} protocols.
+ *
+ * <p>Each {@link Service} pairs one protocol class with its
+ * {@code MR_AM_SECURITY_SERVICE_AUTHORIZATION_*} key from {@link MRJobConfig}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MRAMPolicyProvider extends PolicyProvider {
+
+  /** Shared service table; never handed out directly, only as copies. */
+  private static final Service[] mapReduceApplicationMasterServices =
+      new Service[] {
+    new Service(
+        MRJobConfig.MR_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL,
+        TaskUmbilicalProtocol.class),
+    new Service(
+        MRJobConfig.MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT,
+        MRClientProtocolPB.class)
+  };
+
+  /**
+   * @return a fresh copy of the service table, so that a caller writing into
+   *         the returned array cannot alter the shared static table
+   */
+  @Override
+  public Service[] getServices() {
+    return mapReduceApplicationMasterServices.clone();
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/MRAMPolicyProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/package-info.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/package-info.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/package-info.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.tez.dag.app.security.authorize;
+import org.apache.hadoop.classification.InterfaceAudience;

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,78 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+/**
+ * Running count, sum and sum of squares of a stream of samples, from which
+ * mean, variance and standard deviation are derived on demand.
+ *
+ * <p>All mutators and accessors synchronize on the instance, so a reader
+ * always sees count, sum and sumSquares from the same update.
+ */
+public class DataStatistics {
+  private int count = 0;
+  private double sum = 0;
+  private double sumSquares = 0;
+
+  /** Creates an empty accumulator. */
+  public DataStatistics() {
+  }
+
+  /**
+   * Creates an accumulator that already holds one sample.
+   *
+   * @param initNum the first sample
+   */
+  public DataStatistics(double initNum) {
+    this.count = 1;
+    this.sum = initNum;
+    this.sumSquares = initNum * initNum;
+  }
+
+  /**
+   * Adds one sample.
+   *
+   * @param newNum the sample to add
+   */
+  public synchronized void add(double newNum) {
+    this.count++;
+    this.sum += newNum;
+    this.sumSquares += newNum * newNum;
+  }
+
+  /**
+   * Replaces a previously added sample with a new value; the sample count is
+   * left unchanged.
+   *
+   * @param old the value that was added earlier
+   * @param update the value that takes its place
+   */
+  public synchronized void updateStatistics(double old, double update) {
+    this.sum += update - old;
+    this.sumSquares += (update * update) - (old * old);
+  }
+
+  /** @return the mean of the samples, or 0.0 when there are none */
+  public synchronized double mean() {
+    return count == 0 ? 0.0 : sum/count;
+  }
+
+  /**
+   * @return the population variance E(X^2) - E(X)^2, clamped at 0.0 so that
+   *         rounding error cannot produce a negative result; 0.0 when fewer
+   *         than two samples are held
+   */
+  public synchronized double var() {
+    // E(X^2) - E(X)^2
+    if (count <= 1) {
+      return 0.0;
+    }
+    double mean = mean();
+    return Math.max((sumSquares/count) - mean * mean, 0.0d);
+  }
+
+  /** @return the square root of {@link #var()} */
+  public synchronized double std() {
+    return Math.sqrt(this.var());
+  }
+
+  /**
+   * @param sigma number of standard deviations above the mean
+   * @return mean + sigma * std, or 0.0 when there are no samples
+   */
+  public synchronized double outlier(float sigma) {
+    if (count == 0) {
+      return 0.0;
+    }
+
+    return mean() + std() * sigma;
+  }
+
+  /** @return the number of samples, widened to double */
+  public synchronized double count() {
+    return count;
+  }
+
+  /**
+   * Synchronized so the reported count, sum, sumSquares, mean and std all
+   * come from the same state rather than from interleaved updates.
+   */
+  @Override
+  public synchronized String toString() {
+    return "DataStatistics: count is " + count + ", sum is " + sum +
+    ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,507 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.dag.api.client.impl.TezBuilderUtils;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+// FIXME does not handle multiple vertices
+public class DefaultSpeculator extends AbstractService implements
+    Speculator {
+
+  // Sentinel results of speculationValue().  They sit at the very bottom of
+  //  the long range, so each of them is negative and compares below any
+  //  genuine speculation value.
+  private static final long ON_SCHEDULE = Long.MIN_VALUE;
+  private static final long ALREADY_SPECULATING = Long.MIN_VALUE + 1;
+  private static final long TOO_NEW = Long.MIN_VALUE + 2;
+  private static final long PROGRESS_IS_GOOD = Long.MIN_VALUE + 3;
+  private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
+  private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
+
+  // Lower bounds, in milliseconds, on how long the background thread waits
+  //  between scans, depending on whether the last scan launched anything.
+  private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
+  private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
+
+  // Inputs to the cap on concurrent speculations computed in
+  //  maybeScheduleASpeculation(): the largest of the fixed minimum, a
+  //  fraction of all tasks and a fraction of the running tasks.
+  private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
+  private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
+  private static final int  MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
+
+  private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
+
+  // Tasks whose latest status report was RUNNING; maintained by
+  //  statusUpdate().
+  private final ConcurrentMap<TezTaskID, Boolean> runningTasks
+      = new ConcurrentHashMap<TezTaskID, Boolean>();
+
+  // Consulted by statusUpdate(); nothing in this class adds entries to it.
+  private final Map<Task, AtomicBoolean> pendingSpeculations
+      = new ConcurrentHashMap<Task, AtomicBoolean>();
+
+  // These are the current needs, not the initial needs.  For each job, these
+  //  record the number of attempts that exist and that are actively
+  //  waiting for a container [as opposed to running or finished]
+  // TODO handle multiple dags
+  private final ConcurrentMap<TezVertexID, AtomicInteger> vertexContainerNeeds
+      = new ConcurrentHashMap<TezVertexID, AtomicInteger>();
+
+  // Tasks for which addSpeculativeAttempt() has been called.  This is a plain
+  //  HashSet; within this class it is touched only by speculationValue() and
+  //  addSpeculativeAttempt().
+  private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>();
+
+  // Stored by the constructor; this class reads configuration through
+  //  getConfig() instead.
+  private final Configuration conf;
+  private AppContext context;
+  // Created in start(); null until then, which stop() tolerates.
+  private Thread speculationBackgroundThread = null;
+  // Only inspected by eventQueueEmpty(); nothing in this class enqueues.
+  private BlockingQueue<SpeculatorEvent> eventQueue
+      = new LinkedBlockingQueue<SpeculatorEvent>();
+  private TaskRuntimeEstimator estimator;
+
+  // Wake-up channel for the background thread: scanForSpeculations() adds a
+  //  token, and the thread's timed poll() returns early when one is present.
+  private BlockingQueue<Object> scanControl = new LinkedBlockingQueue<Object>();
+
+  private final Clock clock;
+
+  // Receives the T_ADD_SPEC_ATTEMPT events sent by addSpeculativeAttempt().
+  private final EventHandler<TaskEvent> eventHandler;
+
+  /**
+   * Creates a speculator that takes its clock from the given context.
+   *
+   * @param conf configuration used to select and set up the estimator
+   * @param context application context; supplies the clock
+   */
+  public DefaultSpeculator(Configuration conf, AppContext context) {
+    this(conf, context, context.getClock());
+  }
+
+  /**
+   * Creates a speculator with an explicit clock and an estimator obtained
+   * from {@code getEstimator(conf, context)}.
+   *
+   * @param conf configuration used to select and set up the estimator
+   * @param context application context
+   * @param clock time source for scans and status timestamps
+   */
+  public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) {
+    this(conf, context, getEstimator(conf, context), clock);
+  }
+  
+  /**
+   * Builds the {@link TaskRuntimeEstimator} named by
+   * {@link MRJobConfig#MR_AM_TASK_ESTIMATOR}, falling back to
+   * {@link LegacyTaskRuntimeEstimator} when the key is unset, and
+   * contextualizes it with the given configuration and context.
+   *
+   * @param conf configuration to read the estimator class from
+   * @param context application context handed to the new estimator
+   * @return the contextualized estimator
+   * @throws YarnException if the class has no accessible no-argument
+   *         constructor or that constructor fails
+   */
+  static private TaskRuntimeEstimator getEstimator
+      (Configuration conf, AppContext context) {
+    // "yarn.mapreduce.job.task.runtime.estimator.class"
+    Class<? extends TaskRuntimeEstimator> estimatorClass
+        = conf.getClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
+                        LegacyTaskRuntimeEstimator.class,
+                        TaskRuntimeEstimator.class);
+
+    TaskRuntimeEstimator estimator;
+
+    try {
+      estimator = estimatorClass.getConstructor().newInstance();
+    } catch (InstantiationException ex) {
+      throw estimatorCreationFailure(estimatorClass, ex);
+    } catch (IllegalAccessException ex) {
+      throw estimatorCreationFailure(estimatorClass, ex);
+    } catch (InvocationTargetException ex) {
+      throw estimatorCreationFailure(estimatorClass, ex);
+    } catch (NoSuchMethodException ex) {
+      throw estimatorCreationFailure(estimatorClass, ex);
+    }
+
+    estimator.contextualize(conf, context);
+
+    return estimator;
+  }
+
+  // Logs a reflective instantiation failure and wraps it for rethrowing.
+  private static YarnException estimatorCreationFailure
+      (Class<?> estimatorClass, Exception cause) {
+    LOG.error("Can't make a speculation runtime estimator from "
+        + estimatorClass.getName(), cause);
+    return new YarnException(cause);
+  }
+
+  /**
+   * Primary constructor; the other constructors delegate here.  It is public
+   * so that test cases can supply their own estimator, whereas normally the
+   * estimator is worked out from the configuration.
+   *
+   * @param conf configuration to retain
+   * @param context application context; also supplies the event handler
+   * @param estimator runtime estimator to consult
+   * @param clock time source
+   */
+  public DefaultSpeculator
+      (Configuration conf, AppContext context,
+       TaskRuntimeEstimator estimator, Clock clock) {
+    super(DefaultSpeculator.class.getName());
+
+    this.clock = clock;
+    this.estimator = estimator;
+    this.context = context;
+    this.eventHandler = context.getEventHandler();
+    this.conf = conf;
+  }
+
+/*   *************************************************************    */
+
+  // This is the task-mongering that creates the two new threads -- one for
+  //  processing events from the event queue and one for periodically
+  //  looking for speculation opportunities
+
+  @Override
+  public void start() {
+    Runnable speculationBackgroundCore
+        = new Runnable() {
+            @Override
+            public void run() {
+              while (!Thread.currentThread().isInterrupted()) {
+                long backgroundRunStartTime = clock.getTime();
+                try {
+                  int speculations = computeSpeculations();
+                  long mininumRecomp
+                      = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
+                                         : SOONEST_RETRY_AFTER_NO_SPECULATE;
+
+                  long wait = Math.max(mininumRecomp,
+                        clock.getTime() - backgroundRunStartTime);
+
+                  if (speculations > 0) {
+                    LOG.info("We launched " + speculations
+                        + " speculations.  Sleeping " + wait + " milliseconds.");
+                  }
+
+                  Object pollResult
+                      = scanControl.poll(wait, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                  LOG.error("Background thread returning, interrupted : " + e);
+                  e.printStackTrace(System.out);
+                  return;
+                }
+              }
+            }
+          };
+    speculationBackgroundThread = new Thread
+        (speculationBackgroundCore, "DefaultSpeculator background processing");
+    speculationBackgroundThread.start();
+
+    super.start();
+  }
+
+  /**
+   * Interrupts the background scan thread, if one was ever started, and then
+   * stops the service.
+   */
+  @Override
+  public void stop() {
+    // stop() may arrive before start() has created the thread
+    final Thread background = speculationBackgroundThread;
+    if (background != null) {
+      background.interrupt();
+    }
+    super.stop();
+  }
+
+  /**
+   * Folds an attempt status report into the speculation data, stamped with
+   * the current clock time.
+   *
+   * @param status the reported attempt status
+   */
+  @Override
+  public void handleAttempt(TaskAttemptStatus status) {
+    statusUpdate(status, clock.getTime());
+  }
+
+  // Not part of the Speculator interface; exists for tests only.
+  /** @return whether the internal event queue currently holds no events */
+  public boolean eventQueueEmpty() {
+    final boolean nothingQueued = eventQueue.isEmpty();
+    return nothingQueued;
+  }
+
+  // This interface is intended to be used only for test cases.
+  /**
+   * Asks the background thread to run a scan without waiting out its current
+   * sleep, by dropping a wake-up token into {@code scanControl}.
+   */
+  public void scanForSpeculations() {
+    // Report through the logger rather than stdout.
+    LOG.info("We got asked to run a debug speculation scan.  There are "
+        + scanControl.size() + " events stacked already.");
+    scanControl.add(new Object());
+    Thread.yield();
+  }
+
+
+/*   *************************************************************    */
+
+  // This section contains the code that gets run for a SpeculatorEvent
+
+  private AtomicInteger containerNeed(TezTaskID taskID) {
+    TezVertexID vId = taskID.getVertexID();
+
+    AtomicInteger result = vertexContainerNeeds.get(vId);
+
+    if (result == null) {
+      vertexContainerNeeds.putIfAbsent(vId, new AtomicInteger(0));
+      result = vertexContainerNeeds.get(vId);
+    }
+
+    return result;
+  }
+
+  /**
+   * Dispatches one speculator event by type.  Event types without a case
+   * below are ignored.
+   *
+   * @param event the event to process
+   */
+  private synchronized void processSpeculatorEvent(SpeculatorEvent event) {
+    switch (event.getType()) {
+      case JOB_CREATE:
+        LOG.info("JOB_CREATE " + event.getJobID());
+        estimator.contextualize(getConfig(), context);
+        return;
+
+      case ATTEMPT_START:
+        LOG.info("ATTEMPT_START " + event.getTaskID());
+        estimator.enrollAttempt
+            (event.getReportedStatus(), event.getTimestamp());
+        return;
+
+      case TASK_CONTAINER_NEED_UPDATE:
+        // Adjust the owning vertex's counter by the reported delta.
+        containerNeed(event.getTaskID())
+            .addAndGet(event.containersNeededChange());
+        return;
+
+      case ATTEMPT_STATUS_UPDATE:
+        statusUpdate(event.getReportedStatus(), event.getTimestamp());
+        return;
+    }
+  }
+
+  /**
+   * Absorbs one TaskAttemptStatus
+   *
+   * <p>Reports are dropped when there is no current DAG or the task cannot
+   * be found.  Otherwise the estimator is updated and, unless the task is
+   * flagged in {@code pendingSpeculations}, the task is added to or removed
+   * from {@code runningTasks} according to the reported state.
+   *
+   * @param reportedStatus the status report that we got from a task attempt
+   *        that we want to fold into the speculation data for this job
+   * @param timestamp the time this status corresponds to.  This matters
+   *        because statuses contain progress.
+   */
+  protected void statusUpdate(TaskAttemptStatus reportedStatus, long timestamp) {
+
+    String stateString = reportedStatus.taskState.toString();
+
+    TezTaskAttemptID attemptID = reportedStatus.id;
+    TezTaskID taskID = attemptID.getTaskID();
+    DAG job = context.getDAG();
+
+    if (job == null) {
+      return;
+    }
+
+    Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
+
+    if (task == null) {
+      return;
+    }
+
+    estimator.updateAttempt(reportedStatus, timestamp);
+
+    // If the task is already known to be speculation-bait, don't do anything.
+    //  The map is read once so the null check and the flag check look at the
+    //  same entry even if the map changes concurrently.
+    AtomicBoolean pending = pendingSpeculations.get(task);
+    if (pending != null && pending.get()) {
+      return;
+    }
+
+    if (stateString.equals(TaskAttemptState.RUNNING.name())) {
+      runningTasks.putIfAbsent(taskID, Boolean.TRUE);
+    } else {
+      runningTasks.remove(taskID, Boolean.TRUE);
+    }
+  }
+
+/*   *************************************************************    */
+
+// This is the code section that runs periodically and adds speculations for
+//  those jobs that need them.
+
+
+  // Scores one task for speculation.  A non-sentinel result is the estimated
+  //  end time of the running attempt minus the estimated end time of a
+  //  replacement attempt started now, i.e. the time a speculation is
+  //  expected to save.
+  //
+  // This can return a few magic values for tasks that shouldn't speculate:
+  //  returns ON_SCHEDULE if thresholdRuntime(taskID) says that we should not
+  //     consider speculating this task
+  //  returns ALREADY_SPECULATING if that is true.  This has priority.
+  //  returns TOO_NEW if our companion task hasn't gotten any information
+  //  returns PROGRESS_IS_GOOD if the task is sailing through
+  //  returns TOO_LATE_TO_SPECULATE if a replacement attempt is not expected
+  //     to finish before the running attempt
+  //  returns NOT_RUNNING if the task is not running
+  //
+  // All of these values are negative.  Any value that should be allowed to
+  //  speculate is 0 or positive.
+  private long speculationValue(TezTaskID taskID, long now) {
+    DAG job = context.getDAG();
+    Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
+    Map<TezTaskAttemptID, TaskAttempt> attempts = task.getAttempts();
+    long acceptableRuntime = Long.MIN_VALUE;
+    long result = Long.MIN_VALUE;
+
+    // For a task we have not speculated on yet, ask the estimator up front
+    //  whether it is on schedule; Long.MAX_VALUE is its answer for "yes".
+    if (!mayHaveSpeculated.contains(taskID)) {
+      acceptableRuntime = estimator.thresholdRuntime(taskID);
+      if (acceptableRuntime == Long.MAX_VALUE) {
+        return ON_SCHEDULE;
+      }
+    }
+
+    TezTaskAttemptID runningTaskAttemptID = null;
+
+    int numberRunningAttempts = 0;
+
+    for (TaskAttempt taskAttempt : attempts.values()) {
+      if (taskAttempt.getState() == TaskAttemptState.RUNNING
+          || taskAttempt.getState() == TaskAttemptState.STARTING) {
+        // A second live attempt means a speculation is already in flight.
+        if (++numberRunningAttempts > 1) {
+          return ALREADY_SPECULATING;
+        }
+        runningTaskAttemptID = taskAttempt.getID();
+
+        long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
+
+        long taskAttemptStartTime
+            = estimator.attemptEnrolledTime(runningTaskAttemptID);
+        if (taskAttemptStartTime > now) {
+          // This background process ran before we could process the task
+          //  attempt status change that chronicles the attempt start
+          return TOO_NEW;
+        }
+
+        long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
+
+        long estimatedReplacementEndTime
+            = now + estimator.estimatedNewAttemptRuntime(taskID);
+
+        // The estimated end time of the running attempt is already behind us.
+        if (estimatedEndTime < now) {
+          return PROGRESS_IS_GOOD;
+        }
+
+        // A replacement would not finish before the running attempt does.
+        if (estimatedReplacementEndTime >= estimatedEndTime) {
+          return TOO_LATE_TO_SPECULATE;
+        }
+
+        result = estimatedEndTime - estimatedReplacementEndTime;
+      }
+    }
+
+    // If we are here, there's at most one running or starting attempt.
+    if (numberRunningAttempts == 0) {
+      return NOT_RUNNING;
+    }
+
+
+
+    // Reached only for tasks in mayHaveSpeculated, whose threshold check was
+    //  skipped above; it is applied here, after the attempt checks.
+    if (acceptableRuntime == Long.MIN_VALUE) {
+      acceptableRuntime = estimator.thresholdRuntime(taskID);
+      if (acceptableRuntime == Long.MAX_VALUE) {
+        return ON_SCHEDULE;
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Requests one more attempt for the given task by sending it a
+   * T_ADD_SPEC_ATTEMPT event, then records the task in
+   * {@code mayHaveSpeculated}.
+   *
+   * @param taskID the task to speculate
+   */
+  protected void addSpeculativeAttempt(TezTaskID taskID) {
+    LOG.info
+        ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
+    TaskEvent request
+        = new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT);
+    eventHandler.handle(request);
+    mayHaveSpeculated.add(taskID);
+  }
+
+  /**
+   * Processes the event immediately on the calling thread.
+   *
+   * @param event the speculator event to process
+   */
+  @Override
+  public void handle(SpeculatorEvent event) {
+    this.processSpeculatorEvent(event);
+  }
+
+
+  private int maybeScheduleAMapSpeculation() {
+    return maybeScheduleASpeculation(0);
+  }
+
+  private int maybeScheduleAReduceSpeculation() {
+    return maybeScheduleASpeculation(1);
+  }
+
+  /**
+   * Tries to launch one speculative attempt among the tasks of one vertex.
+   *
+   * <p>The vertex is skipped when there is no current DAG, when no container
+   * need has been recorded for it, or while any of its attempts is still
+   * waiting for a container.  Otherwise every task is scored with
+   * {@code speculationValue} and the best-scoring one is speculated,
+   * provided the cap on concurrent speculations has not been reached.
+   *
+   * @param vertexId index of the vertex within the current DAG
+   * @return the number of speculations launched: 0 or 1
+   */
+  private int maybeScheduleASpeculation(int vertexId) {
+    // TODO handle multiple dags
+    DAG job = context.getDAG();
+    if (job == null) {
+      return 0;
+    }
+
+    TezVertexID targetVertexId
+        = TezBuilderUtils.newVertexID(job.getID(), vertexId);
+
+    // Look at the container need of the vertex being scanned, not at the
+    //  needs of every vertex in the map.
+    AtomicInteger containersNeeded = vertexContainerNeeds.get(targetVertexId);
+
+    // This race condition is okay.  If we skip a speculation attempt we
+    //  should have tried because the event that lowers the number of
+    //  containers needed to zero hasn't come through, it will next time.
+    // Also, if we miss the fact that the number of containers needed was
+    //  zero but increased due to a failure it's not too bad to launch one
+    //  container prematurely.
+    if (containersNeeded == null || containersNeeded.get() > 0) {
+      return 0;
+    }
+
+    long now = clock.getTime();
+
+    int numberSpeculationsAlready = 0;
+    int numberRunningTasks = 0;
+
+    Map<TezTaskID, Task> tasks = job.getVertex(targetVertexId).getTasks();
+
+    int numberAllowedSpeculativeTasks
+        = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
+                         PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
+
+    TezTaskID bestTaskID = null;
+    // Sentinel results are negative, so only genuine values can beat -1.
+    long bestSpeculationValue = -1L;
+
+    // this loop is potentially pricey.
+    // TODO track the tasks that are potentially worth looking at
+    for (Map.Entry<TezTaskID, Task> taskEntry : tasks.entrySet()) {
+      long mySpeculationValue = speculationValue(taskEntry.getKey(), now);
+
+      if (mySpeculationValue == ALREADY_SPECULATING) {
+        ++numberSpeculationsAlready;
+      }
+
+      if (mySpeculationValue != NOT_RUNNING) {
+        ++numberRunningTasks;
+      }
+
+      if (mySpeculationValue > bestSpeculationValue) {
+        bestTaskID = taskEntry.getKey();
+        bestSpeculationValue = mySpeculationValue;
+      }
+    }
+    numberAllowedSpeculativeTasks
+        = (int) Math.max(numberAllowedSpeculativeTasks,
+                         PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
+
+    // If we found a speculation target, fire it off
+    if (bestTaskID != null
+        && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
+      addSpeculativeAttempt(bestTaskID);
+      return 1;
+    }
+
+    return 0;
+  }
+
+  // One scan: try the map vertex first, then the reduce vertex, and report
+  //  how many speculations were launched in total.
+  private int computeSpeculations() {
+    int launched = maybeScheduleAMapSpeculation();
+    launched += maybeScheduleAReduceSpeculation();
+    return launched;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,192 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * This estimator exponentially smooths the rate of progress versus wallclock
+ * time.  Conceivably we could write an estimator that smooths time per
+ * unit progress, and get different results.
+ */
+public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase {
+
+  // Latest smoothed estimate per attempt.  Each map value is an
+  //  AtomicReference that incorporateReading() updates by compare-and-set.
+  private final ConcurrentMap<TezTaskAttemptID, AtomicReference<EstimateVector>> estimates
+      = new ConcurrentHashMap<TezTaskAttemptID, AtomicReference<EstimateVector>>();
+
+  // Which quantity the smoothed value represents; set by the two-argument
+  //  constructor or by contextualize().
+  private SmoothedValue smoothedValue;
+
+  // Smoothing time constant: the elapsed time between two readings is
+  //  divided by it when the old estimate is weighted.  contextualize() reads
+  //  it from MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS.
+  private long lambda;
+
+  /**
+   * What is being smoothed: {@code RATE} is progress per unit of time,
+   * {@code TIME_PER_UNIT_PROGRESS} is its reciprocal.
+   */
+  public enum SmoothedValue {
+    RATE, TIME_PER_UNIT_PROGRESS
+  }
+
+  /**
+   * Creates an estimator with explicit smoothing settings.
+   *
+   * @param lambda smoothing time constant
+   * @param smoothedValue which quantity to smooth
+   */
+  ExponentiallySmoothedTaskRuntimeEstimator
+      (long lambda, SmoothedValue smoothedValue) {
+    this.lambda = lambda;
+    this.smoothedValue = smoothedValue;
+  }
+
+  /**
+   * Creates an estimator whose smoothing settings stay at their field
+   * defaults until {@code contextualize} assigns them.
+   */
+  public ExponentiallySmoothedTaskRuntimeEstimator() {
+  }
+
+  /**
+   * Immutable snapshot of one attempt's smoothed estimate: the smoothed
+   * value, the progress it was based on and the time of that reading.  A
+   * negative value means no estimate has been formed yet.
+   */
+  private class EstimateVector {
+    final double value;
+    final float basedOnProgress;
+    final long atTime;
+
+    EstimateVector(double value, float basedOnProgress, long atTime) {
+      this.value = value;
+      this.basedOnProgress = basedOnProgress;
+      this.atTime = atTime;
+    }
+
+    /**
+     * Folds a new progress reading into the estimate.
+     *
+     * @param newProgress progress reported by the attempt
+     * @param newAtTime time of that report
+     * @return this vector unchanged when the reading is not newer or shows
+     *         less progress; otherwise a new vector blending old and new
+     */
+    EstimateVector incorporate(float newProgress, long newAtTime) {
+      if (newAtTime <= atTime || newProgress < basedOnProgress) {
+        return this;
+      }
+
+      // NOTE(review): when atTime is the Long.MIN_VALUE placeholder this
+      //  subtraction overflows; value is negative in that case, so the
+      //  weight below is unaffected, but newRead still uses the overflowed
+      //  difference -- confirm how the first reading should be seeded.
+      double elapsed = (double) (newAtTime - atTime);
+
+      // The old estimate decays as exp(-elapsed / lambda), which keeps both
+      //  weights inside [0, 1].  A positive exponent would weight the old
+      //  estimate above 1 and the new reading below 0.
+      double oldWeighting
+          = value < 0.0 ? 0.0 : Math.exp(-elapsed / lambda);
+
+      double newRead = (newProgress - basedOnProgress) / (newAtTime - atTime);
+
+      if (smoothedValue == SmoothedValue.TIME_PER_UNIT_PROGRESS) {
+        newRead = 1.0 / newRead;
+      }
+
+      return new EstimateVector
+          (value * oldWeighting + newRead * (1.0 - oldWeighting),
+           newProgress, newAtTime);
+    }
+  }
+
+  /**
+   * Records a progress reading for an attempt.  The first reading of an
+   * attempt only installs a placeholder vector; later readings are folded in
+   * with a compare-and-set retry loop.
+   *
+   * @param attemptID attempt the reading belongs to
+   * @param newProgress reported progress
+   * @param newTime time of the report
+   */
+  private void incorporateReading
+      (TezTaskAttemptID attemptID, float newProgress, long newTime) {
+    // Make sure the attempt has a holder, whoever ends up registering it.
+    AtomicReference<EstimateVector> holder = estimates.get(attemptID);
+    while (holder == null) {
+      estimates.putIfAbsent(attemptID, new AtomicReference<EstimateVector>(null));
+      holder = estimates.get(attemptID);
+    }
+
+    for (;;) {
+      EstimateVector current = holder.get();
+
+      if (current == null) {
+        // First reading: install the "no estimate yet" placeholder and stop.
+        if (holder.compareAndSet(null,
+               new EstimateVector(-1.0, 0.0F, Long.MIN_VALUE))) {
+          return;
+        }
+        // Someone else installed a vector first; retry against it.
+        continue;
+      }
+
+      if (holder.compareAndSet
+              (current, current.incorporate(newProgress, newTime))) {
+        return;
+      }
+    }
+  }
+
+  private EstimateVector getEstimateVector(TezTaskAttemptID attemptID) {
+    AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
+
+    if (vectorRef == null) {
+      return null;
+    }
+
+    return vectorRef.get();
+  }
+
+  @Override
+  public void contextualize(Configuration conf, AppContext context) {
+    super.contextualize(conf, context);
+
+    lambda
+        = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS,
+            MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS);
+    smoothedValue
+        = conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
+            ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
+  }
+
+  /**
+   * Estimates the total runtime of an attempt as the time elapsed up to its
+   * last reading plus the remaining progress divided by the smoothed rate.
+   *
+   * @param id the attempt to estimate
+   * @return the estimated total runtime, or -1 when no start time or
+   *         estimate is known, or the smoothed value is not yet a usable
+   *         positive rate
+   */
+  @Override
+  public long estimatedRuntime(TezTaskAttemptID id) {
+    Long startTime = (Long) startTimes.get(id);
+
+    if (startTime == null) {
+      return -1L;
+    }
+
+    EstimateVector vector = getEstimateVector(id);
+
+    if (vector == null) {
+      return -1L;
+    }
+
+    double value = vector.value;
+
+    // A negative value is the "no estimate yet" placeholder, whose timestamp
+    //  is a placeholder too; zero means no progress has been observed.
+    //  Neither yields a meaningful rate.
+    if (value <= 0) {
+      return -1L;
+    }
+
+    double rate = smoothedValue == SmoothedValue.RATE ? value : 1.0 / value;
+
+    // The reciprocal of an infinite time-per-progress is zero.
+    if (rate == 0.0) {
+      return -1L;
+    }
+
+    long sunkTime = vector.atTime - startTime;
+    float progress = vector.basedOnProgress;
+
+    double remainingTime = (1.0 - progress) / rate;
+
+    return sunkTime + (long)remainingTime;
+  }
+
+  /**
+   * @param id the attempt asked about (ignored)
+   * @return always -1; this estimator computes no variance
+   */
+  @Override
+  public long runtimeEstimateVariance(TezTaskAttemptID id) {
+    final long fixedResult = -1L;
+    return fixedResult;
+  }
+
+  /**
+   * Lets the base class process the status first, then folds the reported
+   * progress into this attempt's smoothed estimate.
+   *
+   * @param status the reported attempt status
+   * @param timestamp the time the status corresponds to
+   */
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    super.updateAttempt(status, timestamp);
+    incorporateReading(status.id, status.progress, timestamp);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,145 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+
+/**
+ * Runtime estimator that extrapolates an attempt's total runtime linearly
+ * from the time elapsed since it was enrolled and its reported progress.
+ * Estimates and variance estimates are kept per running attempt and are
+ * refreshed on every status update.
+ */
+public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
+
+  private final ConcurrentHashMap<TaskAttempt, AtomicLong> attemptRuntimeEstimates
+      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+  private final ConcurrentHashMap<TaskAttempt, AtomicLong> attemptRuntimeEstimateVariances
+      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+
+  /**
+   * Lets the superclass process the update, then, if the attempt is still
+   * RUNNING, recomputes and stores its runtime and variance estimates.
+   * Both are stored as -1 when no usable start time is known or the
+   * timestamp does not lie after the start time.
+   *
+   * @param status the reported attempt status (its id and progress are read)
+   * @param timestamp the time associated with this report
+   */
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    super.updateAttempt(status, timestamp);
+
+    TezTaskAttemptID attemptID = status.id;
+    TaskAttempt taskAttempt = lookupAttempt(attemptID);
+
+    if (taskAttempt == null) {
+      return;
+    }
+
+    // Completions are accumulated by the superclass; here we only learn
+    //  more about attempts that are still cooking.
+    if (taskAttempt.getState() != TaskAttemptState.RUNNING) {
+      return;
+    }
+
+    Long boxedStart = (Long) startTimes.get(attemptID);
+    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
+
+    long estimate = -1;
+    long varianceEstimate = -1;
+
+    // This code assumes that we'll never consider starting a third
+    //  speculative task attempt if two are already running for this task
+    if (start > 0 && timestamp > start) {
+      estimate = (long) ((timestamp - start) / Math.max(0.0001, status.progress));
+      varianceEstimate = (long) (estimate * status.progress / 10);
+    }
+
+    containerFor(attemptRuntimeEstimates, taskAttempt).set(estimate);
+    containerFor(attemptRuntimeEstimateVariances, taskAttempt)
+        .set(varianceEstimate);
+  }
+
+  /**
+   * Returns the value holder registered for an attempt, registering a new
+   * one atomically if none exists yet.  Using putIfAbsent guarantees that
+   * concurrent callers all end up with the same holder, so no update is
+   * written to a holder that is not in the registry.
+   */
+  private static AtomicLong containerFor(
+      ConcurrentHashMap<TaskAttempt, AtomicLong> registry,
+      TaskAttempt taskAttempt) {
+    AtomicLong existing = registry.get(taskAttempt);
+
+    if (existing != null) {
+      return existing;
+    }
+
+    AtomicLong fresh = new AtomicLong();
+    AtomicLong winner = registry.putIfAbsent(taskAttempt, fresh);
+
+    return winner == null ? fresh : winner;
+  }
+
+  /**
+   * Resolves an attempt id to its TaskAttempt through the current DAG.
+   *
+   * @return the attempt, or null when there is no current DAG, the task is
+   *         unknown, or the task does not know the attempt
+   */
+  private TaskAttempt lookupAttempt(TezTaskAttemptID attemptID) {
+    DAG job = context.getDAG();
+
+    if (job == null) {
+      return null;
+    }
+
+    TezTaskID taskID = attemptID.getTaskID();
+    Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
+
+    return task == null ? null : task.getAttempt(attemptID);
+  }
+
+  /**
+   * Reads the value stored for an attempt in the given registry.
+   *
+   * @return the stored value, or -1 when the attempt cannot be resolved or
+   *         nothing has been stored for it
+   */
+  private long storedPerAttemptValue
+       (Map<TaskAttempt, AtomicLong> data, TezTaskAttemptID attemptID) {
+    TaskAttempt taskAttempt = lookupAttempt(attemptID);
+
+    if (taskAttempt == null) {
+      return -1L;
+    }
+
+    AtomicLong estimate = data.get(taskAttempt);
+
+    return estimate == null ? -1L : estimate.get();
+  }
+
+  /**
+   * @return the most recently stored runtime estimate for the attempt,
+   *         or -1 when none is available
+   */
+  @Override
+  public long estimatedRuntime(TezTaskAttemptID attemptID) {
+    return storedPerAttemptValue(attemptRuntimeEstimates, attemptID);
+  }
+
+  /**
+   * @return the most recently stored variance estimate for the attempt,
+   *         or -1 when none is available
+   */
+  @Override
+  public long runtimeEstimateVariance(TezTaskAttemptID attemptID) {
+    return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,72 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+
+
+/**
+ * This class is provided solely as an exemplar of the values that mean
+ * that nothing needs to be computed.  It's not currently used.
+ */
+public class NullTaskRuntimesEngine implements TaskRuntimeEstimator {
+
+  /** Returned by the enrolled-time and threshold queries. */
+  private static final long UNBOUNDED = Long.MAX_VALUE;
+
+  /** Returned by every estimate query. */
+  private static final long NO_ESTIMATE = -1L;
+
+  @Override
+  public void contextualize(Configuration conf, AppContext context) {
+    // deliberately does nothing
+  }
+
+  @Override
+  public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
+    // deliberately does nothing
+  }
+
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    // deliberately does nothing
+  }
+
+  @Override
+  public long attemptEnrolledTime(TezTaskAttemptID attemptID) {
+    return UNBOUNDED;
+  }
+
+  @Override
+  public long thresholdRuntime(TezTaskID id) {
+    return UNBOUNDED;
+  }
+
+  @Override
+  public long estimatedRuntime(TezTaskAttemptID id) {
+    return NO_ESTIMATE;
+  }
+
+  @Override
+  public long estimatedNewAttemptRuntime(TezTaskID id) {
+    return NO_ESTIMATE;
+  }
+
+  @Override
+  public long runtimeEstimateVariance(TezTaskAttemptID id) {
+    return NO_ESTIMATE;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+
+/**
+ * The speculation component.  It receives the status updates of task
+ * attempts; a concrete implementation runs the speculative algorithm and
+ * sends the TaskEventType.T_ADD_ATTEMPT.
+ *
+ * An implementation is also responsible for arranging that the jobs are
+ * scanned from time to time, so that speculations get launched.
+ */
+public interface Speculator extends EventHandler<SpeculatorEvent> {
+
+  /** The kinds of {@link SpeculatorEvent}. */
+  enum EventType {
+    ATTEMPT_STATUS_UPDATE,
+    ATTEMPT_START,
+    TASK_CONTAINER_NEED_UPDATE,
+    JOB_CREATE
+  }
+
+  // This will be implemented if we go to a model where the events are
+  //  processed within the TaskAttempts' state transitions' code.
+  void handleAttempt(TaskAttemptStatus status);
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,86 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+
+public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
+
+  // valid for ATTEMPT_STATUS_UPDATE
+  private TaskAttemptStatus reportedStatus;
+
+  // valid for TASK_CONTAINER_NEED_UPDATE
+  private TezTaskID taskID;
+  private int containersNeededChange;
+  
+  // valid for CREATE_JOB
+  private TezDAGID dagId;
+
+  public SpeculatorEvent(TezDAGID dagId, long timestamp) {
+    super(Speculator.EventType.JOB_CREATE, timestamp);
+    this.dagId = dagId;
+  }
+
+  public SpeculatorEvent(TaskAttemptStatus reportedStatus, long timestamp) {
+    super(Speculator.EventType.ATTEMPT_STATUS_UPDATE, timestamp);
+    this.reportedStatus = reportedStatus;
+  }
+
+  public SpeculatorEvent(TezTaskAttemptID attemptID, boolean flag, long timestamp) {
+    super(Speculator.EventType.ATTEMPT_START, timestamp);
+    this.reportedStatus = new TaskAttemptStatus();
+    this.reportedStatus.id = attemptID;
+    this.taskID = attemptID.getTaskID();
+  }
+
+  /*
+   * This c'tor creates a TASK_CONTAINER_NEED_UPDATE event .
+   * We send a +1 event when a task enters a state where it wants a container,
+   *  and a -1 event when it either gets one or withdraws the request.
+   * The per job sum of all these events is the number of containers requested
+   *  but not granted.  The intent is that we only do speculations when the
+   *  speculation wouldn't compete for containers with tasks which need
+   *  to be run.
+   */
+  public SpeculatorEvent(TezTaskID taskID, int containersNeededChange) {
+    super(Speculator.EventType.TASK_CONTAINER_NEED_UPDATE);
+    this.taskID = taskID;
+    this.containersNeededChange = containersNeededChange;
+  }
+
+  public TaskAttemptStatus getReportedStatus() {
+    return reportedStatus;
+  }
+
+  public int containersNeededChange() {
+    return containersNeededChange;
+  }
+
+  public TezTaskID getTaskID() {
+    return taskID;
+  }
+  
+  public TezDAGID getJobID() {
+    return dagId;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,195 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Shared base for runtime estimators.  It records when each attempt was
+ * enrolled, accumulates the durations of the first successful attempt of
+ * each task into per-vertex statistics, and derives from those statistics
+ * the speculation threshold and the expected runtime of a new attempt.
+ */
+abstract class StartEndTimesBase<V> implements TaskRuntimeEstimator {
+  static final float MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE
+      = 0.05F;
+  static final int MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
+      = 1;
+
+  protected Configuration conf = null;
+  protected AppContext context = null;
+
+  protected final Map<TezTaskAttemptID, Long> startTimes
+      = new ConcurrentHashMap<TezTaskAttemptID, Long>();
+
+  // XXXX This class design assumes that the contents of AppContext.getAllJobs
+  //   never changes.  Is that right?
+  //
+  // This assumption comes in in several places, mostly in data structure that
+  //   can grow without limit if a AppContext gets new Job's when the old ones
+  //   run out.  Also, these mapper statistics blocks won't cover the Job's
+  //   we don't know about.
+  // TODO handle multiple DAGs
+  protected final Map<TezVertexID, DataStatistics> vertexStatistics
+      = new HashMap<TezVertexID, DataStatistics>();
+
+  private float slowTaskRelativeThreshold = 0f;
+
+  // Guarded by synchronizing on the set itself.
+  protected final Set<Task> doneTasks = new HashSet<Task>();
+
+  /** Records the given timestamp as the start time of the attempt. */
+  @Override
+  public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
+    startTimes.put(status.id, timestamp);
+  }
+
+  /**
+   * @return the recorded start time of the attempt, or Long.MAX_VALUE when
+   *         the attempt was never enrolled
+   */
+  @Override
+  public long attemptEnrolledTime(TezTaskAttemptID attemptID) {
+    Long result = startTimes.get(attemptID);
+
+    return result == null ? Long.MAX_VALUE : result;
+  }
+
+  /**
+   * Stores the configuration and context, reads the slow-task threshold
+   * and creates an empty statistics block for every vertex of the DAG.
+   */
+  @Override
+  public void contextualize(Configuration conf, AppContext context) {
+    this.conf = conf;
+    this.context = context;
+
+    // Read once, independently of how many vertices the DAG has.
+    slowTaskRelativeThreshold =
+        conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f);
+
+    final DAG dag = context.getDAG();
+    for (Entry<TezVertexID, Vertex> entry: dag.getVertices().entrySet()) {
+      vertexStatistics.put(entry.getKey(), new DataStatistics());
+    }
+  }
+
+  /**
+   * @return the statistics block of the vertex the task belongs to, or null
+   *         when there is no current DAG, the vertex or task is unknown, or
+   *         no statistics exist for the vertex
+   */
+  protected DataStatistics dataStatisticsForTask(TezTaskID taskID) {
+    DAG dag = context.getDAG();
+
+    if (dag == null) {
+      return null;
+    }
+
+    Vertex vertex = dag.getVertex(taskID.getVertexID());
+
+    if (vertex == null || vertex.getTask(taskID) == null) {
+      return null;
+    }
+
+    return vertexStatistics.get(taskID.getVertexID());
+  }
+
+  /**
+   * @return the runtime beyond which an attempt of this task counts as
+   *         slow, or Long.MAX_VALUE when the DAG or vertex is unknown, too
+   *         few tasks of the vertex have completed, or no statistics exist
+   */
+  @Override
+  public long thresholdRuntime(TezTaskID taskID) {
+    DAG job = context.getDAG();
+
+    if (job == null) {
+      return Long.MAX_VALUE;
+    }
+
+    Vertex v = job.getVertex(taskID.getVertexID());
+
+    if (v == null) {
+      return Long.MAX_VALUE;
+    }
+
+    int completedTasksOfType = v.getCompletedTasks();
+    int totalTasksOfType = v.getTotalTasks();
+
+    if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
+        || (((float)completedTasksOfType) / totalTasksOfType)
+              < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
+      return Long.MAX_VALUE;
+    }
+
+    DataStatistics statistics = dataStatisticsForTask(taskID);
+
+    return statistics == null
+        ? Long.MAX_VALUE
+        : (long)statistics.outlier(slowTaskRelativeThreshold);
+  }
+
+  /**
+   * @return the mean of the recorded durations for the task's vertex, or
+   *         -1 when no statistics are available
+   */
+  @Override
+  public long estimatedNewAttemptRuntime(TezTaskID id) {
+    DataStatistics statistics = dataStatisticsForTask(id);
+
+    if (statistics == null) {
+      return -1L;
+    }
+
+    return (long)statistics.mean();
+  }
+
+  /**
+   * If the reported attempt has SUCCEEDED and it is the first success seen
+   * for its task, adds the attempt's duration (timestamp minus recorded
+   * start time) to the statistics of the task's vertex.  Updates for
+   * unknown DAGs, vertices, tasks or attempts are ignored.
+   */
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+
+    TezTaskAttemptID attemptID = status.id;
+    TezTaskID taskID = attemptID.getTaskID();
+    DAG job = context.getDAG();
+
+    if (job == null) {
+      return;
+    }
+
+    Vertex vertex = job.getVertex(taskID.getVertexID());
+
+    if (vertex == null) {
+      return;
+    }
+
+    Task task = vertex.getTask(taskID);
+
+    if (task == null) {
+      return;
+    }
+
+    TaskAttempt taskAttempt = task.getAttempt(attemptID);
+
+    if (taskAttempt == null
+        || taskAttempt.getState() != TaskAttemptState.SUCCEEDED) {
+      return;
+    }
+
+    // Is this a new success?
+    // Note that if a task completes twice [because of a previous speculation
+    //  and a race, or a success followed by loss of the machine with the
+    //  local data] we only count the first one.
+    boolean isNew;
+    synchronized (doneTasks) {
+      isNew = doneTasks.add(task);
+    }
+
+    if (!isNew) {
+      return;
+    }
+
+    Long boxedStart = startTimes.get(attemptID);
+    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
+    long finish = timestamp;
+
+    if (start > 1L && finish > 1L && start <= finish) {
+      long duration = finish - start;
+
+      DataStatistics statistics = dataStatisticsForTask(taskID);
+
+      if (statistics != null) {
+        statistics.add(duration);
+      }
+    }
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
------------------------------------------------------------------------------
    svn:eol-style = native