You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [19/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,363 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.node;
+
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklisted;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+
+public class AMNodeImpl implements AMNode {
+
+ private static final Log LOG = LogFactory.getLog(AMNodeImpl.class);
+
+ private final ReadLock readLock;
+ private final WriteLock writeLock;
+ private final NodeId nodeId;
+ private final AppContext appContext;
+ private final int maxTaskFailuresPerNode;
+ private int numFailedTAs = 0;
+ private int numSuccessfulTAs = 0;
+ private boolean blacklistingEnabled;
+ private boolean ignoreBlacklisting = false;
+
+ @SuppressWarnings("rawtypes")
+ protected EventHandler eventHandler;
+
+ private final List<ContainerId> containers = new LinkedList<ContainerId>();
+
+ //Book-keeping only. In case of Health status change.
+ private final List<ContainerId> pastContainers = new LinkedList<ContainerId>();
+
+
+
+ private final StateMachine<AMNodeState, AMNodeEventType, AMNodeEvent> stateMachine;
+
+ private static StateMachineFactory
+ <AMNodeImpl, AMNodeState, AMNodeEventType, AMNodeEvent>
+ stateMachineFactory =
+ new StateMachineFactory<AMNodeImpl, AMNodeState, AMNodeEventType, AMNodeEvent>(
+ AMNodeState.ACTIVE)
+ // Transitions from ACTIVE state.
+ .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedTransition())
+ .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
+ .addTransition(AMNodeState.ACTIVE, EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED), AMNodeEventType.N_TA_ENDED, new TaskAttemptFailedTransition())
+ .addTransition(AMNodeState.ACTIVE, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new NodeTurnedUnhealthyTransition())
+ .addTransition(AMNodeState.ACTIVE, EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED), AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingDisabledTransition())
+ .addTransition(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, new IgnoreBlacklistingStateChangeTransition(true))
+ .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, AMNodeEventType.N_TURNED_HEALTHY)
+
+ // Transitions from BLACKLISTED state.
+ .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedWhileBlacklistedTransition())
+ .addTransition(AMNodeState.BLACKLISTED, EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE), AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededWhileBlacklistedTransition())
+ .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
+ .addTransition(AMNodeState.BLACKLISTED, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new NodeTurnedUnhealthyTransition())
+ .addTransition(AMNodeState.BLACKLISTED, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, new IgnoreBlacklistingStateChangeTransition(true))
+ .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED), new GenericErrorTransition())
+
+ //Transitions from FORCED_ACTIVE state.
+ .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedTransition())
+ .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
+ .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
+ .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new NodeTurnedUnhealthyTransition())
+ .addTransition(AMNodeState.FORCED_ACTIVE, EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE), AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingDisabledTransition())
+ .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED), new GenericErrorTransition())
+
+ // Transitions from UNHEALTHY state.
+ .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedWhileUnhealthyTransition())
+ .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, EnumSet.of(AMNodeEventType.N_TA_SUCCEEDED, AMNodeEventType.N_TA_ENDED))
+ .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingStateChangeTransition(false))
+ .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, new IgnoreBlacklistingStateChangeTransition(true))
+ .addTransition(AMNodeState.UNHEALTHY, EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE), AMNodeEventType.N_TURNED_HEALTHY, new NodeTurnedHealthyTransition())
+ .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new GenericErrorTransition())
+
+ .installTopology();
+
+
+ @SuppressWarnings("rawtypes")
+ public AMNodeImpl(NodeId nodeId, int maxTaskFailuresPerNode,
+ EventHandler eventHandler, boolean blacklistingEnabled,
+ AppContext appContext) {
+ ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+ this.readLock = rwLock.readLock();
+ this.writeLock = rwLock.writeLock();
+ this.nodeId = nodeId;
+ this.appContext = appContext;
+ this.eventHandler = eventHandler;
+ this.maxTaskFailuresPerNode = maxTaskFailuresPerNode;
+ this.stateMachine = stateMachineFactory.make(this);
+ // TODO Handle the case where a node is created due to the RM reporting it's
+ // state as UNHEALTHY
+ }
+
+ @Override
+ public NodeId getNodeId() {
+ return this.nodeId;
+ }
+
+ @Override
+ public AMNodeState getState() {
+ this.readLock.lock();
+ try {
+ return this.stateMachine.getCurrentState();
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ public List<ContainerId> getContainers() {
+ this.readLock.lock();
+ try {
+ List<ContainerId> cIds = new LinkedList<ContainerId>(this.containers);
+ return cIds;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ public void handle(AMNodeEvent event) {
+ this.writeLock.lock();
+ LOG.info("DEBUG: Processing AMNodeEvent " + event.getNodeId()
+ + " of type " + event.getType() + " while in state: " + getState()
+ + ". Event: " + event);
+ try {
+ final AMNodeState oldState = getState();
+ try {
+ stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle event " + event.getType()
+ + " at current state " + oldState + " for NodeId " + this.nodeId, e);
+ // TODO Should this fail the job ?
+ }
+ if (oldState != getState()) {
+ LOG.info("AMNode " + this.nodeId + " transitioned from " + oldState
+ + " to " + getState());
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ protected boolean shouldBlacklistNode() {
+ return blacklistingEnabled && (numFailedTAs >= maxTaskFailuresPerNode);
+ }
+
+ protected void blacklistSelf() {
+ sendEvent(new AMNodeEvent(getNodeId(),
+ AMNodeEventType.N_NODE_WAS_BLACKLISTED));
+ sendEvent(new AMSchedulerEventNodeBlacklisted(getNodeId()));
+ }
+
+ @SuppressWarnings("unchecked")
+ private void sendEvent(Event<?> event) {
+ this.eventHandler.handle(event);
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Start of Transition Classes //
+ //////////////////////////////////////////////////////////////////////////////
+
+ protected static class ContainerAllocatedTransition implements
+ SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+ @Override
+ public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+ AMNodeEventContainerAllocated event = (AMNodeEventContainerAllocated) nEvent;
+ node.containers.add(event.getContainerId());
+ }
+ }
+
+ protected static class TaskAttemptSucceededTransition implements
+ SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+ @Override
+ public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+ node.numSuccessfulTAs++;
+ }
+ }
+
+ protected static class TaskAttemptFailedTransition implements
+ MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
+ @Override
+ public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
+ AMNodeEventTaskAttemptEnded event = (AMNodeEventTaskAttemptEnded) nEvent;
+ if (event.failed()) {
+ node.numFailedTAs++;
+ boolean shouldBlacklist = node.shouldBlacklistNode();
+ if (shouldBlacklist) {
+ node.blacklistSelf();
+ return AMNodeState.BLACKLISTED;
+ }
+ }
+ return AMNodeState.ACTIVE;
+ }
+ }
+
+ // Forgetting about past errors. Will go back to ACTIVE, not FORCED_ACTIVE
+ protected static class NodeTurnedUnhealthyTransition implements
+ SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+ @Override
+ public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+ for (ContainerId c : node.containers) {
+ node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+ }
+ // Resetting counters.
+ node.numFailedTAs = 0;
+ node.numSuccessfulTAs = 0;
+ }
+ }
+
+ protected static class IgnoreBlacklistingDisabledTransition implements
+ MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
+
+ @Override
+ public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
+ node.ignoreBlacklisting = false;
+ boolean shouldBlacklist = node.shouldBlacklistNode();
+ if (shouldBlacklist) {
+ node.blacklistSelf();
+ return AMNodeState.BLACKLISTED;
+ }
+ return AMNodeState.ACTIVE;
+ }
+ }
+
+ protected static class IgnoreBlacklistingStateChangeTransition implements
+ SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+
+ private boolean ignore;
+
+ public IgnoreBlacklistingStateChangeTransition(boolean ignore) {
+ this.ignore = ignore;
+ }
+
+ @Override
+ public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+ node.ignoreBlacklisting = ignore;
+ }
+ }
+
+ protected static class ContainerAllocatedWhileBlacklistedTransition implements
+ SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+ @Override
+ public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+ AMNodeEventContainerAllocated event = (AMNodeEventContainerAllocated) nEvent;
+ node.sendEvent(new AMContainerEvent(event.getContainerId(),
+ AMContainerEventType.C_STOP_REQUEST));
+ // ZZZ CReuse: Should the scheduler check node state before scheduling a
+ // container on it ?
+ }
+ }
+
+ protected static class TaskAttemptSucceededWhileBlacklistedTransition
+ implements MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
+ @Override
+ public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
+ node.numSuccessfulTAs++;
+ return AMNodeState.BLACKLISTED;
+ // For now, always blacklisted. May change at a later point to re-enable
+ // the node.
+ }
+ }
+
+ protected static class CountFailedTaskAttemptTransition implements
+ SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+ @Override
+ public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+ AMNodeEventTaskAttemptEnded event = (AMNodeEventTaskAttemptEnded) nEvent;
+ if (event.failed())
+ node.numFailedTAs++;
+ }
+ }
+
+ protected static class GenericErrorTransition implements
+ SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+
+ @Override
+ public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+ LOG.warn("Invalid event: " + nEvent.getType() + " while in state: "
+ + node.getState() + ". Ignoring." + " Event: " + nEvent);
+ }
+ }
+
+ protected static class ContainerAllocatedWhileUnhealthyTransition implements
+ SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+ @Override
+ public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
+ AMNodeEventContainerAllocated event = (AMNodeEventContainerAllocated) nEvent;
+ LOG.info("Node: " + node.getNodeId()
+ + " got allocated a contaienr with id: " + event.getContainerId()
+ + " while in UNHEALTHY state. Releasing it.");
+ node.sendEvent(new AMContainerEventNodeFailed(event.getContainerId(),
+ "new container assigned on failed node " + node.getNodeId()));
+ }
+ }
+
+ protected static class NodeTurnedHealthyTransition implements
+ MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
+ @Override
+ public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
+ node.pastContainers.addAll(node.containers);
+ node.containers.clear();
+ if (node.ignoreBlacklisting) {
+ return AMNodeState.FORCED_ACTIVE;
+ } else {
+ return AMNodeState.ACTIVE;
+ }
+ }
+ }
+
+ @Override
+ public boolean isUnhealthy() {
+ this.readLock.lock();
+ try {
+ return getState() == AMNodeState.UNHEALTHY;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public boolean isBlacklisted() {
+ this.readLock.lock();
+ try {
+ return getState() == AMNodeState.BLACKLISTED;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,211 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.node;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Registry of the {@link AMNode}s known to the AM, keyed by {@link NodeId},
+ * plus a host-level blacklist built from N_NODE_WAS_BLACKLISTED events. When
+ * the blacklisted share of the cluster reaches the configured percentage,
+ * blacklisting is ignored and every known node is told so.
+ */
+public class AMNodeMap extends AbstractService implements
+    EventHandler<AMNodeEvent> {
+
+  static final Log LOG = LogFactory.getLog(AMNodeMap.class);
+
+  private final ConcurrentHashMap<NodeId, AMNode> nodeMap;
+  // Host name -> blacklisted NodeIds on that host.
+  private final ConcurrentHashMap<String, Set<NodeId>> blacklistMap;
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
+  private final AppContext appContext;
+  private int numClusterNodes;
+  // volatile: isHostBlackListed()/isBlacklistingIgnored() may be called from a
+  // thread other than the one running handle().
+  private volatile boolean ignoreBlacklisting = false;
+  private int maxTaskFailuresPerNode;
+  private boolean nodeBlacklistingEnabled;
+  private int blacklistDisablePercent;
+
+
+  // TODO XXX Ensure there's a test for IgnoreBlacklisting in
+  // TestRMContainerAllocator. Otherwise add one.
+  @SuppressWarnings("rawtypes")
+  public AMNodeMap(EventHandler eventHandler, AppContext appContext) {
+    super("AMNodeMap");
+    this.nodeMap = new ConcurrentHashMap<NodeId, AMNode>();
+    this.blacklistMap = new ConcurrentHashMap<String, Set<NodeId>>();
+    this.eventHandler = eventHandler;
+    this.appContext = appContext;
+  }
+
+  /**
+   * Reads the blacklisting settings from {@code conf}.
+   *
+   * @throws YarnException if the ignore-blacklisting percentage is outside
+   *           [-1, 100]
+   */
+  @Override
+  public synchronized void init(Configuration conf) {
+    this.maxTaskFailuresPerNode = conf.getInt(
+        MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
+    this.nodeBlacklistingEnabled = conf.getBoolean(
+        MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    this.blacklistDisablePercent = conf.getInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
+        MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
+
+    LOG.info("blacklistDisablePercent is " + blacklistDisablePercent +
+        ", blacklistingEnabled: " + nodeBlacklistingEnabled +
+        ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode);
+
+    if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
+      throw new YarnException("Invalid blacklistDisablePercent: "
+          + blacklistDisablePercent
+          + ". Should be an integer between 0 and 100 or -1 to disabled");
+    }
+    super.init(conf);
+  }
+
+  /** Registers {@code nodeId} if it is not already known. */
+  public void nodeSeen(NodeId nodeId) {
+    // Skip building a throw-away AMNodeImpl in the common already-known case.
+    if (!nodeMap.containsKey(nodeId)) {
+      nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, maxTaskFailuresPerNode,
+          eventHandler, nodeBlacklistingEnabled, appContext));
+    }
+  }
+
+  // Interface for the scheduler to check about a specific host.
+  public boolean isHostBlackListed(String hostname) {
+    if (!nodeBlacklistingEnabled || ignoreBlacklisting) {
+      return false;
+    }
+    return blacklistMap.containsKey(hostname);
+  }
+
+  /** Adds {@code nodeId} to its host's blacklist entry; idempotent. */
+  private void addToBlackList(NodeId nodeId) {
+    String host = nodeId.getHost();
+    Set<NodeId> nodes = blacklistMap.get(host);
+    if (nodes == null) {
+      Set<NodeId> created = new HashSet<NodeId>();
+      nodes = blacklistMap.putIfAbsent(host, created);
+      if (nodes == null) {
+        nodes = created;
+      }
+    }
+    // Set.add is a no-op when the element is already present.
+    nodes.add(nodeId);
+  }
+
+  // TODO: Currently, un-blacklisting feature is not supported.
+
+  @Override
+  public void handle(AMNodeEvent rEvent) {
+    // No synchronization required until there's multiple dispatchers.
+    NodeId nodeId = rEvent.getNodeId();
+    switch (rEvent.getType()) {
+      case N_NODE_WAS_BLACKLISTED:
+        // A node may send this again when ignore-blacklisting is turned off
+        // while it is over its failure limit; addToBlackList() is idempotent,
+        // so a repeat only triggers a harmless recomputation.
+        addToBlackList(nodeId);
+        computeIgnoreBlacklisting();
+        break;
+      case N_NODE_COUNT_UPDATED:
+        AMNodeEventNodeCountUpdated event = (AMNodeEventNodeCountUpdated) rEvent;
+        numClusterNodes = event.getNodeCount();
+        computeIgnoreBlacklisting();
+        break;
+      default:
+        nodeMap.get(nodeId).handle(rEvent);
+    }
+  }
+
+  // May be incorrect if there's multiple NodeManagers running on a single host.
+  // knownNodeCount is based on node managers, not hosts. blacklisting is
+  // currently based on hosts.
+  protected void computeIgnoreBlacklisting() {
+    if (!nodeBlacklistingEnabled) {
+      return;
+    }
+    boolean stateChanged = false;
+    // -1 means the ignore-blacklisting threshold is disabled.
+    if (blacklistDisablePercent != -1) {
+      if (numClusterNodes == 0) {
+        LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting");
+        return;
+      }
+      int val = (int) ((float) blacklistMap.size() / numClusterNodes * 100);
+      if (val >= blacklistDisablePercent) {
+        if (!ignoreBlacklisting) {
+          ignoreBlacklisting = true;
+          LOG.info("Ignore Blacklisting set to true. Known: " + numClusterNodes
+              + ", Blacklisted: " + blacklistMap.size());
+          stateChanged = true;
+        }
+      } else {
+        if (ignoreBlacklisting) {
+          ignoreBlacklisting = false;
+          LOG.info("Ignore blacklisting set to false. Known: "
+              + numClusterNodes + ", Blacklisted: " + blacklistMap.size());
+          stateChanged = true;
+        }
+      }
+    }
+
+    if (stateChanged) {
+      sendIgnoreBlacklistingStateToNodes();
+    }
+  }
+
+  /** Broadcasts the current ignore-blacklisting state to every known node. */
+  private void sendIgnoreBlacklistingStateToNodes() {
+    AMNodeEventType eventType =
+        ignoreBlacklisting ? AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED
+            : AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED;
+    for (NodeId nodeId : nodeMap.keySet()) {
+      sendEvent(new AMNodeEvent(nodeId, eventType));
+    }
+  }
+
+  public AMNode get(NodeId nodeId) {
+    return nodeMap.get(nodeId);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    this.eventHandler.handle(event);
+  }
+
+  public int size() {
+    return nodeMap.size();
+  }
+
+  @Private
+  @VisibleForTesting
+  public boolean isBlacklistingIgnored() {
+    return this.ignoreBlacklisting;
+  }
+}
\ No newline at end of file
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeState.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeState.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeState.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeState.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+public enum AMNodeState {
+ ACTIVE,
+ FORCED_ACTIVE,
+ BLACKLISTED,
+ UNHEALTHY,
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeState.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/package-info.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/package-info.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/package-info.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.tez.dag.app.rm;
+import org.apache.hadoop.classification.InterfaceAudience;
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/package-info.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/ClientHSPolicyProvider.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/ClientHSPolicyProvider.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/ClientHSPolicyProvider.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/ClientHSPolicyProvider.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app.security.authorize;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.v2.api.HSClientProtocolPB;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+
+/**
+ * {@link PolicyProvider} exposing the MapReduce job-history server's client
+ * protocol ({@link HSClientProtocolPB}) to service-level authorization.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ClientHSPolicyProvider extends PolicyProvider {
+
+  // Pairs the authorization configuration key with the protocol it governs.
+  private static final Service[] HISTORY_SERVER_SERVICES = {
+    new Service(JHAdminConfig.MR_HS_SECURITY_SERVICE_AUTHORIZATION,
+        HSClientProtocolPB.class)
+  };
+
+  @Override
+  public Service[] getServices() {
+    return HISTORY_SERVER_SERVICES;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/ClientHSPolicyProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/MRAMPolicyProvider.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/MRAMPolicyProvider.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/MRAMPolicyProvider.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/MRAMPolicyProvider.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app.security.authorize;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * {@link PolicyProvider} listing the MapReduce application-master protocols
+ * subject to service-level authorization: the task umbilical protocol and
+ * the client protocol.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MRAMPolicyProvider extends PolicyProvider {
+
+  // Each entry pairs an authorization configuration key with the protocol
+  // class it governs.
+  private static final Service[] AM_SERVICES = {
+    new Service(MRJobConfig.MR_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL,
+        TaskUmbilicalProtocol.class),
+    new Service(MRJobConfig.MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT,
+        MRClientProtocolPB.class)
+  };
+
+  @Override
+  public Service[] getServices() {
+    return AM_SERVICES;
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/MRAMPolicyProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/package-info.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/package-info.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/package-info.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.tez.dag.app.security.authorize;
+import org.apache.hadoop.classification.InterfaceAudience;
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/package-info.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,78 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+/**
+ * Accumulates the count, sum and sum of squares of a stream of doubles and
+ * derives the mean, population variance, standard deviation and an upper
+ * outlier threshold from them. Every accessor and mutator, including
+ * {@link #toString()}, synchronizes on the instance.
+ */
+public class DataStatistics {
+  private int count = 0;
+  private double sum = 0;
+  private double sumSquares = 0;
+
+  /** Creates an empty accumulator. */
+  public DataStatistics() {
+  }
+
+  /**
+   * Creates an accumulator that already holds one sample.
+   *
+   * @param initNum the first sample
+   */
+  public DataStatistics(double initNum) {
+    this.count = 1;
+    this.sum = initNum;
+    this.sumSquares = initNum * initNum;
+  }
+
+  /**
+   * Adds one sample.
+   *
+   * @param newNum the sample to add
+   */
+  public synchronized void add(double newNum) {
+    this.count++;
+    this.sum += newNum;
+    this.sumSquares += newNum * newNum;
+  }
+
+  /**
+   * Replaces a previously added sample with a new value. The sample count is
+   * unchanged; the caller must make sure {@code old} was actually added.
+   *
+   * @param old the value being replaced
+   * @param update the replacement value
+   */
+  public synchronized void updateStatistics(double old, double update) {
+    this.sum += update - old;
+    this.sumSquares += (update * update) - (old * old);
+  }
+
+  /** @return the arithmetic mean, or 0.0 when no samples have been added */
+  public synchronized double mean() {
+    return count == 0 ? 0.0 : sum / count;
+  }
+
+  /**
+   * Population variance, computed as E(X^2) - E(X)^2.
+   *
+   * @return the variance; 0.0 for fewer than two samples. The result is
+   *         clamped at 0.0 because floating-point cancellation can make the
+   *         difference slightly negative.
+   */
+  public synchronized double var() {
+    if (count <= 1) {
+      return 0.0;
+    }
+    double mean = mean();
+    return Math.max((sumSquares / count) - mean * mean, 0.0d);
+  }
+
+  /** @return the population standard deviation, {@code sqrt(var())} */
+  public synchronized double std() {
+    return Math.sqrt(this.var());
+  }
+
+  /**
+   * @param sigma how many standard deviations above the mean
+   * @return {@code mean() + std() * sigma}, or 0.0 when there are no samples
+   */
+  public synchronized double outlier(float sigma) {
+    if (count != 0) {
+      return mean() + std() * sigma;
+    }
+    return 0.0;
+  }
+
+  /** @return the number of samples (returned as a double) */
+  public synchronized double count() {
+    return count;
+  }
+
+  // Synchronized like every other accessor, so that count, sum, sumSquares,
+  // mean and std all describe one consistent snapshot.
+  @Override
+  public synchronized String toString() {
+    return "DataStatistics: count is " + count + ", sum is " + sum +
+        ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,507 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.dag.api.client.impl.TezBuilderUtils;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+// FIXME does not handle multiple vertices
+/**
+ * {@link Speculator} whose background thread periodically looks for a
+ * running task that would finish sooner with another attempt. For such a
+ * task it sends a {@link TaskEventType#T_ADD_SPEC_ATTEMPT} event.
+ *
+ * <p>Scans are at least {@code SOONEST_RETRY_AFTER_NO_SPECULATE} ms apart, or
+ * {@code SOONEST_RETRY_AFTER_SPECULATE} ms after a scan that requested a
+ * speculation. Each scan considers vertex 0 and vertex 1 of the current DAG
+ * and requests at most one speculative attempt per vertex. Runtime estimates
+ * come from a pluggable {@link TaskRuntimeEstimator}.
+ */
+public class DefaultSpeculator extends AbstractService implements
+    Speculator {
+
+  // Sentinel results of speculationValue(). All are negative, so any task
+  // whose value is >= 0 compares greater than every sentinel.
+  private static final long ON_SCHEDULE = Long.MIN_VALUE;
+  private static final long ALREADY_SPECULATING = Long.MIN_VALUE + 1;
+  private static final long TOO_NEW = Long.MIN_VALUE + 2;
+  private static final long PROGRESS_IS_GOOD = Long.MIN_VALUE + 3;
+  private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
+  private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
+
+  // Minimum pause (ms) between background scans, depending on whether the
+  // previous scan requested any speculation.
+  private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
+  private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
+
+  private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
+  private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
+  private static final int MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
+
+  private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
+
+  private final ConcurrentMap<TezTaskID, Boolean> runningTasks
+      = new ConcurrentHashMap<TezTaskID, Boolean>();
+
+  private final Map<Task, AtomicBoolean> pendingSpeculations
+      = new ConcurrentHashMap<Task, AtomicBoolean>();
+
+  // These are the current needs, not the initial needs. For each vertex,
+  // these record the number of attempts that exist and that are actively
+  // waiting for a container [as opposed to running or finished]
+  // TODO handle multiple dags
+  private final ConcurrentMap<TezVertexID, AtomicInteger> vertexContainerNeeds
+      = new ConcurrentHashMap<TezVertexID, AtomicInteger>();
+
+  // Tasks for which a speculative attempt has been requested. Read and
+  // written by speculationValue() and addSpeculativeAttempt(), which the
+  // background scan reaches. This is a plain HashSet, so it is not safe for
+  // concurrent use from other threads.
+  private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>();
+
+  private final Configuration conf;
+  private AppContext context;
+  private Thread speculationBackgroundThread = null;
+  private BlockingQueue<SpeculatorEvent> eventQueue
+      = new LinkedBlockingQueue<SpeculatorEvent>();
+  private TaskRuntimeEstimator estimator;
+
+  // Tokens posted here by scanForSpeculations() wake the background thread
+  // before its sleep expires.
+  private BlockingQueue<Object> scanControl = new LinkedBlockingQueue<Object>();
+
+  private final Clock clock;
+
+  private final EventHandler<TaskEvent> eventHandler;
+
+  public DefaultSpeculator(Configuration conf, AppContext context) {
+    this(conf, context, context.getClock());
+  }
+
+  public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) {
+    this(conf, context, getEstimator(conf, context), clock);
+  }
+
+  /**
+   * Instantiates the estimator class named by
+   * {@code MRJobConfig.MR_AM_TASK_ESTIMATOR} (default
+   * {@link LegacyTaskRuntimeEstimator}) through its public no-arg constructor,
+   * then contextualizes it.
+   *
+   * @throws YarnException wrapping any reflection failure
+   */
+  static private TaskRuntimeEstimator getEstimator
+      (Configuration conf, AppContext context) {
+    TaskRuntimeEstimator estimator;
+
+    try {
+      // "yarn.mapreduce.job.task.runtime.estimator.class"
+      Class<? extends TaskRuntimeEstimator> estimatorClass
+          = conf.getClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
+              LegacyTaskRuntimeEstimator.class,
+              TaskRuntimeEstimator.class);
+
+      Constructor<? extends TaskRuntimeEstimator> estimatorConstructor
+          = estimatorClass.getConstructor();
+
+      estimator = estimatorConstructor.newInstance();
+
+      estimator.contextualize(conf, context);
+    } catch (InstantiationException ex) {
+      LOG.error("Can't make a speculation runtime extimator", ex);
+      throw new YarnException(ex);
+    } catch (IllegalAccessException ex) {
+      LOG.error("Can't make a speculation runtime extimator", ex);
+      throw new YarnException(ex);
+    } catch (InvocationTargetException ex) {
+      LOG.error("Can't make a speculation runtime extimator", ex);
+      throw new YarnException(ex);
+    } catch (NoSuchMethodException ex) {
+      LOG.error("Can't make a speculation runtime extimator", ex);
+      throw new YarnException(ex);
+    }
+
+    return estimator;
+  }
+
+  // This constructor is designed to be called by other constructors.
+  // However, it's public because we do use it in the test cases.
+  // Normally we figure out our own estimator.
+  public DefaultSpeculator
+      (Configuration conf, AppContext context,
+       TaskRuntimeEstimator estimator, Clock clock) {
+    super(DefaultSpeculator.class.getName());
+
+    this.conf = conf;
+    this.context = context;
+    this.estimator = estimator;
+    this.clock = clock;
+    this.eventHandler = context.getEventHandler();
+  }
+
+/* ************************************************************* */
+
+  // Starts the background thread that periodically looks for speculation
+  // opportunities. It runs until interrupted by stop().
+
+  @Override
+  public void start() {
+    Runnable speculationBackgroundCore
+        = new Runnable() {
+          @Override
+          public void run() {
+            while (!Thread.currentThread().isInterrupted()) {
+              long backgroundRunStartTime = clock.getTime();
+              try {
+                int speculations = computeSpeculations();
+                long minimumRecomp
+                    = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
+                                       : SOONEST_RETRY_AFTER_NO_SPECULATE;
+
+                long wait = Math.max(minimumRecomp,
+                    clock.getTime() - backgroundRunStartTime);
+
+                if (speculations > 0) {
+                  LOG.info("We launched " + speculations
+                      + " speculations. Sleeping " + wait + " milliseconds.");
+                }
+
+                // Sleep for `wait` ms, or wake early if scanForSpeculations()
+                // posts a token; the token itself carries no data.
+                scanControl.poll(wait, TimeUnit.MILLISECONDS);
+              } catch (InterruptedException e) {
+                // stop() interrupts this thread: restore the flag and exit.
+                LOG.info("Background thread returning, interrupted : " + e);
+                Thread.currentThread().interrupt();
+                return;
+              }
+            }
+          }
+        };
+    speculationBackgroundThread = new Thread
+        (speculationBackgroundCore, "DefaultSpeculator background processing");
+    speculationBackgroundThread.start();
+
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    // this could be called before background thread is established
+    if (speculationBackgroundThread != null) {
+      speculationBackgroundThread.interrupt();
+    }
+    super.stop();
+  }
+
+  @Override
+  public void handleAttempt(TaskAttemptStatus status) {
+    long timestamp = clock.getTime();
+    statusUpdate(status, timestamp);
+  }
+
+  // This section is not part of the Speculator interface; it's used only for
+  // testing
+  public boolean eventQueueEmpty() {
+    return eventQueue.isEmpty();
+  }
+
+  // This interface is intended to be used only for test cases. It wakes the
+  // background thread so that it scans immediately.
+  public void scanForSpeculations() {
+    LOG.info("We got asked to run a debug speculation scan.");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("There are " + scanControl.size()
+          + " events stacked already.");
+    }
+    scanControl.add(new Object());
+    Thread.yield();
+  }
+
+
+/* ************************************************************* */
+
+  // This section contains the code that gets run for a SpeculatorEvent
+
+  /**
+   * Returns the container-need counter of the vertex that owns
+   * {@code taskID}, creating it atomically (starting at 0) if absent.
+   */
+  private AtomicInteger containerNeed(TezTaskID taskID) {
+    TezVertexID vId = taskID.getVertexID();
+
+    AtomicInteger result = vertexContainerNeeds.get(vId);
+    if (result == null) {
+      AtomicInteger fresh = new AtomicInteger(0);
+      result = vertexContainerNeeds.putIfAbsent(vId, fresh);
+      if (result == null) {
+        result = fresh;
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Dispatches one event: a status update is folded into the estimator, a
+   * container-need update adjusts the owning vertex's counter, an attempt
+   * start is enrolled with the estimator, and a job creation
+   * re-contextualizes the estimator. Other event types are ignored.
+   */
+  private synchronized void processSpeculatorEvent(SpeculatorEvent event) {
+    switch (event.getType()) {
+      case ATTEMPT_STATUS_UPDATE:
+        statusUpdate(event.getReportedStatus(), event.getTimestamp());
+        break;
+
+      case TASK_CONTAINER_NEED_UPDATE:
+      {
+        AtomicInteger need = containerNeed(event.getTaskID());
+        need.addAndGet(event.containersNeededChange());
+        break;
+      }
+
+      case ATTEMPT_START:
+      {
+        LOG.info("ATTEMPT_START " + event.getTaskID());
+        estimator.enrollAttempt
+            (event.getReportedStatus(), event.getTimestamp());
+        break;
+      }
+
+      case JOB_CREATE:
+      {
+        LOG.info("JOB_CREATE " + event.getJobID());
+        estimator.contextualize(getConfig(), context);
+        break;
+      }
+    }
+  }
+
+  /**
+   * Absorbs one TaskAttemptStatus
+   *
+   * @param reportedStatus the status report that we got from a task attempt
+   *        that we want to fold into the speculation data for this job
+   * @param timestamp the time this status corresponds to. This matters
+   *        because statuses contain progress.
+   */
+  protected void statusUpdate(TaskAttemptStatus reportedStatus, long timestamp) {
+
+    String stateString = reportedStatus.taskState.toString();
+
+    TezTaskAttemptID attemptID = reportedStatus.id;
+    TezTaskID taskID = attemptID.getTaskID();
+    DAG job = context.getDAG();
+
+    if (job == null) {
+      return;
+    }
+
+    Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
+
+    if (task == null) {
+      return;
+    }
+
+    estimator.updateAttempt(reportedStatus, timestamp);
+
+    // If the task is already known to be speculation-bait, don't do anything
+    AtomicBoolean pending = pendingSpeculations.get(task);
+    if (pending != null && pending.get()) {
+      return;
+    }
+
+    if (stateString.equals(TaskAttemptState.RUNNING.name())) {
+      runningTasks.putIfAbsent(taskID, Boolean.TRUE);
+    } else {
+      runningTasks.remove(taskID, Boolean.TRUE);
+    }
+  }
+
+/* ************************************************************* */
+
+// This is the code section that runs periodically and adds speculations for
+// those jobs that need them.
+
+
+  // This can return a few magic values for tasks that shouldn't speculate:
+  //   ON_SCHEDULE if thresholdRuntime(taskID) says that we should not
+  //     consider speculating this task
+  //   ALREADY_SPECULATING if more than one attempt is running or starting.
+  //     This has priority.
+  //   TOO_NEW if our companion task hasn't gotten any information
+  //   PROGRESS_IS_GOOD if the task is sailing through
+  //   TOO_LATE_TO_SPECULATE if a fresh attempt would not finish earlier
+  //   NOT_RUNNING if the task is not running
+  //
+  // All of these values are negative. Any value that should be allowed to
+  // speculate is 0 or positive: the estimated time saved by a new attempt.
+  private long speculationValue(TezTaskID taskID, long now) {
+    DAG job = context.getDAG();
+    Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
+    Map<TezTaskAttemptID, TaskAttempt> attempts = task.getAttempts();
+    long acceptableRuntime = Long.MIN_VALUE;
+    long result = Long.MIN_VALUE;
+
+    if (!mayHaveSpeculated.contains(taskID)) {
+      acceptableRuntime = estimator.thresholdRuntime(taskID);
+      if (acceptableRuntime == Long.MAX_VALUE) {
+        return ON_SCHEDULE;
+      }
+    }
+
+    TezTaskAttemptID runningTaskAttemptID = null;
+
+    int numberRunningAttempts = 0;
+
+    for (TaskAttempt taskAttempt : attempts.values()) {
+      if (taskAttempt.getState() == TaskAttemptState.RUNNING
+          || taskAttempt.getState() == TaskAttemptState.STARTING) {
+        if (++numberRunningAttempts > 1) {
+          return ALREADY_SPECULATING;
+        }
+        runningTaskAttemptID = taskAttempt.getID();
+
+        long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
+
+        long taskAttemptStartTime
+            = estimator.attemptEnrolledTime(runningTaskAttemptID);
+        if (taskAttemptStartTime > now) {
+          // This background process ran before we could process the task
+          // attempt status change that chronicles the attempt start
+          return TOO_NEW;
+        }
+
+        long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
+
+        long estimatedReplacementEndTime
+            = now + estimator.estimatedNewAttemptRuntime(taskID);
+
+        if (estimatedEndTime < now) {
+          return PROGRESS_IS_GOOD;
+        }
+
+        if (estimatedReplacementEndTime >= estimatedEndTime) {
+          return TOO_LATE_TO_SPECULATE;
+        }
+
+        result = estimatedEndTime - estimatedReplacementEndTime;
+      }
+    }
+
+    // If we are here, there's at most one task attempt.
+    if (numberRunningAttempts == 0) {
+      return NOT_RUNNING;
+    }
+
+    if (acceptableRuntime == Long.MIN_VALUE) {
+      acceptableRuntime = estimator.thresholdRuntime(taskID);
+      if (acceptableRuntime == Long.MAX_VALUE) {
+        return ON_SCHEDULE;
+      }
+    }
+
+    return result;
+  }
+
+  //Add attempt to a given Task.
+  protected void addSpeculativeAttempt(TezTaskID taskID) {
+    LOG.info
+        ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
+    eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT));
+    mayHaveSpeculated.add(taskID);
+  }
+
+  @Override
+  public void handle(SpeculatorEvent event) {
+    processSpeculatorEvent(event);
+  }
+
+
+  private int maybeScheduleAMapSpeculation() {
+    return maybeScheduleASpeculation(0);
+  }
+
+  private int maybeScheduleAReduceSpeculation() {
+    return maybeScheduleASpeculation(1);
+  }
+
+  /**
+   * Considers the tasks of one vertex of the current DAG and requests a
+   * speculative attempt for the most promising one, if any.
+   *
+   * <p>A vertex is only considered once its own container-need counter
+   * exists and is not positive, i.e. none of its attempts is still waiting
+   * for a container. Only this vertex's entry is consulted. Scanning once per
+   * tracked vertex would request duplicate speculative attempts for the same
+   * task within one pass, and would let other vertices' needs gate this one.
+   *
+   * @param vertexId index of the vertex within the current DAG
+   * @return 1 if a speculative attempt was requested, otherwise 0
+   */
+  private int maybeScheduleASpeculation(int vertexId) {
+    // FIXME this needs to be fixed for a DAG
+    // TODO handle multiple dags
+    DAG job = context.getDAG();
+    if (job == null) {
+      return 0;
+    }
+
+    TezVertexID targetVertexId =
+        TezBuilderUtils.newVertexID(job.getID(), vertexId);
+
+    // This race condition is okay. If we skip a speculation attempt we
+    // should have tried because the event that lowers the number of
+    // containers needed to zero hasn't come through, it will next time.
+    // Also, if we miss the fact that the number of containers needed was
+    // zero but increased due to a failure it's not too bad to launch one
+    // container prematurely.
+    // A missing entry means no task of this vertex has reported a container
+    // need yet (and the vertex may not even exist in this DAG), so skip it.
+    AtomicInteger containersNeeded = vertexContainerNeeds.get(targetVertexId);
+    if (containersNeeded == null || containersNeeded.get() > 0) {
+      return 0;
+    }
+
+    long now = clock.getTime();
+
+    int numberSpeculationsAlready = 0;
+    int numberRunningTasks = 0;
+
+    Map<TezTaskID, Task> tasks = job.getVertex(targetVertexId).getTasks();
+
+    int numberAllowedSpeculativeTasks
+        = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
+            PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
+
+    TezTaskID bestTaskID = null;
+    long bestSpeculationValue = -1L;
+
+    // this loop is potentially pricey.
+    // TODO track the tasks that are potentially worth looking at
+    for (Map.Entry<TezTaskID, Task> taskEntry : tasks.entrySet()) {
+      long mySpeculationValue = speculationValue(taskEntry.getKey(), now);
+
+      if (mySpeculationValue == ALREADY_SPECULATING) {
+        ++numberSpeculationsAlready;
+      }
+
+      if (mySpeculationValue != NOT_RUNNING) {
+        ++numberRunningTasks;
+      }
+
+      if (mySpeculationValue > bestSpeculationValue) {
+        bestTaskID = taskEntry.getKey();
+        bestSpeculationValue = mySpeculationValue;
+      }
+    }
+    numberAllowedSpeculativeTasks
+        = (int) Math.max(numberAllowedSpeculativeTasks,
+            PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
+
+    // If we found a speculation target, fire it off
+    if (bestTaskID != null
+        && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
+      addSpeculativeAttempt(bestTaskID);
+      return 1;
+    }
+
+    return 0;
+  }
+
+  private int computeSpeculations() {
+    // We'll try to issue one map and one reduce speculation per job per run
+    return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation();
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,192 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * This estimator exponentially smooths the rate of progress versus wallclock
+ * time. Conceivably we could write an estimator that smooths time per
+ * unit progress, and get different results.
+ *
+ * <p>{@link #contextualize(Configuration, AppContext)} reads which quantity
+ * is smoothed ({@link SmoothedValue}) and the decay constant {@code lambda}.
+ * When a reading arrives {@code dt} after the previous one, the old smoothed
+ * value keeps weight {@code exp(-dt / lambda)} and the new reading gets the
+ * remainder.
+ */
+public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase {
+
+  // Latest smoothed estimate per attempt, replaced lock-free via compareAndSet.
+  private final ConcurrentMap<TezTaskAttemptID, AtomicReference<EstimateVector>> estimates
+      = new ConcurrentHashMap<TezTaskAttemptID, AtomicReference<EstimateVector>>();
+
+  private SmoothedValue smoothedValue;
+
+  // Decay time constant, in the same unit as the reading timestamps.
+  private long lambda;
+
+  public enum SmoothedValue {
+    RATE, TIME_PER_UNIT_PROGRESS
+  }
+
+  ExponentiallySmoothedTaskRuntimeEstimator
+      (long lambda, SmoothedValue smoothedValue) {
+    super();
+    this.smoothedValue = smoothedValue;
+    this.lambda = lambda;
+  }
+
+  public ExponentiallySmoothedTaskRuntimeEstimator() {
+    super();
+  }
+
+  // immutable: the smoothed value together with the progress and time of the
+  // reading it is based on. A negative value means "no estimate yet".
+  private class EstimateVector {
+    final double value;
+    final float basedOnProgress;
+    final long atTime;
+
+    EstimateVector(double value, float basedOnProgress, long atTime) {
+      this.value = value;
+      this.basedOnProgress = basedOnProgress;
+      this.atTime = atTime;
+    }
+
+    /**
+     * Folds a new (progress, time) reading into this estimate.
+     *
+     * @return a new vector, or {@code this} when the reading is not later
+     *         than this one or reports progress going backwards
+     */
+    EstimateVector incorporate(float newProgress, long newAtTime) {
+      if (newAtTime <= atTime || newProgress < basedOnProgress) {
+        return this;
+      }
+
+      long elapsed = newAtTime - atTime;
+
+      // Weight kept on the previous estimate decays from 1 towards 0 as the
+      // gap grows. The exponent must be negative: exp(+x) > 1 would give the
+      // new reading a negative weight and make the estimate diverge. A
+      // negative (sentinel) value carries no information and gets weight 0.
+      double oldWeighting
+          = value < 0.0
+              ? 0.0 : Math.exp(-((double) elapsed) / lambda);
+
+      double newRead = (newProgress - basedOnProgress) / elapsed;
+
+      if (smoothedValue == SmoothedValue.TIME_PER_UNIT_PROGRESS) {
+        newRead = 1.0 / newRead;
+      }
+
+      // With zero weight, use the new reading as is. This avoids
+      // value * 0.0, which is NaN if value is infinite.
+      double smoothed = oldWeighting == 0.0
+          ? newRead
+          : value * oldWeighting + newRead * (1.0 - oldWeighting);
+
+      return new EstimateVector(smoothed, newProgress, newAtTime);
+    }
+  }
+
+  /**
+   * Records one (progress, time) reading for an attempt. The first reading
+   * only establishes the baseline; a rate is available from the second
+   * reading onwards. Concurrent callers are reconciled with a CAS loop.
+   */
+  private void incorporateReading
+      (TezTaskAttemptID attemptID, float newProgress, long newTime) {
+    AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
+
+    if (vectorRef == null) {
+      AtomicReference<EstimateVector> fresh
+          = new AtomicReference<EstimateVector>(null);
+      vectorRef = estimates.putIfAbsent(attemptID, fresh);
+      if (vectorRef == null) {
+        vectorRef = fresh;
+      }
+    }
+
+    while (true) {
+      EstimateVector oldVector = vectorRef.get();
+      // Baseline at the actual reading, not at (0, Long.MIN_VALUE): the
+      // latter made the next reading's elapsed time overflow.
+      EstimateVector newVector = oldVector == null
+          ? new EstimateVector(-1.0, newProgress, newTime)
+          : oldVector.incorporate(newProgress, newTime);
+      if (vectorRef.compareAndSet(oldVector, newVector)) {
+        return;
+      }
+    }
+  }
+
+  private EstimateVector getEstimateVector(TezTaskAttemptID attemptID) {
+    AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
+
+    if (vectorRef == null) {
+      return null;
+    }
+
+    return vectorRef.get();
+  }
+
+  @Override
+  public void contextualize(Configuration conf, AppContext context) {
+    super.contextualize(conf, context);
+
+    lambda
+        = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS,
+            MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS);
+    smoothedValue
+        = conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
+            ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
+  }
+
+  /**
+   * @return the time from attempt start to the latest reading, plus the
+   *         remaining progress divided by the smoothed rate; -1 when there is
+   *         no start time, no usable estimate, or a zero rate
+   */
+  @Override
+  public long estimatedRuntime(TezTaskAttemptID id) {
+    Long startTime = (Long) startTimes.get(id);
+
+    if (startTime == null) {
+      return -1L;
+    }
+
+    EstimateVector vector = getEstimateVector(id);
+
+    if (vector == null) {
+      return -1L;
+    }
+
+    long sunkTime = vector.atTime - startTime;
+
+    double value = vector.value;
+    float progress = vector.basedOnProgress;
+
+    // Not positive means only the baseline reading exists (negative
+    // sentinel) or no progress has been measured. The negated comparison
+    // also rejects NaN.
+    if (!(value > 0.0)) {
+      return -1L;
+    }
+
+    double rate = smoothedValue == SmoothedValue.RATE ? value : 1.0 / value;
+
+    if (rate == 0.0) {
+      return -1L;
+    }
+
+    double remainingTime = (1.0 - progress) / rate;
+
+    return sunkTime + (long) remainingTime;
+  }
+
+  @Override
+  public long runtimeEstimateVariance(TezTaskAttemptID id) {
+    return -1L;
+  }
+
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    super.updateAttempt(status, timestamp);
+    TezTaskAttemptID attemptID = status.id;
+
+    float progress = status.progress;
+
+    incorporateReading(attemptID, progress, timestamp);
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,145 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+
+/**
+ * Estimates a running attempt's total runtime by linear extrapolation:
+ * elapsed time since start divided by reported progress. The variance
+ * estimate is {@code estimate * progress / 10}.
+ */
+public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
+
+  // Per-attempt estimates, written by updateAttempt() and read by the
+  // estimate accessors. NOTE(review): entries are never removed here.
+  private final ConcurrentHashMap<TaskAttempt, AtomicLong> attemptRuntimeEstimates
+      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+  private final ConcurrentHashMap<TaskAttempt, AtomicLong> attemptRuntimeEstimateVariances
+      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    super.updateAttempt(status, timestamp);
+
+    TezTaskAttemptID attemptID = status.id;
+    TaskAttempt taskAttempt = findAttempt(attemptID);
+
+    if (taskAttempt == null) {
+      return;
+    }
+
+    Long boxedStart = (Long) startTimes.get(attemptID);
+    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
+
+    // We need to do two things.
+    //  1: If this is a completion, we accumulate statistics in the superclass
+    //  2: If this is not a completion, we learn more about it.
+
+    // This is not a completion, but we're cooking.
+    //
+    if (taskAttempt.getState() == TaskAttemptState.RUNNING) {
+      // Find or atomically register this attempt's containers.
+      AtomicLong estimateContainer
+          = containerFor(attemptRuntimeEstimates, taskAttempt);
+      AtomicLong estimateVarianceContainer
+          = containerFor(attemptRuntimeEstimateVariances, taskAttempt);
+
+      long estimate = -1;
+      long varianceEstimate = -1;
+
+      // This code assumes that we'll never consider starting a third
+      // speculative task attempt if two are already running for this task
+      if (start > 0 && timestamp > start) {
+        estimate = (long) ((timestamp - start) / Math.max(0.0001, status.progress));
+        varianceEstimate = (long) (estimate * status.progress / 10);
+      }
+      estimateContainer.set(estimate);
+      estimateVarianceContainer.set(varianceEstimate);
+    }
+  }
+
+  /**
+   * Returns the counter stored for {@code attempt}, inserting a new one with
+   * putIfAbsent so that concurrent callers agree on a single instance.
+   */
+  private static AtomicLong containerFor
+      (ConcurrentHashMap<TaskAttempt, AtomicLong> data, TaskAttempt attempt) {
+    AtomicLong existing = data.get(attempt);
+    if (existing != null) {
+      return existing;
+    }
+    AtomicLong fresh = new AtomicLong();
+    existing = data.putIfAbsent(attempt, fresh);
+    return existing == null ? fresh : existing;
+  }
+
+  /**
+   * Resolves an attempt ID against the current DAG.
+   *
+   * @return the attempt, or null when there is no DAG, no such task or no
+   *         such attempt
+   */
+  private TaskAttempt findAttempt(TezTaskAttemptID attemptID) {
+    DAG job = context.getDAG();
+
+    if (job == null) {
+      return null;
+    }
+
+    TezTaskID taskID = attemptID.getTaskID();
+    Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
+
+    if (task == null) {
+      return null;
+    }
+
+    return task.getAttempt(attemptID);
+  }
+
+  private long storedPerAttemptValue
+      (Map<TaskAttempt, AtomicLong> data, TezTaskAttemptID attemptID) {
+    TaskAttempt taskAttempt = findAttempt(attemptID);
+
+    if (taskAttempt == null) {
+      return -1L;
+    }
+
+    AtomicLong estimate = data.get(taskAttempt);
+
+    return estimate == null ? -1L : estimate.get();
+  }
+
+  @Override
+  public long estimatedRuntime(TezTaskAttemptID attemptID) {
+    return storedPerAttemptValue(attemptRuntimeEstimates, attemptID);
+  }
+
+  @Override
+  public long runtimeEstimateVariance(TezTaskAttemptID attemptID) {
+    return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID);
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,72 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+
+
+/*
+ * This class is provided solely as an exemplae of the values that mean
+ * that nothing needs to be computed. It's not currently used.
+ */
+public class NullTaskRuntimesEngine implements TaskRuntimeEstimator {
+ @Override
+ public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
+ // no code
+ }
+
+ @Override
+ public long attemptEnrolledTime(TezTaskAttemptID attemptID) {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+ // no code
+ }
+
+ @Override
+ public void contextualize(Configuration conf, AppContext context) {
+ // no code
+ }
+
+ @Override
+ public long thresholdRuntime(TezTaskID id) {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public long estimatedRuntime(TezTaskAttemptID id) {
+ return -1L;
+ }
+ @Override
+ public long estimatedNewAttemptRuntime(TezTaskID id) {
+ return -1L;
+ }
+
+ @Override
+ public long runtimeEstimateVariance(TezTaskAttemptID id) {
+ return -1L;
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+
+/**
+ * Speculator component. Task Attempts' status updates are sent to this
+ * component. Concrete implementation runs the speculative algorithm and
+ * sends the TaskEventType.T_ADD_ATTEMPT.
+ *
+ * An implementation also has to arrange for the jobs to be scanned from
+ * time to time, to launch the speculations.
+ */
+public interface Speculator
+ extends EventHandler<SpeculatorEvent> {
+
+ enum EventType {
+ ATTEMPT_STATUS_UPDATE,
+ ATTEMPT_START,
+ TASK_CONTAINER_NEED_UPDATE,
+ JOB_CREATE
+ }
+
+ // This will be implemented if we go to a model where the events are
+ // processed within the TaskAttempts' state transitions' code.
+ public void handleAttempt(TaskAttemptStatus status);
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,86 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+
+public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
+
+ // valid for ATTEMPT_STATUS_UPDATE
+ private TaskAttemptStatus reportedStatus;
+
+ // valid for TASK_CONTAINER_NEED_UPDATE
+ private TezTaskID taskID;
+ private int containersNeededChange;
+
+ // valid for CREATE_JOB
+ private TezDAGID dagId;
+
+ public SpeculatorEvent(TezDAGID dagId, long timestamp) {
+ super(Speculator.EventType.JOB_CREATE, timestamp);
+ this.dagId = dagId;
+ }
+
+ public SpeculatorEvent(TaskAttemptStatus reportedStatus, long timestamp) {
+ super(Speculator.EventType.ATTEMPT_STATUS_UPDATE, timestamp);
+ this.reportedStatus = reportedStatus;
+ }
+
+ public SpeculatorEvent(TezTaskAttemptID attemptID, boolean flag, long timestamp) {
+ super(Speculator.EventType.ATTEMPT_START, timestamp);
+ this.reportedStatus = new TaskAttemptStatus();
+ this.reportedStatus.id = attemptID;
+ this.taskID = attemptID.getTaskID();
+ }
+
+ /*
+ * This c'tor creates a TASK_CONTAINER_NEED_UPDATE event .
+ * We send a +1 event when a task enters a state where it wants a container,
+ * and a -1 event when it either gets one or withdraws the request.
+ * The per job sum of all these events is the number of containers requested
+ * but not granted. The intent is that we only do speculations when the
+ * speculation wouldn't compete for containers with tasks which need
+ * to be run.
+ */
+ public SpeculatorEvent(TezTaskID taskID, int containersNeededChange) {
+ super(Speculator.EventType.TASK_CONTAINER_NEED_UPDATE);
+ this.taskID = taskID;
+ this.containersNeededChange = containersNeededChange;
+ }
+
+ public TaskAttemptStatus getReportedStatus() {
+ return reportedStatus;
+ }
+
+ public int containersNeededChange() {
+ return containersNeededChange;
+ }
+
+ public TezTaskID getTaskID() {
+ return taskID;
+ }
+
+ public TezDAGID getJobID() {
+ return dagId;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,195 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+abstract class StartEndTimesBase<V> implements TaskRuntimeEstimator {
+ static final float MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE
+ = 0.05F;
+ static final int MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
+ = 1;
+
+ protected Configuration conf = null;
+ protected AppContext context = null;
+
+ protected final Map<TezTaskAttemptID, Long> startTimes
+ = new ConcurrentHashMap<TezTaskAttemptID, Long>();
+
+ // XXXX This class design assumes that the contents of AppContext.getAllJobs
+ // never changes. Is that right?
+ //
+ // This assumption comes in in several places, mostly in data structure that
+ // can grow without limit if a AppContext gets new Job's when the old ones
+ // run out. Also, these mapper statistics blocks won't cover the Job's
+ // we don't know about.
+ // TODO handle multiple DAGs
+ protected final Map<TezVertexID, DataStatistics> vertexStatistics
+ = new HashMap<TezVertexID, DataStatistics>();
+
+ private float slowTaskRelativeTresholds = 0f;
+
+ protected final Set<Task> doneTasks = new HashSet<Task>();
+
+ @Override
+ public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
+ startTimes.put(status.id,timestamp);
+ }
+
+ @Override
+ public long attemptEnrolledTime(TezTaskAttemptID attemptID) {
+ Long result = startTimes.get(attemptID);
+
+ return result == null ? Long.MAX_VALUE : result;
+ }
+
+
+ @Override
+ public void contextualize(Configuration conf, AppContext context) {
+ this.conf = conf;
+ this.context = context;
+
+
+ final DAG dag = context.getDAG();
+ for (Entry<TezVertexID, Vertex> entry: dag.getVertices().entrySet()) {
+ vertexStatistics.put(entry.getKey(), new DataStatistics());
+ slowTaskRelativeTresholds =
+ conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f);
+ }
+ }
+
+ protected DataStatistics dataStatisticsForTask(TezTaskID taskID) {
+ DAG dag = context.getDAG();
+
+ if (dag == null) {
+ return null;
+ }
+
+ Task task = dag.getVertex(taskID.getVertexID()).getTask(taskID);
+
+ if (task == null) {
+ return null;
+ }
+
+ return vertexStatistics.get(taskID.getVertexID());
+ }
+
+ @Override
+ public long thresholdRuntime(TezTaskID taskID) {
+ DAG job = context.getDAG();
+
+ DataStatistics statistics = dataStatisticsForTask(taskID);
+
+ Vertex v = job.getVertex(taskID.getVertexID());
+ int completedTasksOfType = v.getCompletedTasks();
+ int totalTasksOfType = v.getTotalTasks();
+
+ if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
+ || (((float)completedTasksOfType) / totalTasksOfType)
+ < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
+ return Long.MAX_VALUE;
+ }
+
+ long result = statistics == null
+ ? Long.MAX_VALUE
+ : (long)statistics.outlier(slowTaskRelativeTresholds);
+ return result;
+ }
+
+ @Override
+ public long estimatedNewAttemptRuntime(TezTaskID id) {
+ DataStatistics statistics = dataStatisticsForTask(id);
+
+ if (statistics == null) {
+ return -1L;
+ }
+
+ return (long)statistics.mean();
+ }
+
+ @Override
+ public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+
+ TezTaskAttemptID attemptID = status.id;
+ TezTaskID taskID = attemptID.getTaskID();
+ DAG job = context.getDAG();
+
+ if (job == null) {
+ return;
+ }
+
+ Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
+
+ if (task == null) {
+ return;
+ }
+
+ Long boxedStart = startTimes.get(attemptID);
+ long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
+
+ TaskAttempt taskAttempt = task.getAttempt(attemptID);
+
+ if (taskAttempt.getState() == TaskAttemptState.SUCCEEDED) {
+ boolean isNew = false;
+ // is this a new success?
+ synchronized (doneTasks) {
+ if (!doneTasks.contains(task)) {
+ doneTasks.add(task);
+ isNew = true;
+ }
+ }
+
+ // It's a new completion
+ // Note that if a task completes twice [because of a previous speculation
+ // and a race, or a success followed by loss of the machine with the
+ // local data] we only count the first one.
+ if (isNew) {
+ long finish = timestamp;
+ if (start > 1L && finish > 1L && start <= finish) {
+ long duration = finish - start;
+
+ DataStatistics statistics
+ = dataStatisticsForTask(taskID);
+
+ if (statistics != null) {
+ statistics.add(duration);
+ }
+ }
+ }
+ }
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
------------------------------------------------------------------------------
svn:eol-style = native