You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/12 01:52:43 UTC
tez git commit: TEZ-2707. Fix comments from reviews - part 2. (sseth)
Repository: tez
Updated Branches:
refs/heads/TEZ-2003 34e90dee4 -> 2fe54840c
TEZ-2707. Fix comments from reviews - part 2. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2fe54840
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2fe54840
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2fe54840
Branch: refs/heads/TEZ-2003
Commit: 2fe54840cad703d3e6e78176f48aadd2e7093f3e
Parents: 34e90de
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Aug 11 16:52:32 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Aug 11 16:52:32 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 2 +-
.../dag/app/TaskAttemptListenerImpTezDag.java | 4 ++
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 6 +-
.../apache/tez/dag/app/rm/AMSchedulerEvent.java | 10 ++-
.../rm/AMSchedulerEventDeallocateContainer.java | 10 +--
.../rm/AMSchedulerEventNodeBlacklistUpdate.java | 8 +--
.../tez/dag/app/rm/AMSchedulerEventTAEnded.java | 8 +--
.../app/rm/AMSchedulerEventTALaunchRequest.java | 8 +--
.../apache/tez/dag/app/rm/node/AMNodeEvent.java | 10 +--
.../apache/tez/dag/app/rm/node/AMNodeImpl.java | 14 ++---
.../tez/dag/app/rm/node/AMNodeTracker.java | 32 +++++-----
.../apache/tez/dag/app/MockDAGAppMaster.java | 2 -
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 65 ++++++--------------
.../tez/dag/app/dag/impl/TestVertexImpl2.java | 56 ++++++++++++++---
15 files changed, 113 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index fd3374e..adb800b 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -46,5 +46,6 @@ ALL CHANGES:
TEZ-2698. rebase 08/05
TEZ-2675. Add javadocs for new pluggable components, fix problems reported by jenkins
TEZ-2678. Fix comments from reviews - part 1.
+ TEZ-2707. Fix comments from reviews - part 2.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index ed4f520..fdc48b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -2416,7 +2416,7 @@ public class DAGAppMaster extends AbstractService {
@VisibleForTesting
- static void parsePlugin(List<NamedEntityDescriptor> resultList,
+ public static void parsePlugin(List<NamedEntityDescriptor> resultList,
BiMap<String, Integer> pluginMap, List<TezNamedEntityDescriptorProto> namedEntityDescriptorProtos,
boolean tezYarnEnabled, boolean uberEnabled, UserPayload defaultPayload) {
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 2f6e93c..185193f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -225,6 +225,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
" events: " + (inEvents != null ? inEvents.size() : -1));
}
+ long currTime = context.getClock().getTime();
List<TezEvent> otherEvents = new ArrayList<TezEvent>();
// route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
// (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
@@ -232,6 +233,9 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
// 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
// 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+ // for now, set the event time on the AM when it is received.
+ // this avoids any time disparity between machines.
+ tezEvent.setEventReceivedTime(currTime);
final EventType eventType = tezEvent.getEventType();
if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index bd8b99a..3106556 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -133,7 +133,6 @@ public class TaskAttemptImpl implements TaskAttempt,
protected final AppContext appContext;
private final TaskHeartbeatHandler taskHeartbeatHandler;
private long launchTime = 0;
- private long scheduleTime = 0;
private long finishTime = 0;
private String trackerName;
private int httpPort;
@@ -437,7 +436,6 @@ public class TaskAttemptImpl implements TaskAttempt,
this.task = task;
this.vertex = this.task.getVertex();
this.schedulingCausalTA = schedulingCausalTA;
- this.scheduledTime = clock.getTime();
this.reportedStatus = new TaskAttemptStatus(this.attemptId);
initTaskAttemptStatus(reportedStatus);
@@ -674,7 +672,7 @@ public class TaskAttemptImpl implements TaskAttempt,
public long getScheduleTime() {
readLock.lock();
try {
- return scheduleTime;
+ return scheduledTime;
} finally {
readLock.unlock();
}
@@ -1040,7 +1038,7 @@ public class TaskAttemptImpl implements TaskAttempt,
public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;
- ta.scheduleTime = ta.clock.getTime();
+ ta.scheduledTime = ta.clock.getTime();
// TODO Creating the remote task here may not be required in case of
// recovery.
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java
index af0bed0..dd9d951 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java
@@ -22,8 +22,14 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
public class AMSchedulerEvent extends AbstractEvent<AMSchedulerEventType> {
- // TODO Not a very useful class...
- public AMSchedulerEvent(AMSchedulerEventType type) {
+ private final int schedulerId;
+
+ public AMSchedulerEvent(AMSchedulerEventType type, int schedulerId) {
super(type);
+ this.schedulerId = schedulerId;
+ }
+
+ public int getSchedulerId() {
+ return this.schedulerId;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
index 5270aa2..d1ca99e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
@@ -23,20 +23,14 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
public class AMSchedulerEventDeallocateContainer extends AMSchedulerEvent {
private final ContainerId containerId;
- private final int schedulerId;
-
+
public AMSchedulerEventDeallocateContainer(ContainerId containerId, int schedulerId) {
- super(AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+ super(AMSchedulerEventType.S_CONTAINER_DEALLOCATE, schedulerId);
this.containerId = containerId;
- this.schedulerId = schedulerId;
}
public ContainerId getContainerId() {
return this.containerId;
}
-
- public int getSchedulerId() {
- return schedulerId;
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
index 679705a..d22c0ec 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
@@ -23,20 +23,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
public class AMSchedulerEventNodeBlacklistUpdate extends AMSchedulerEvent {
private final NodeId nodeId;
- private final int schedulerId;
public AMSchedulerEventNodeBlacklistUpdate(NodeId nodeId, boolean add, int schedulerId) {
super((add ? AMSchedulerEventType.S_NODE_BLACKLISTED
- : AMSchedulerEventType.S_NODE_UNBLACKLISTED));
+ : AMSchedulerEventType.S_NODE_UNBLACKLISTED), schedulerId);
this.nodeId = nodeId;
- this.schedulerId = schedulerId;
}
public NodeId getNodeId() {
return this.nodeId;
}
-
- public int getSchedulerId() {
- return schedulerId;
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
index ccc5465..f7fee3a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
@@ -30,17 +30,15 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
private final TaskAttemptState state;
private final TaskAttemptEndReason taskAttemptEndReason;
private final String diagnostics;
- private final int schedulerId;
public AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId,
TaskAttemptState state, TaskAttemptEndReason taskAttemptEndReason, String diagnostics, int schedulerId) {
- super(AMSchedulerEventType.S_TA_ENDED);
+ super(AMSchedulerEventType.S_TA_ENDED, schedulerId);
this.attempt = attempt;
this.containerId = containerId;
this.state = state;
this.taskAttemptEndReason = taskAttemptEndReason;
this.diagnostics = diagnostics;
- this.schedulerId = schedulerId;
}
public TezTaskAttemptID getAttemptID() {
@@ -59,10 +57,6 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
return this.containerId;
}
- public int getSchedulerId() {
- return schedulerId;
- }
-
public TaskAttemptEndReason getTaskAttemptEndReason() {
return taskAttemptEndReason;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
index c59193c..0424c97 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
@@ -38,7 +38,6 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
private final TaskSpec remoteTaskSpec;
private final TaskAttempt taskAttempt;
- private final int schedulerId;
private final int launcherId;
private final int taskCommId;
@@ -48,7 +47,7 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
TaskLocationHint locationHint, int priority,
ContainerContext containerContext,
int schedulerId, int launcherId, int taskCommId) {
- super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
+ super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST, schedulerId);
this.attemptId = attemptId;
this.capability = capability;
this.remoteTaskSpec = remoteTaskSpec;
@@ -56,7 +55,6 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
this.locationHint = locationHint;
this.priority = priority;
this.containerContext = containerContext;
- this.schedulerId = schedulerId;
this.launcherId = launcherId;
this.taskCommId = taskCommId;
}
@@ -89,10 +87,6 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
return this.containerContext;
}
- public int getSchedulerId() {
- return schedulerId;
- }
-
public int getLauncherId() {
return launcherId;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
index 85bc513..1a975b0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
@@ -24,19 +24,19 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
public class AMNodeEvent extends AbstractEvent<AMNodeEventType> {
private final NodeId nodeId;
- private final int sourceId; // Effectively the schedulerId
+ private final int schedulerId;
- public AMNodeEvent(NodeId nodeId, int sourceId, AMNodeEventType type) {
+ public AMNodeEvent(NodeId nodeId, int schedulerId, AMNodeEventType type) {
super(type);
this.nodeId = nodeId;
- this.sourceId = sourceId;
+ this.schedulerId = schedulerId;
}
public NodeId getNodeId() {
return this.nodeId;
}
- public int getSourceId() {
- return sourceId;
+ public int getSchedulerId() {
+ return schedulerId;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 88b36cb1f..18d5978 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -54,7 +54,7 @@ public class AMNodeImpl implements AMNode {
private final ReadLock readLock;
private final WriteLock writeLock;
private final NodeId nodeId;
- private final int sourceId;
+ private final int schedulerId;
private final AppContext appContext;
private final int maxTaskFailuresPerNode;
private boolean blacklistingEnabled;
@@ -173,14 +173,14 @@ public class AMNodeImpl implements AMNode {
@SuppressWarnings("rawtypes")
- public AMNodeImpl(NodeId nodeId, int sourceId, int maxTaskFailuresPerNode,
+ public AMNodeImpl(NodeId nodeId, int schedulerId, int maxTaskFailuresPerNode,
EventHandler eventHandler, boolean blacklistingEnabled,
AppContext appContext) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
this.nodeId = nodeId;
- this.sourceId = sourceId;
+ this.schedulerId = schedulerId;
this.appContext = appContext;
this.eventHandler = eventHandler;
this.blacklistingEnabled = blacklistingEnabled;
@@ -249,7 +249,7 @@ public class AMNodeImpl implements AMNode {
/* Blacklist the node with the AMNodeTracker and check if the node should be blacklisted */
protected boolean registerBadNodeAndShouldBlacklist() {
- return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this, sourceId);
+ return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this, schedulerId);
}
protected void blacklistSelf() {
@@ -259,8 +259,7 @@ public class AMNodeImpl implements AMNode {
// these containers are not useful anymore
pastContainers.addAll(containers);
containers.clear();
- // TODO TEZ-2124 node tracking per ext source
- sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, 0));
+ sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, schedulerId));
}
@SuppressWarnings("unchecked")
@@ -366,8 +365,7 @@ public class AMNodeImpl implements AMNode {
public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
node.ignoreBlacklisting = ignore;
if (node.getState() == AMNodeState.BLACKLISTED) {
- // TODO TEZ-2124 node tracking per ext source
- node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false, 0));
+ node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false, node.schedulerId));
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 32e515b..751276e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -82,14 +82,14 @@ public class AMNodeTracker extends AbstractService implements
}
}
- public void nodeSeen(NodeId nodeId, int sourceId) {
- PerSourceNodeTracker nodeTracker = getAndCreateIfNeededPerSourceTracker(sourceId);
+ public void nodeSeen(NodeId nodeId, int schedulerId) {
+ PerSourceNodeTracker nodeTracker = getAndCreateIfNeededPerSourceTracker(schedulerId);
nodeTracker.nodeSeen(nodeId);
}
- boolean registerBadNodeAndShouldBlacklist(AMNode amNode, int sourceId) {
- return perSourceNodeTrackers.get(sourceId).registerBadNodeAndShouldBlacklist(amNode);
+ boolean registerBadNodeAndShouldBlacklist(AMNode amNode, int schedulerId) {
+ return perSourceNodeTrackers.get(schedulerId).registerBadNodeAndShouldBlacklist(amNode);
}
public void handle(AMNodeEvent rEvent) {
@@ -101,42 +101,42 @@ public class AMNodeTracker extends AbstractService implements
case N_IGNORE_BLACKLISTING_ENABLED:
case N_IGNORE_BLACKLISTING_DISABLED:
// All of these will only be seen after a node has been registered.
- perSourceNodeTrackers.get(rEvent.getSourceId()).handle(rEvent);
+ perSourceNodeTrackers.get(rEvent.getSchedulerId()).handle(rEvent);
break;
case N_TURNED_UNHEALTHY:
case N_TURNED_HEALTHY:
case N_NODE_COUNT_UPDATED:
// These events can be seen without a node having been marked as 'seen' before
- getAndCreateIfNeededPerSourceTracker(rEvent.getSourceId()).handle(rEvent);
+ getAndCreateIfNeededPerSourceTracker(rEvent.getSchedulerId()).handle(rEvent);
break;
}
}
- public AMNode get(NodeId nodeId, int sourceId) {
- return perSourceNodeTrackers.get(sourceId).get(nodeId);
+ public AMNode get(NodeId nodeId, int schedulerId) {
+ return perSourceNodeTrackers.get(schedulerId).get(nodeId);
}
- public int getNumNodes(int sourceId) {
- return perSourceNodeTrackers.get(sourceId).getNumNodes();
+ public int getNumNodes(int schedulerId) {
+ return perSourceNodeTrackers.get(schedulerId).getNumNodes();
}
@Private
@VisibleForTesting
- public boolean isBlacklistingIgnored(int sourceId) {
- return perSourceNodeTrackers.get(sourceId).isBlacklistingIgnored();
+ public boolean isBlacklistingIgnored(int schedulerId) {
+ return perSourceNodeTrackers.get(schedulerId).isBlacklistingIgnored();
}
public void dagComplete(DAG dag) {
// TODO TEZ-2337 Maybe reset failures from previous DAGs
}
- private PerSourceNodeTracker getAndCreateIfNeededPerSourceTracker(int sourceId) {
- PerSourceNodeTracker nodeTracker = perSourceNodeTrackers.get(sourceId);
+ private PerSourceNodeTracker getAndCreateIfNeededPerSourceTracker(int schedulerId) {
+ PerSourceNodeTracker nodeTracker = perSourceNodeTrackers.get(schedulerId);
if (nodeTracker == null) {
nodeTracker =
- new PerSourceNodeTracker(sourceId, eventHandler, appContext, maxTaskFailuresPerNode,
+ new PerSourceNodeTracker(schedulerId, eventHandler, appContext, maxTaskFailuresPerNode,
nodeBlacklistingEnabled, blacklistDisablePercent);
- PerSourceNodeTracker old = perSourceNodeTrackers.putIfAbsent(sourceId, nodeTracker);
+ PerSourceNodeTracker old = perSourceNodeTrackers.putIfAbsent(schedulerId, nodeTracker);
nodeTracker = old != null ? old : nodeTracker;
}
return nodeTracker;
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index b04b461..fe3e4ef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -417,8 +417,6 @@ public class MockDAGAppMaster extends DAGAppMaster {
events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
MockDAGAppMaster.this.getContext().getClock().getTime()));
-// TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
-// cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
TaskHeartbeatRequest request =
new TaskHeartbeatRequest(cData.cIdStr, cData.taId, events, cData.nextFromEventId, cData.nextPreRoutedFromEventId,
50000);
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 947ea93..04bb2df 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -21,7 +21,6 @@ package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -286,10 +285,7 @@ public class TestTaskAttempt {
TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
MockEventHandler eventHandler = new MockEventHandler();
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- TaskCommunicator taskComm = mock(TaskCommunicator.class);
- doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
- doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+ TaskAttemptListener taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -338,10 +334,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- TaskCommunicator taskComm = mock(TaskCommunicator.class);
- doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
- doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+ TaskAttemptListener taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -441,10 +434,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = new MockEventHandler();
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- TaskCommunicator taskComm = mock(TaskCommunicator.class);
- doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
- doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+ TaskAttemptListener taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -508,10 +498,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- TaskCommunicator taskComm = mock(TaskCommunicator.class);
- doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
- doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+ TaskAttemptListener taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -602,10 +589,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- TaskCommunicator taskComm = mock(TaskCommunicator.class);
- doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
- doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+ TaskAttemptListener taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -735,10 +719,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- TaskCommunicator taskComm = mock(TaskCommunicator.class);
- doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
- doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+ TaskAttemptListener taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -829,10 +810,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- TaskCommunicator taskComm = mock(TaskCommunicator.class);
- doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
- doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+ TaskAttemptListener taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -926,10 +904,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- TaskCommunicator taskComm = mock(TaskCommunicator.class);
- doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
- doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+ TaskAttemptListener taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1031,10 +1006,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- TaskCommunicator taskComm = mock(TaskCommunicator.class);
- doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
- doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+ TaskAttemptListener taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1133,10 +1105,7 @@ public class TestTaskAttempt {
MockEventHandler mockEh = new MockEventHandler();
MockEventHandler eventHandler = spy(mockEh);
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- TaskCommunicator taskComm = mock(TaskCommunicator.class);
- doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
- doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+ TaskAttemptListener taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1280,11 +1249,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- TaskCommunicator mockTaskComm = mock(TaskCommunicator.class);
- when(mockTaskComm.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
- when(taListener.getTaskCommunicator(any(int.class))).thenReturn(mockTaskComm);
+ TaskAttemptListener taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1412,4 +1377,12 @@ public class TestTaskAttempt {
return new ContainerContext(new HashMap<String, LocalResource>(),
new Credentials(), new HashMap<String, String>(), "");
}
+
+ private TaskAttemptListener createMockTaskAttemptListener() {
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+ return taListener;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2fe54840/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
index 352ad87..0e34f68 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
@@ -24,36 +24,42 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
-import org.apache.tez.runtime.api.ExecutionContext;
import org.junit.Test;
/**
@@ -363,16 +369,46 @@ public class TestVertexImpl2 {
this.vertexName = "testvertex";
this.vertexExecutionContext = vertexExecutionContext;
this.defaultExecutionContext = defaultDagExecitionContext;
- if (numPlugins == 0) {
- this.taskSchedulers.put(TezConstants.getTezYarnServicePluginName(), 0);
- this.containerLaunchers.put(TezConstants.getTezYarnServicePluginName(), 0);
- this.taskSchedulers.put(TezConstants.getTezYarnServicePluginName(), 0);
- } else {
+ if (numPlugins == 0) { // Add default container plugins only
+ UserPayload defaultPayload;
+ try {
+ defaultPayload = TezUtils.createUserPayloadFromConf(new Configuration(false));
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
+ DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskSchedulers, null,
+ true, false, defaultPayload);
+ DAGAppMaster
+ .parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), containerLaunchers, null,
+ true, false, defaultPayload);
+ DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskComms, null,
+ true, false, defaultPayload);
+ } else { // Add N plugins, no YARN defaults
+ List<TezNamedEntityDescriptorProto> schedulerList = new LinkedList<>();
+ List<TezNamedEntityDescriptorProto> launcherList = new LinkedList<>();
+ List<TezNamedEntityDescriptorProto> taskCommList = new LinkedList<>();
for (int i = 0; i < numPlugins; i++) {
- this.taskSchedulers.put(append(TASK_SCHEDULER_NAME_BASE, i), i);
- this.containerLaunchers.put(append(CONTAINER_LAUNCHER_NAME_BASE, i), i);
- this.taskComms.put(append(TASK_COMM_NAME_BASE, i), i);
+ schedulerList.add(TezNamedEntityDescriptorProto.newBuilder()
+ .setName(append(TASK_SCHEDULER_NAME_BASE, i)).setEntityDescriptor(
+ DAGProtos.TezEntityDescriptorProto.newBuilder()
+ .setClassName(append(TASK_SCHEDULER_NAME_BASE, i))).build());
+ launcherList.add(TezNamedEntityDescriptorProto.newBuilder()
+ .setName(append(CONTAINER_LAUNCHER_NAME_BASE, i)).setEntityDescriptor(
+ DAGProtos.TezEntityDescriptorProto.newBuilder()
+ .setClassName(append(CONTAINER_LAUNCHER_NAME_BASE, i))).build());
+ taskCommList.add(
+ TezNamedEntityDescriptorProto.newBuilder().setName(append(TASK_COMM_NAME_BASE, i))
+ .setEntityDescriptor(
+ DAGProtos.TezEntityDescriptorProto.newBuilder()
+ .setClassName(append(TASK_COMM_NAME_BASE, i))).build());
}
+ DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskSchedulers,
+ schedulerList, false, false, null);
+ DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), containerLaunchers,
+ launcherList, false, false, null);
+ DAGAppMaster
+ .parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskComms, taskCommList,
+ false, false, null);
}
this.appContext = createDefaultMockAppContext();