You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 09:26:19 UTC
[20/50] [abbrv] tez git commit: TEZ-2124. Change Node tracking to work per external container source. (sseth)
TEZ-2124. Change Node tracking to work per external container source. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3a143a67
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3a143a67
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3a143a67
Branch: refs/heads/master
Commit: 3a143a67187b29f792c43b9acc40feec0dc16697
Parents: 9e94757
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jul 16 14:18:35 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:13:55 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 2 +
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 3 +-
.../dag/app/launcher/ContainerLauncherImpl.java | 5 +-
.../app/rm/TaskSchedulerAppCallbackImpl.java | 89 +++++++++
.../dag/app/rm/TaskSchedulerEventHandler.java | 71 +++----
.../apache/tez/dag/app/rm/node/AMNodeEvent.java | 8 +-
.../rm/node/AMNodeEventContainerAllocated.java | 4 +-
.../rm/node/AMNodeEventNodeCountUpdated.java | 4 +-
.../app/rm/node/AMNodeEventStateChanged.java | 4 +-
.../rm/node/AMNodeEventTaskAttemptEnded.java | 4 +-
.../node/AMNodeEventTaskAttemptSucceeded.java | 4 +-
.../apache/tez/dag/app/rm/node/AMNodeImpl.java | 6 +-
.../tez/dag/app/rm/node/AMNodeTracker.java | 162 +++++-----------
.../dag/app/rm/node/PerSourceNodeTracker.java | 187 +++++++++++++++++++
.../apache/tez/dag/app/MockDAGAppMaster.java | 2 +-
.../tez/dag/app/TestMockDAGAppMaster.java | 2 +-
.../tez/dag/app/rm/TestContainerReuse.java | 62 +++---
.../app/rm/TestTaskSchedulerEventHandler.java | 11 +-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 2 +-
.../tez/dag/app/rm/node/TestAMNodeTracker.java | 64 ++++---
21 files changed, 462 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 590fe7f..604947c 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -33,5 +33,6 @@ ALL CHANGES:
TEZ-2508. rebase 06/01
TEZ-2526. Fix version for tez-history-parser.
TEZ-2621. rebase 07/14
+ TEZ-2124. Change Node tracking to work per external container source.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index e37fc2f..ec2ef66 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1440,9 +1440,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
VertexImpl v = createVertex(this, vertexName, i);
addVertex(v);
}
+
// check task resources, only check it in non-local mode
if (!appContext.isLocal()) {
for (Vertex v : vertexMap.values()) {
+ // TODO TEZ-2003 (post) Ideally, this should be per source.
if (v.getTaskResource().compareTo(appContext.getClusterInfo().getMaxContainerCapability()) > 0) {
String msg = "Vertex's TaskResource is beyond the cluster container capability," +
"Vertex=" + v.getLogIdentifier() +", Requested TaskResource=" + v.getTaskResource()
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 93b4c3f..1b55295 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1396,7 +1396,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
if (amContainer != null) {
// inform the node about failure
task.eventHandler.handle(
- new AMNodeEventTaskAttemptEnded(amContainer.getContainer().getNodeId(),
+ new AMNodeEventTaskAttemptEnded(amContainer.getContainer().getNodeId(),
+ task.getVertex().getTaskSchedulerIdentifier(),
containerId, failedAttemptId, true));
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index a1eb2a7..a12fb04 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.dag.api.TezConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -292,7 +293,9 @@ public class ContainerLauncherImpl extends AbstractService implements
// nodes where containers will run at *this* point of time. This is
// *not* the cluster size and doesn't need to be.
- int numNodes = context.getNodeTracker().getNumNodes();
+ int yarnSourceIndex =
+ context.getTaskScheduerIdentifier(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+ int numNodes = context.getNodeTracker().getNumNodes(yarnSourceIndex);
int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
if (poolSize < idealPoolSize) {
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java
new file mode 100644
index 0000000..ea37e94
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class TaskSchedulerAppCallbackImpl implements TaskSchedulerService.TaskSchedulerAppCallback{
+
+ private final TaskSchedulerEventHandler tseh;
+ private final int schedulerId;
+
+ public TaskSchedulerAppCallbackImpl(TaskSchedulerEventHandler tseh, int schedulerId) {
+ this.tseh = tseh;
+ this.schedulerId = schedulerId;
+ }
+
+ @Override
+ public void taskAllocated(Object task, Object appCookie, Container container) {
+ tseh.taskAllocated(schedulerId, task, appCookie, container);
+ }
+
+ @Override
+ public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) {
+ tseh.containerCompleted(schedulerId, taskLastAllocated, containerStatus);
+ }
+
+ @Override
+ public void containerBeingReleased(ContainerId containerId) {
+ tseh.containerBeingReleased(schedulerId, containerId);
+ }
+
+ @Override
+ public void nodesUpdated(List<NodeReport> updatedNodes) {
+ tseh.nodesUpdated(schedulerId, updatedNodes);
+ }
+
+ @Override
+ public void appShutdownRequested() {
+ tseh.appShutdownRequested(schedulerId);
+ }
+
+ @Override
+ public void setApplicationRegistrationData(Resource maxContainerCapability,
+ Map<ApplicationAccessType, String> appAcls,
+ ByteBuffer clientAMSecretKey) {
+ tseh.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, clientAMSecretKey);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ tseh.onError(schedulerId, t);
+ }
+
+ @Override
+ public float getProgress() {
+ return tseh.getProgress(schedulerId);
+ }
+
+ @Override
+ public void preemptContainer(ContainerId containerId) {
+ tseh.preemptContainer(schedulerId, containerId);
+ }
+
+ @Override
+ public AppFinalStatus getFinalAppStatus() {
+ return tseh.getFinalAppStatus();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 4d1b43a..549db14 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -29,6 +29,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -81,8 +82,7 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import com.google.common.base.Preconditions;
-public class TaskSchedulerEventHandler extends AbstractService
- implements TaskSchedulerAppCallback,
+public class TaskSchedulerEventHandler extends AbstractService implements
EventHandler<AMSchedulerEvent> {
static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerEventHandler.class);
@@ -315,7 +315,7 @@ public class TaskSchedulerEventHandler extends AbstractService
// stopped.
// AMNodeImpl blacklisting logic does not account for KILLED attempts.
sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
- get(attemptContainerId).getContainer().getNodeId(), attemptContainerId,
+ get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(), attemptContainerId,
attempt.getID(), event.getState() == TaskAttemptState.FAILED));
}
}
@@ -330,7 +330,7 @@ public class TaskSchedulerEventHandler extends AbstractService
sendEvent(new AMContainerEventTASucceeded(usedContainerId,
event.getAttemptID()));
sendEvent(new AMNodeEventTaskAttemptSucceeded(appContext.getAllContainers().
- get(usedContainerId).getContainer().getNodeId(), usedContainerId,
+ get(usedContainerId).getContainer().getNodeId(), event.getSchedulerId(), usedContainerId,
event.getAttemptID()));
}
@@ -392,14 +392,16 @@ public class TaskSchedulerEventHandler extends AbstractService
private TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl,
AppContext appContext,
String schedulerClassName,
- long customAppIdIdentifier) {
+ long customAppIdIdentifier,
+ int schedulerId) {
+ TaskSchedulerAppCallback appCallback = new TaskSchedulerAppCallbackImpl(this, schedulerId);
if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
- return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
+ return new YarnTaskSchedulerService(appCallback, this.containerSignatureMatcher,
host, port, trackingUrl, appContext);
} else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
LOG.info("Creating TaskScheduler: Local TaskScheduler");
- return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
+ return new LocalTaskSchedulerService(appCallback, this.containerSignatureMatcher,
host, port, trackingUrl, customAppIdIdentifier, appContext);
} else {
LOG.info("Creating custom TaskScheduler: " + schedulerClassName);
@@ -411,7 +413,7 @@ public class TaskSchedulerEventHandler extends AbstractService
.getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
int.class, String.class, long.class, Configuration.class);
ctor.setAccessible(true);
- return ctor.newInstance(this, appContext, host, port, trackingUrl, customAppIdIdentifier,
+ return ctor.newInstance(appCallback, appContext, host, port, trackingUrl, customAppIdIdentifier,
getConfig());
} catch (NoSuchMethodException e) {
throw new TezUncheckedException(e);
@@ -441,7 +443,7 @@ public class TaskSchedulerEventHandler extends AbstractService
LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerClasses[i] + "]=" +
customAppIdIdentifier);
taskSchedulers[i] = createTaskScheduler(host, port,
- trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier);
+ trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier, i);
}
}
@@ -525,20 +527,21 @@ public class TaskSchedulerEventHandler extends AbstractService
}
}
- // TaskSchedulerAppCallback methods
- @Override
- public synchronized void taskAllocated(Object task,
+ // TODO TEZ-2003 Consolidate TaskSchedulerAppCallback methods once these methods are moved into context
+
+ // TaskSchedulerAppCallback methods with schedulerId, where relevant
+ public synchronized void taskAllocated(int schedulerId, Object task,
Object appCookie,
Container container) {
AMSchedulerEventTALaunchRequest event =
(AMSchedulerEventTALaunchRequest) appCookie;
ContainerId containerId = container.getId();
if (appContext.getAllContainers()
- .addContainerIfNew(container, event.getSchedulerId(), event.getLauncherId(),
+ .addContainerIfNew(container, schedulerId, event.getLauncherId(),
event.getTaskCommId())) {
- appContext.getNodeTracker().nodeSeen(container.getNodeId());
+ appContext.getNodeTracker().nodeSeen(container.getNodeId(), schedulerId);
sendEvent(new AMNodeEventContainerAllocated(container
- .getNodeId(), container.getId()));
+ .getNodeId(), schedulerId, container.getId()));
}
@@ -558,8 +561,8 @@ public class TaskSchedulerEventHandler extends AbstractService
.getContainerContext().getCredentials(), event.getPriority()));
}
- @Override
- public synchronized void containerCompleted(Object task, ContainerStatus containerStatus) {
+ public synchronized void containerCompleted(int schedulerId, Object task, ContainerStatus containerStatus) {
+ // SchedulerId isn't used here since no node updates are sent out
// Inform the Containers about completion.
AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
if (amContainer != null) {
@@ -582,8 +585,8 @@ public class TaskSchedulerEventHandler extends AbstractService
}
}
- @Override
- public synchronized void containerBeingReleased(ContainerId containerId) {
+ public synchronized void containerBeingReleased(int schedulerId, ContainerId containerId) {
+ // SchedulerId isn't used here since no node updates are sent out
AMContainer amContainer = appContext.getAllContainers().get(containerId);
if (amContainer != null) {
sendEvent(new AMContainerEventStopRequest(containerId));
@@ -591,28 +594,27 @@ public class TaskSchedulerEventHandler extends AbstractService
}
@SuppressWarnings("unchecked")
- @Override
- public synchronized void nodesUpdated(List<NodeReport> updatedNodes) {
+ public synchronized void nodesUpdated(int schedulerId, List<NodeReport> updatedNodes) {
for (NodeReport nr : updatedNodes) {
// Scheduler will find out from the node, if at all.
// Relying on the RM to not allocate containers on an unhealthy node.
- eventHandler.handle(new AMNodeEventStateChanged(nr));
+ eventHandler.handle(new AMNodeEventStateChanged(nr, schedulerId));
}
}
- @Override
- public synchronized void appShutdownRequested() {
+ public synchronized void appShutdownRequested(int schedulerId) {
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
- LOG.info("App shutdown requested by scheduler");
+ LOG.info("App shutdown requested by scheduler {}", schedulerId);
sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT));
}
- @Override
public synchronized void setApplicationRegistrationData(
+ int schedulerId,
Resource maxContainerCapability,
Map<ApplicationAccessType, String> appAcls,
ByteBuffer clientAMSecretKey) {
+ // TODO TEZ-2003 (post) Ideally clusterInfo should be available per source rather than a global view.
this.appContext.getClusterInfo().setMaxContainerCapability(
maxContainerCapability);
this.appAcls = appAcls;
@@ -623,7 +625,6 @@ public class TaskSchedulerEventHandler extends AbstractService
// TaskScheduler uses a separate thread for it's callbacks. Since this method
// returns a value which is required, the TaskScheduler wait for the call to
// complete and can hence lead to a deadlock if called from within a TSEH lock.
- @Override
public AppFinalStatus getFinalAppStatus() {
FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
StringBuffer sb = new StringBuffer();
@@ -665,24 +666,25 @@ public class TaskSchedulerEventHandler extends AbstractService
// TaskScheduler uses a separate thread for it's callbacks. Since this method
// returns a value which is required, the TaskScheduler wait for the call to
// complete and can hence lead to a deadlock if called from within a TSEH lock.
- @Override
- public float getProgress() {
+ public float getProgress(int schedulerId) {
// at this point allocate has been called and so node count must be available
// may change after YARN-1722
// This is a heartbeat in from the scheduler into the APP, and is being used to piggy-back and
// node updates from the cluster.
+
+ // Doubles as a mechanism to update node counts periodically. Hence schedulerId required.
+
// TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in.
int nodeCount = taskSchedulers[0].getClusterNodeCount();
if (nodeCount != cachedNodeCount) {
cachedNodeCount = nodeCount;
- sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount));
+ sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId));
}
return dagAppMaster.getProgress();
}
- @Override
- public void onError(Throwable t) {
- LOG.info("Error reported by scheduler", t);
+ public void onError(int schedulerId, Throwable t) {
+ LOG.info("Error reported by scheduler {} - {}", schedulerId, t);
sendEvent(new DAGAppMasterEventSchedulingServiceError(t));
}
@@ -697,8 +699,7 @@ public class TaskSchedulerEventHandler extends AbstractService
// the context has updated information.
}
- @Override
- public void preemptContainer(ContainerId containerId) {
+ public void preemptContainer(int schedulerId, ContainerId containerId) {
// TODO Why is this making a call back into the scheduler, when the call is originating from there.
// An AMContainer instance should already exist if an attempt is being made to preempt it
AMContainer amContainer = appContext.getAllContainers().get(containerId);
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
index a623cda..85bc513 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
@@ -24,13 +24,19 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
public class AMNodeEvent extends AbstractEvent<AMNodeEventType> {
private final NodeId nodeId;
+ private final int sourceId; // Effectively the schedulerId
- public AMNodeEvent(NodeId nodeId, AMNodeEventType type) {
+ public AMNodeEvent(NodeId nodeId, int sourceId, AMNodeEventType type) {
super(type);
this.nodeId = nodeId;
+ this.sourceId = sourceId;
}
public NodeId getNodeId() {
return this.nodeId;
}
+
+ public int getSourceId() {
+ return sourceId;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java
index 0770969..e250f42 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java
@@ -24,8 +24,8 @@ public class AMNodeEventContainerAllocated extends AMNodeEvent {
private final ContainerId containerId;
- public AMNodeEventContainerAllocated(NodeId nodeId, ContainerId containerId) {
- super(nodeId, AMNodeEventType.N_CONTAINER_ALLOCATED);
+ public AMNodeEventContainerAllocated(NodeId nodeId, int sourceId, ContainerId containerId) {
+ super(nodeId, sourceId, AMNodeEventType.N_CONTAINER_ALLOCATED);
this.containerId = containerId;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java
index 86ca1fc..3b35daf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java
@@ -22,8 +22,8 @@ public class AMNodeEventNodeCountUpdated extends AMNodeEvent {
private final int count;
- public AMNodeEventNodeCountUpdated(int nodeCount) {
- super(null, AMNodeEventType.N_NODE_COUNT_UPDATED);
+ public AMNodeEventNodeCountUpdated(int nodeCount, int sourceId) {
+ super(null, sourceId, AMNodeEventType.N_NODE_COUNT_UPDATED);
this.count = nodeCount;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java
index ca4e5bd..b371ddd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java
@@ -23,8 +23,8 @@ public class AMNodeEventStateChanged extends AMNodeEvent {
private NodeReport nodeReport;
- public AMNodeEventStateChanged(NodeReport nodeReport) {
- super(nodeReport.getNodeId(),
+ public AMNodeEventStateChanged(NodeReport nodeReport, int sourceId) {
+ super(nodeReport.getNodeId(), sourceId,
(nodeReport.getNodeState().isUnusable() ?
AMNodeEventType.N_TURNED_UNHEALTHY :
AMNodeEventType.N_TURNED_HEALTHY));
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java
index c823236..4a4cb61 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java
@@ -27,9 +27,9 @@ public class AMNodeEventTaskAttemptEnded extends AMNodeEvent {
private final ContainerId containerId;
private final TezTaskAttemptID taskAttemptId;
- public AMNodeEventTaskAttemptEnded(NodeId nodeId, ContainerId containerId,
+ public AMNodeEventTaskAttemptEnded(NodeId nodeId, int sourceId, ContainerId containerId,
TezTaskAttemptID taskAttemptId, boolean failed) {
- super(nodeId, AMNodeEventType.N_TA_ENDED);
+ super(nodeId, sourceId, AMNodeEventType.N_TA_ENDED);
this.failed = failed;
this.containerId = containerId;
this.taskAttemptId = taskAttemptId;
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java
index b07d594..2b8cb7d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java
@@ -27,9 +27,9 @@ public class AMNodeEventTaskAttemptSucceeded extends AMNodeEvent {
private final ContainerId containerId;
private final TezTaskAttemptID taskAttemptId;
- public AMNodeEventTaskAttemptSucceeded(NodeId nodeId,
+ public AMNodeEventTaskAttemptSucceeded(NodeId nodeId, int sourceId,
ContainerId containerId, TezTaskAttemptID taskAttemptId) {
- super(nodeId, AMNodeEventType.N_TA_SUCCEEDED);
+ super(nodeId, sourceId, AMNodeEventType.N_TA_SUCCEEDED);
this.containerId = containerId;
this.taskAttemptId = taskAttemptId;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 0d8e4cd..88b36cb1f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -54,6 +54,7 @@ public class AMNodeImpl implements AMNode {
private final ReadLock readLock;
private final WriteLock writeLock;
private final NodeId nodeId;
+ private final int sourceId;
private final AppContext appContext;
private final int maxTaskFailuresPerNode;
private boolean blacklistingEnabled;
@@ -172,13 +173,14 @@ public class AMNodeImpl implements AMNode {
@SuppressWarnings("rawtypes")
- public AMNodeImpl(NodeId nodeId, int maxTaskFailuresPerNode,
+ public AMNodeImpl(NodeId nodeId, int sourceId, int maxTaskFailuresPerNode,
EventHandler eventHandler, boolean blacklistingEnabled,
AppContext appContext) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
this.nodeId = nodeId;
+ this.sourceId = sourceId;
this.appContext = appContext;
this.eventHandler = eventHandler;
this.blacklistingEnabled = blacklistingEnabled;
@@ -247,7 +249,7 @@ public class AMNodeImpl implements AMNode {
/* Blacklist the node with the AMNodeTracker and check if the node should be blacklisted */
protected boolean registerBadNodeAndShouldBlacklist() {
- return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this);
+ return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this, sourceId);
}
protected void blacklistSelf() {
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 102cbe9..0668ff2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -18,9 +18,8 @@
package org.apache.tez.dag.app.rm.node;
-import java.util.HashSet;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.tez.dag.app.dag.DAG;
import org.slf4j.Logger;
@@ -29,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -42,23 +40,21 @@ public class AMNodeTracker extends AbstractService implements
static final Logger LOG = LoggerFactory.getLogger(AMNodeTracker.class);
- private final ConcurrentHashMap<NodeId, AMNode> nodeMap;
- private final ConcurrentHashMap<String, Set<NodeId>> blacklistMap;
+ private final ConcurrentMap<Integer, PerSourceNodeTracker> perSourceNodeTrackers;
+
@SuppressWarnings("rawtypes")
private final EventHandler eventHandler;
private final AppContext appContext;
- private int numClusterNodes;
- private boolean ignoreBlacklisting = false;
+
+ // Not final since it's setup in serviceInit
private int maxTaskFailuresPerNode;
private boolean nodeBlacklistingEnabled;
private int blacklistDisablePercent;
- float currentIgnoreBlacklistingCountThreshold = 0;
-
+
@SuppressWarnings("rawtypes")
public AMNodeTracker(EventHandler eventHandler, AppContext appContext) {
super("AMNodeMap");
- this.nodeMap = new ConcurrentHashMap<NodeId, AMNode>();
- this.blacklistMap = new ConcurrentHashMap<String, Set<NodeId>>();
+ this.perSourceNodeTrackers = new ConcurrentHashMap<>();
this.eventHandler = eventHandler;
this.appContext = appContext;
}
@@ -76,7 +72,7 @@ public class AMNodeTracker extends AbstractService implements
TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT);
LOG.info("blacklistDisablePercent is " + blacklistDisablePercent +
- ", blacklistingEnabled: " + nodeBlacklistingEnabled +
+ ", blacklistingEnabled: " + nodeBlacklistingEnabled +
", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode);
if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
@@ -85,130 +81,66 @@ public class AMNodeTracker extends AbstractService implements
+ ". Should be an integer between 0 and 100 or -1 to disabled");
}
}
-
- public void nodeSeen(NodeId nodeId) {
- if (nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, maxTaskFailuresPerNode,
- eventHandler, nodeBlacklistingEnabled, appContext)) == null) {
- LOG.info("Adding new node: " + nodeId);
- }
- }
- private void addToBlackList(NodeId nodeId) {
- String host = nodeId.getHost();
-
- if (!blacklistMap.containsKey(host)) {
- blacklistMap.putIfAbsent(host, new HashSet<NodeId>());
- }
- Set<NodeId> nodes = blacklistMap.get(host);
-
- if (!nodes.contains(nodeId)) {
- nodes.add(nodeId);
- }
+ public void nodeSeen(NodeId nodeId, int sourceId) {
+ PerSourceNodeTracker nodeTracker = getAndCreateIfNeededPerSourceTracker(sourceId);
+ nodeTracker.nodeSeen(nodeId);
}
- boolean registerBadNodeAndShouldBlacklist(AMNode amNode) {
- if (nodeBlacklistingEnabled) {
- addToBlackList(amNode.getNodeId());
- computeIgnoreBlacklisting();
- return !ignoreBlacklisting;
- } else {
- return false;
- }
+
+ boolean registerBadNodeAndShouldBlacklist(AMNode amNode, int sourceId) {
+ return perSourceNodeTrackers.get(sourceId).registerBadNodeAndShouldBlacklist(amNode);
}
+  /**
+   * Routes a node event to the tracker of the event's source id. Health and node-count events may
+   * create the tracker; all other listed events expect it to exist already. Unlisted event types
+   * are logged and ignored.
+   */
public void handle(AMNodeEvent rEvent) {
// No synchronization required until there's multiple dispatchers.
-    NodeId nodeId = rEvent.getNodeId();
switch (rEvent.getType()) {
-      case N_NODE_COUNT_UPDATED:
-        AMNodeEventNodeCountUpdated event = (AMNodeEventNodeCountUpdated) rEvent;
-        numClusterNodes = event.getNodeCount();
-        LOG.info("Num cluster nodes = " + numClusterNodes);
-        recomputeCurrentIgnoreBlacklistingThreshold();
-        computeIgnoreBlacklisting();
-        break;
-      case N_TURNED_UNHEALTHY:
-      case N_TURNED_HEALTHY:
-        AMNode amNode = nodeMap.get(nodeId);
-        if (amNode == null) {
-          LOG.info("Ignoring RM Health Update for unknown node: " + nodeId);
-        } else {
-          amNode.handle(rEvent);
-        }
-        break;
-      default:
-        nodeMap.get(nodeId).handle(rEvent);
+      case N_CONTAINER_ALLOCATED:
+      case N_TA_SUCCEEDED:
+      case N_TA_ENDED:
+      case N_IGNORE_BLACKLISTING_ENABLED:
+      case N_IGNORE_BLACKLISTING_DISABLED:
+        // All of these will only be seen after a node has been registered.
+        perSourceNodeTrackers.get(rEvent.getSourceId()).handle(rEvent);
+        break;
+      case N_TURNED_UNHEALTHY:
+      case N_TURNED_HEALTHY:
+      case N_NODE_COUNT_UPDATED:
+        // These events can be seen without a node having been marked as 'seen' before
+        getAndCreateIfNeededPerSourceTracker(rEvent.getSourceId()).handle(rEvent);
+        break;
+      default:
+        // Types not listed above are routed nowhere. Log them so a newly added event type is
+        // visible instead of being silently dropped.
+        LOG.warn("Ignoring unexpected AMNode event type " + rEvent.getType() + " for node "
+            + rEvent.getNodeId() + " from source " + rEvent.getSourceId());
+        break;
}
}
-  private void recomputeCurrentIgnoreBlacklistingThreshold() {
-    if (nodeBlacklistingEnabled && blacklistDisablePercent != -1) {
-      currentIgnoreBlacklistingCountThreshold =
-          (float) numClusterNodes * blacklistDisablePercent / 100;
-    }
+  /**
+   * Returns the node tracked for {@code sourceId}. Returns null when that source, or the node
+   * within it, has not been seen. Lookup only: no tracker is created.
+   */
+  public AMNode get(NodeId nodeId, int sourceId) {
+    PerSourceNodeTracker nodeTracker = perSourceNodeTrackers.get(sourceId);
+    return nodeTracker == null ? null : nodeTracker.get(nodeId);
}
-  // May be incorrect if there's multiple NodeManagers running on a single host.
-  // knownNodeCount is based on node managers, not hosts. blacklisting is
-  // currently based on hosts.
-  protected void computeIgnoreBlacklisting() {
-
-    boolean stateChanged = false;
-
-    if (!nodeBlacklistingEnabled || blacklistDisablePercent == -1 || blacklistMap.size() == 0) {
-      return;
-    }
-    if (blacklistMap.size() >= currentIgnoreBlacklistingCountThreshold) {
-      if (ignoreBlacklisting == false) {
-        ignoreBlacklisting = true;
-        LOG.info("Ignore Blacklisting set to true. Known: " + numClusterNodes
-            + ", Blacklisted: " + blacklistMap.size());
-        stateChanged = true;
-      }
-    } else {
-      if (ignoreBlacklisting == true) {
-        ignoreBlacklisting = false;
-        LOG.info("Ignore blacklisting set to false. Known: "
-            + numClusterNodes + ", Blacklisted: " + blacklistMap.size());
-        stateChanged = true;
-      }
-    }
-
-    if (stateChanged) {
-      sendIngoreBlacklistingStateToNodes();
-    }
-  }
-
-  private void sendIngoreBlacklistingStateToNodes() {
-    AMNodeEventType eventType =
-        ignoreBlacklisting ? AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED
-        : AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED;
-    for (NodeId nodeId : nodeMap.keySet()) {
-      sendEvent(new AMNodeEvent(nodeId, eventType));
-    }
-  }
-
-  public AMNode get(NodeId nodeId) {
-    return nodeMap.get(nodeId);
-  }
-
-  @SuppressWarnings("unchecked")
-  private void sendEvent(Event<?> event) {
-    this.eventHandler.handle(event);
-  }
-
-  public int getNumNodes() {
-    return nodeMap.size();
+  /**
+   * Returns the number of nodes seen for {@code sourceId}. Returns 0 for a source that has no
+   * tracker yet, which matches a freshly created, empty tracker.
+   */
+  public int getNumNodes(int sourceId) {
+    PerSourceNodeTracker nodeTracker = perSourceNodeTrackers.get(sourceId);
+    return nodeTracker == null ? 0 : nodeTracker.getNumNodes();
}
+  /**
+   * Whether blacklisting is currently ignored for {@code sourceId}. Returns false for a source
+   * without a tracker, which matches the initial state of a new tracker.
+   */
@Private
@VisibleForTesting
-  public boolean isBlacklistingIgnored() {
-    return this.ignoreBlacklisting;
+  public boolean isBlacklistingIgnored(int sourceId) {
+    PerSourceNodeTracker nodeTracker = perSourceNodeTrackers.get(sourceId);
+    return nodeTracker != null && nodeTracker.isBlacklistingIgnored();
}
+  /** Currently a no-op: the body only carries the TEZ-2337 TODO about resetting failures. */
public void dagComplete(DAG dag) {
// TODO TEZ-2337 Maybe reset failures from previous DAGs
}
+  /**
+   * Returns the tracker for {@code sourceId}, creating and publishing one if none exists yet.
+   * When two callers race, putIfAbsent decides the winner and both see the same instance.
+   */
+  private PerSourceNodeTracker getAndCreateIfNeededPerSourceTracker(int sourceId) {
+    PerSourceNodeTracker existing = perSourceNodeTrackers.get(sourceId);
+    if (existing != null) {
+      return existing;
+    }
+    PerSourceNodeTracker created =
+        new PerSourceNodeTracker(sourceId, eventHandler, appContext, maxTaskFailuresPerNode,
+            nodeBlacklistingEnabled, blacklistDisablePercent);
+    PerSourceNodeTracker winner = perSourceNodeTrackers.putIfAbsent(sourceId, created);
+    return winner == null ? created : winner;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
new file mode 100644
index 0000000..3264708
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.AppContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks nodes, the per-host blacklist, and the "ignore blacklisting" state for a single container
+ * source, identified by {@code sourceId}.
+ */
+public class PerSourceNodeTracker {
+
+  static final Logger LOG = LoggerFactory.getLogger(PerSourceNodeTracker.class);
+
+  private final int sourceId;
+  // Every node seen for this source.
+  private final ConcurrentHashMap<NodeId, AMNode> nodeMap;
+  // Blacklisted nodes grouped by host. The ignore-threshold check counts hosts (map size).
+  private final ConcurrentHashMap<String, Set<NodeId>> blacklistMap;
+
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
+  private final AppContext appContext;
+
+  private final int maxTaskFailuresPerNode;
+  private final boolean nodeBlacklistingEnabled;
+  // Percentage of numClusterNodes that may be blacklisted before blacklisting is ignored.
+  // -1 disables the ignore logic.
+  private final int blacklistDisablePercent;
+
+  private int numClusterNodes;
+  float currentIgnoreBlacklistingCountThreshold = 0;
+  private boolean ignoreBlacklisting = false;
+
+  @SuppressWarnings("rawtypes")
+  public PerSourceNodeTracker(int sourceId, EventHandler eventHandler, AppContext appContext,
+                              int maxTaskFailuresPerNode, boolean nodeBlacklistingEnabled,
+                              int blacklistDisablePercent) {
+    this.sourceId = sourceId;
+    this.nodeMap = new ConcurrentHashMap<>();
+    this.blacklistMap = new ConcurrentHashMap<>();
+    this.eventHandler = eventHandler;
+    this.appContext = appContext;
+
+    this.maxTaskFailuresPerNode = maxTaskFailuresPerNode;
+    this.nodeBlacklistingEnabled = nodeBlacklistingEnabled;
+    this.blacklistDisablePercent = blacklistDisablePercent;
+  }
+
+  /** Registers {@code nodeId} with this source if it has not been seen before. */
+  public void nodeSeen(NodeId nodeId) {
+    if (nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, sourceId, maxTaskFailuresPerNode,
+        eventHandler, nodeBlacklistingEnabled, appContext)) == null) {
+      LOG.info("Adding new node {} to nodeTracker {}", nodeId, sourceId);
+    }
+  }
+
+  /** Returns the node with this id, or null if it has not been seen by this source. */
+  public AMNode get(NodeId nodeId) {
+    return nodeMap.get(nodeId);
+  }
+
+  public int getNumNodes() {
+    return nodeMap.size();
+  }
+
+  /**
+   * Handles a node event for this source.
+   * Node-count updates recompute the ignore-blacklisting state. Health updates for unknown nodes
+   * are logged and dropped. Any other event must target a node registered via nodeSeen.
+   *
+   * @throws IllegalStateException if a non-health, non-count event targets an unknown node
+   */
+  public void handle(AMNodeEvent rEvent) {
+    // No synchronization required until there are multiple dispatchers.
+    NodeId nodeId = rEvent.getNodeId();
+    switch (rEvent.getType()) {
+      case N_NODE_COUNT_UPDATED:
+        AMNodeEventNodeCountUpdated event = (AMNodeEventNodeCountUpdated) rEvent;
+        numClusterNodes = event.getNodeCount();
+        LOG.info("Num cluster nodes = {} for nodeTracker {}", numClusterNodes, sourceId);
+        recomputeCurrentIgnoreBlacklistingThreshold();
+        computeIgnoreBlacklisting();
+        break;
+      case N_TURNED_UNHEALTHY:
+      case N_TURNED_HEALTHY:
+        AMNode amNode = nodeMap.get(nodeId);
+        if (amNode == null) {
+          LOG.info("Ignoring RM Health Update for unknown node: {} in nodeTracker {}",
+              nodeId, sourceId);
+        } else {
+          amNode.handle(rEvent);
+        }
+        break;
+      default:
+        AMNode knownNode = nodeMap.get(nodeId);
+        if (knownNode == null) {
+          // Fail with context instead of a bare NullPointerException.
+          throw new IllegalStateException("Received event " + rEvent.getType()
+              + " for unknown node " + nodeId + " in nodeTracker " + sourceId);
+        }
+        knownNode.handle(rEvent);
+    }
+  }
+
+  /**
+   * Records the node as blacklisted, grouped by host, and re-evaluates whether blacklisting
+   * should be ignored.
+   *
+   * @return true if the node should actually be blacklisted. Returns false when blacklisting is
+   *         disabled or is currently being ignored.
+   */
+  boolean registerBadNodeAndShouldBlacklist(AMNode amNode) {
+    if (!nodeBlacklistingEnabled) {
+      return false;
+    }
+    addToBlackList(amNode.getNodeId());
+    computeIgnoreBlacklisting();
+    return !ignoreBlacklisting;
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  public boolean isBlacklistingIgnored() {
+    return this.ignoreBlacklisting;
+  }
+
+  private void recomputeCurrentIgnoreBlacklistingThreshold() {
+    if (nodeBlacklistingEnabled && blacklistDisablePercent != -1) {
+      currentIgnoreBlacklistingCountThreshold =
+          (float) numClusterNodes * blacklistDisablePercent / 100;
+    }
+  }
+
+  // May be incorrect if there are multiple NodeManagers running on a single host.
+  // numClusterNodes is based on node managers, not hosts; blacklisting is
+  // currently based on hosts.
+  // NOTE(review): the threshold stays 0 until N_NODE_COUNT_UPDATED arrives for this source.
+  // Until then, the first blacklisted host already switches ignoreBlacklisting on.
+  protected void computeIgnoreBlacklisting() {
+    if (!nodeBlacklistingEnabled || blacklistDisablePercent == -1 || blacklistMap.isEmpty()) {
+      return;
+    }
+    boolean shouldIgnore = blacklistMap.size() >= currentIgnoreBlacklistingCountThreshold;
+    if (shouldIgnore == ignoreBlacklisting) {
+      // No state change, so nothing to broadcast.
+      return;
+    }
+    ignoreBlacklisting = shouldIgnore;
+    if (ignoreBlacklisting) {
+      LOG.info("Ignore Blacklisting set to true. Known: {}, Blacklisted: {}, nodeTracker: {}",
+          numClusterNodes, blacklistMap.size(), sourceId);
+    } else {
+      LOG.info("Ignore blacklisting set to false. Known: {}, Blacklisted: {}, nodeTracker: {}",
+          numClusterNodes, blacklistMap.size(), sourceId);
+    }
+    sendIgnoreBlacklistingStateToNodes();
+  }
+
+  private void addToBlackList(NodeId nodeId) {
+    String host = nodeId.getHost();
+    Set<NodeId> nodes = blacklistMap.get(host);
+    if (nodes == null) {
+      Set<NodeId> newNodes = new HashSet<NodeId>();
+      Set<NodeId> existing = blacklistMap.putIfAbsent(host, newNodes);
+      nodes = existing == null ? newNodes : existing;
+    }
+    // Set.add already ignores duplicates, so no contains() check is needed.
+    nodes.add(nodeId);
+  }
+
+  /** Broadcasts the current ignore-blacklisting state to every node of this source. */
+  private void sendIgnoreBlacklistingStateToNodes() {
+    AMNodeEventType eventType =
+        ignoreBlacklisting ? AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED
+        : AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED;
+    for (NodeId nodeId : nodeMap.keySet()) {
+      sendEvent(new AMNodeEvent(nodeId, sourceId, eventType));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    this.eventHandler.handle(event);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 9882954..0f35bba 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -257,7 +257,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
}
public void preemptContainer(ContainerData cData) {
-      getTaskSchedulerEventHandler().containerCompleted(null,
+      // The leading 0 is presumably the scheduler/source id of the single scheduler this mock
+      // uses. Confirm if more schedulers are added.
+      getTaskSchedulerEventHandler().containerCompleted(0, null,
ContainerStatus.newInstance(cData.cId, null, "Preempted", ContainerExitStatus.PREEMPTED));
cData.clear();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 42d4b0b..7584b4c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -200,7 +200,7 @@ public class TestMockDAGAppMaster {
mockLauncher.waitTillContainersLaunched();
ContainerData cData = mockLauncher.getContainers().values().iterator().next();
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
- mockApp.getTaskSchedulerEventHandler().preemptContainer(cData.cId);
+ mockApp.getTaskSchedulerEventHandler().preemptContainer(0, cData.cId);
mockLauncher.startScheduling(true);
dagClient.waitForCompletion();
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 080c20f..62edac9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -216,9 +216,9 @@ public class TestContainerReuse {
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
verify(taskSchedulerEventHandler).taskAllocated(
- eq(ta11), any(Object.class), eq(containerHost1));
+ eq(0), eq(ta11), any(Object.class), eq(containerHost1));
verify(taskSchedulerEventHandler).taskAllocated(
- eq(ta21), any(Object.class), eq(containerHost2));
+ eq(0), eq(ta21), any(Object.class), eq(containerHost2));
// Adding the event later so that task1 assigned to containerHost1
// is deterministic.
@@ -230,7 +230,7 @@ public class TestContainerReuse {
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler, times(1)).taskAllocated(
- eq(ta31), any(Object.class), eq(containerHost1));
+ eq(0), eq(ta31), any(Object.class), eq(containerHost1));
verify(rmClient, times(0)).releaseAssignedContainer(
eq(containerHost1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -245,7 +245,7 @@ public class TestContainerReuse {
while (System.currentTimeMillis() < currentTs + 5000l) {
try {
verify(taskSchedulerEventHandler,
- times(1)).containerBeingReleased(eq(containerHost2.getId()));
+ times(1)).containerBeingReleased(eq(0), eq(containerHost2.getId()));
exception = null;
break;
} catch (Throwable e) {
@@ -351,8 +351,8 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(containerHost1));
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta21), any(Object.class), eq(containerHost2));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta21), any(Object.class), eq(containerHost2));
// Adding the event later so that task1 assigned to containerHost1 is deterministic.
taskSchedulerEventHandler.handleEvent(lrTa31);
@@ -363,7 +363,7 @@ public class TestContainerReuse {
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta21), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler, times(0)).taskAllocated(
- eq(ta31), any(Object.class), eq(containerHost2));
+ eq(0), eq(ta31), any(Object.class), eq(containerHost2));
verify(rmClient, times(1)).releaseAssignedContainer(
eq(containerHost2.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
@@ -459,13 +459,13 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1));
// Task assigned to container completed successfully. Container should be re-used.
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta12), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
@@ -475,7 +475,7 @@ public class TestContainerReuse {
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta12), eq(true), eq((TaskAttemptEndReason)null));
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
@@ -483,7 +483,7 @@ public class TestContainerReuse {
// Verify no re-use if a previous task fails.
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null, 0));
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container1));
verify(taskScheduler).deallocateTask(eq(ta13), eq(false), eq((TaskAttemptEndReason)null));
verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
@@ -496,7 +496,7 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container2));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2));
// Task assigned to container completed successfully. No pending requests. Container should be released.
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0));
@@ -606,14 +606,14 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1));
// First task had profiling on. This container can not be reused further.
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
- verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta12), any(Object.class),
+ verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class),
eq(container1));
verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
@@ -652,14 +652,14 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container2));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container2));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2));
// Verify that the container can not be reused when profiling option is turned on
// Even for 2 tasks having same profiling option can have container reusability.
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta13), eq(true), eq((TaskAttemptEndReason)null));
- verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class),
+ verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
eq(container2));
verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
@@ -698,13 +698,13 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container3));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta15), any(Object.class), eq(container3));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta15), any(Object.class), eq(container3));
//Ensure task 6 (of vertex 1) is allocated to same container
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta15), eq(true), eq((TaskAttemptEndReason)null));
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta16), any(Object.class), eq(container3));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3));
eventHandler.reset();
taskScheduler.close();
@@ -804,7 +804,7 @@ public class TestContainerReuse {
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
verify(taskSchedulerEventHandler).taskAllocated(
- eq(ta11), any(Object.class), eq(container1));
+ eq(0), eq(ta11), any(Object.class), eq(container1));
// Send launch request for task2 (vertex2)
taskSchedulerEventHandler.handleEvent(lrEvent12);
@@ -818,7 +818,7 @@ public class TestContainerReuse {
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler, times(0)).taskAllocated(
- eq(ta12), any(Object.class), eq(container1));
+ eq(0), eq(ta12), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
@@ -826,7 +826,7 @@ public class TestContainerReuse {
LOG.info("Sleeping to ensure that the scheduling loop runs");
Thread.sleep(3000l);
verify(taskSchedulerEventHandler).taskAllocated(
- eq(ta12), any(Object.class), eq(container1));
+ eq(0), eq(ta12), any(Object.class), eq(container1));
// TA12 completed.
taskSchedulerEventHandler.handleEvent(
@@ -940,7 +940,7 @@ public class TestContainerReuse {
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
verify(taskSchedulerEventHandler).taskAllocated(
- eq(ta11), any(Object.class), eq(container1));
+ eq(0), eq(ta11), any(Object.class), eq(container1));
// Send launch request for task2 (vertex2)
taskSchedulerEventHandler.handleEvent(lrEvent21);
@@ -953,7 +953,7 @@ public class TestContainerReuse {
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler).taskAllocated(
- eq(ta21), any(Object.class), eq(container1));
+ eq(0), eq(ta21), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
// Task 2 completes.
@@ -1063,7 +1063,7 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta111), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1));
assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
@@ -1071,7 +1071,7 @@ public class TestContainerReuse {
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null));
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1114,7 +1114,7 @@ public class TestContainerReuse {
// TODO This is terrible, need a better way to ensure the scheduling loop has run
LOG.info("Sleeping to ensure that the scheduling loop runs");
Thread.sleep(6000l);
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta211), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1124,7 +1124,7 @@ public class TestContainerReuse {
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta211), eq(true), eq((TaskAttemptEndReason)null));
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta212), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1237,7 +1237,7 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta111), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1));
assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
@@ -1245,7 +1245,7 @@ public class TestContainerReuse {
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null));
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1290,7 +1290,7 @@ public class TestContainerReuse {
Thread.sleep(6000l);
verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
- verify(taskSchedulerEventHandler).taskAllocated(eq(ta211), any(Object.class), eq(container2));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2));
eventHandler.reset();
taskScheduler.close();
@@ -1369,7 +1369,7 @@ public class TestContainerReuse {
drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta11),
+ verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta11),
any(Object.class), eq(container1));
taskScheduler.close();
taskSchedulerEventHandler.close();
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index daf1db6..005692e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -163,7 +163,7 @@ public class TestTaskSchedulerEventHandler {
AMSchedulerEventTALaunchRequest lr =
new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint,
priority, containerContext, 0, 0, 0);
- schedulerHandler.taskAllocated(mockTaskAttempt, lr, container);
+ schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container);
assertEquals(2, mockEventHandler.events.size());
assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA);
AMContainerEventAssignTA assignEvent =
@@ -227,7 +227,7 @@ public class TestTaskSchedulerEventHandler {
when(mockStatus.getContainerId()).thenReturn(mockCId);
when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.PREEMPTED);
- schedulerHandler.containerCompleted(mockTask, mockStatus);
+ schedulerHandler.containerCompleted(0, mockTask, mockStatus);
assertEquals(1, mockEventHandler.events.size());
Event event = mockEventHandler.events.get(0);
assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
@@ -257,7 +257,7 @@ public class TestTaskSchedulerEventHandler {
ContainerId mockCId = mock(ContainerId.class);
verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId)any());
when(mockAMContainerMap.get(mockCId)).thenReturn(mockAmContainer);
- schedulerHandler.preemptContainer(mockCId);
+ schedulerHandler.preemptContainer(0, mockCId);
verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId);
assertEquals(1, mockEventHandler.events.size());
Event event = mockEventHandler.events.get(0);
@@ -290,7 +290,7 @@ public class TestTaskSchedulerEventHandler {
when(mockStatus.getContainerId()).thenReturn(mockCId);
when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.DISKS_FAILED);
- schedulerHandler.containerCompleted(mockTask, mockStatus);
+ schedulerHandler.containerCompleted(0, mockTask, mockStatus);
assertEquals(1, mockEventHandler.events.size());
Event event = mockEventHandler.events.get(0);
assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
@@ -325,7 +325,7 @@ public class TestTaskSchedulerEventHandler {
// use -104 rather than ContainerExitStatus.KILLED_EXCEEDED_PMEM because
// ContainerExitStatus.KILLED_EXCEEDED_PMEM is only available after hadoop-2.5
when(mockStatus.getExitStatus()).thenReturn(-104);
- schedulerHandler.containerCompleted(mockTask, mockStatus);
+ schedulerHandler.containerCompleted(0, mockTask, mockStatus);
assertEquals(1, mockEventHandler.events.size());
Event event = mockEventHandler.events.get(0);
assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
@@ -383,4 +383,5 @@ public class TestTaskSchedulerEventHandler {
}
+ // TODO TEZ-2003. Add tests with multiple schedulers, ensuring that events go out with the correct scheduler IDs.
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index ffab769..04610ab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -134,7 +134,7 @@ class TestTaskSchedulerHelpers {
@Override
public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
- taskSchedulers[0] = new TaskSchedulerWithDrainableAppCallback(this,
+ taskSchedulers[0] = new TaskSchedulerWithDrainableAppCallback(new TaskSchedulerAppCallbackImpl(this, 0),
containerSignatureMatcher, host, port, trackingUrl, amrmClientAsync,
appContext);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
index d907ea0..84d2e1f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -93,12 +93,12 @@ public class TestAMNodeTracker {
amNodeTracker.start();
NodeId nodeId = NodeId.newInstance("host1", 2342);
- amNodeTracker.nodeSeen(nodeId);
+ amNodeTracker.nodeSeen(nodeId, 0);
NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
- amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport));
+ amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport, 0));
dispatcher.await();
- assertEquals(AMNodeState.UNHEALTHY, amNodeTracker.get(nodeId).getState());
+ assertEquals(AMNodeState.UNHEALTHY, amNodeTracker.get(nodeId, 0).getState());
amNodeTracker.stop();
}
@@ -114,7 +114,7 @@ public class TestAMNodeTracker {
NodeId nodeId = NodeId.newInstance("unknownhost", 2342);
NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
- amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport));
+ amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport, 0));
dispatcher.await();
amNodeTracker.stop();
@@ -142,27 +142,27 @@ public class TestAMNodeTracker {
amNodeTracker.init(conf);
amNodeTracker.start();
- amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1));
+ amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, 0));
NodeId nodeId = NodeId.newInstance("host1", 1234);
- amNodeTracker.nodeSeen(nodeId);
+ amNodeTracker.nodeSeen(nodeId, 0);
- AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId);
+ AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0);
ContainerId cId1 = mock(ContainerId.class);
ContainerId cId2 = mock(ContainerId.class);
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId1));
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId2));
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId1));
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId2));
TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId1, ta1, true));
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId1, ta1, true));
dispatcher.await();
assertEquals(1, node.numFailedTAs);
assertEquals(AMNodeState.ACTIVE, node.getState());
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true));
dispatcher.await();
assertEquals(2, node.numFailedTAs);
assertEquals(1, handler.events.size());
@@ -187,44 +187,44 @@ public class TestAMNodeTracker {
amNodeTracker.init(conf);
amNodeTracker.start();
- amNodeTracker.handle(new AMNodeEventNodeCountUpdated(4));
+ amNodeTracker.handle(new AMNodeEventNodeCountUpdated(4, 0));
NodeId nodeId = NodeId.newInstance("host1", 1234);
NodeId nodeId2 = NodeId.newInstance("host2", 1234);
NodeId nodeId3 = NodeId.newInstance("host3", 1234);
NodeId nodeId4 = NodeId.newInstance("host4", 1234);
- amNodeTracker.nodeSeen(nodeId);
- amNodeTracker.nodeSeen(nodeId2);
- amNodeTracker.nodeSeen(nodeId3);
- amNodeTracker.nodeSeen(nodeId4);
- AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId);
+ amNodeTracker.nodeSeen(nodeId, 0);
+ amNodeTracker.nodeSeen(nodeId2, 0);
+ amNodeTracker.nodeSeen(nodeId3, 0);
+ amNodeTracker.nodeSeen(nodeId4, 0);
+ AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0);
ContainerId cId1 = mock(ContainerId.class);
ContainerId cId2 = mock(ContainerId.class);
ContainerId cId3 = mock(ContainerId.class);
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId1));
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId2));
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId3));
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId1));
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId2));
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId3));
assertEquals(3, node.containers.size());
TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
TezTaskAttemptID ta3 = mock(TezTaskAttemptID.class);
- amNodeTracker.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, cId1, ta1));
+ amNodeTracker.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, 0, cId1, ta1));
assertEquals(1, node.numSuccessfulTAs);
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true));
assertEquals(1, node.numSuccessfulTAs);
assertEquals(1, node.numFailedTAs);
assertEquals(AMNodeState.ACTIVE, node.getState());
// duplicate should not affect anything
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true));
assertEquals(1, node.numSuccessfulTAs);
assertEquals(1, node.numFailedTAs);
assertEquals(AMNodeState.ACTIVE, node.getState());
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId3, ta3, true));
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId3, ta3, true));
dispatcher.await();
assertEquals(1, node.numSuccessfulTAs);
assertEquals(2, node.numFailedTAs);
@@ -246,20 +246,20 @@ public class TestAMNodeTracker {
ContainerId cId5 = mock(ContainerId.class);
TezTaskAttemptID ta4 = mock(TezTaskAttemptID.class);
TezTaskAttemptID ta5 = mock(TezTaskAttemptID.class);
- AMNodeImpl node2 = (AMNodeImpl) amNodeTracker.get(nodeId2);
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, cId4));
- amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, cId5));
+ AMNodeImpl node2 = (AMNodeImpl) amNodeTracker.get(nodeId2, 0);
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, 0, cId4));
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, 0, cId5));
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId4, ta4, true));
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, 0, cId4, ta4, true));
assertEquals(1, node2.numFailedTAs);
assertEquals(AMNodeState.ACTIVE, node2.getState());
handler.events.clear();
- amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId5, ta5, true));
+ amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, 0, cId5, ta5, true));
dispatcher.await();
assertEquals(2, node2.numFailedTAs);
assertEquals(AMNodeState.FORCED_ACTIVE, node2.getState());
- AMNodeImpl node3 = (AMNodeImpl) amNodeTracker.get(nodeId3);
+ AMNodeImpl node3 = (AMNodeImpl) amNodeTracker.get(nodeId3, 0);
assertEquals(AMNodeState.FORCED_ACTIVE, node3.getState());
assertEquals(5, handler.events.size());
@@ -286,7 +286,7 @@ public class TestAMNodeTracker {
// Increase the number of nodes. BLACKLISTING should be re-enabled.
// Node 1 and Node 2 should go into BLACKLISTED state
handler.events.clear();
- amNodeTracker.handle(new AMNodeEventNodeCountUpdated(8));
+ amNodeTracker.handle(new AMNodeEventNodeCountUpdated(8, 0));
dispatcher.await();
LOG.info(("Completed waiting for dispatcher to process all pending events"));
assertEquals(AMNodeState.BLACKLISTED, node.getState());
@@ -336,4 +336,6 @@ public class TestAMNodeTracker {
doReturn(healthReportTime).when(nodeReport).getLastHealthReportTime();
return nodeReport;
}
+
+ // TODO TEZ-2003. Add tests for multiple sources.
}