You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2012/02/10 02:49:30 UTC
svn commit: r1242635 [2/10] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ bin/ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Feb 10 01:49:08 2012
@@ -19,19 +19,24 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -127,7 +132,7 @@ import org.apache.hadoop.yarn.util.RackR
/**
* Implementation of TaskAttempt interface.
*/
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings({ "rawtypes" })
public abstract class TaskAttemptImpl implements
org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
EventHandler<TaskAttemptEvent> {
@@ -142,7 +147,7 @@ public abstract class TaskAttemptImpl im
protected final JobConf conf;
protected final Path jobFile;
protected final int partition;
- protected final EventHandler eventHandler;
+ protected EventHandler eventHandler;
private final TaskAttemptId attemptId;
private final Clock clock;
private final org.apache.hadoop.mapred.JobID oldJobId;
@@ -910,8 +915,10 @@ public abstract class TaskAttemptImpl im
@SuppressWarnings("unchecked")
@Override
public void handle(TaskAttemptEvent event) {
- LOG.info("Processing " + event.getTaskAttemptID() +
- " of type " + event.getType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing " + event.getTaskAttemptID() + " of type "
+ + event.getType());
+ }
writeLock.lock();
try {
final TaskAttemptState oldState = getState();
@@ -1054,7 +1061,7 @@ public abstract class TaskAttemptImpl im
}
}
- private static class RequestContainerTransition implements
+ static class RequestContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
private final boolean rescheduled;
public RequestContainerTransition(boolean rescheduled) {
@@ -1074,19 +1081,49 @@ public abstract class TaskAttemptImpl im
taskAttempt.attemptId,
taskAttempt.resourceCapability));
} else {
- int i = 0;
- String[] racks = new String[taskAttempt.dataLocalHosts.length];
+ Set<String> racks = new HashSet<String>();
for (String host : taskAttempt.dataLocalHosts) {
- racks[i++] = RackResolver.resolve(host).getNetworkLocation();
+ racks.add(RackResolver.resolve(host).getNetworkLocation());
}
- taskAttempt.eventHandler.handle(
- new ContainerRequestEvent(taskAttempt.attemptId,
- taskAttempt.resourceCapability,
- taskAttempt.dataLocalHosts, racks));
+ taskAttempt.eventHandler.handle(new ContainerRequestEvent(
+ taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt
+ .resolveHosts(taskAttempt.dataLocalHosts), racks
+ .toArray(new String[racks.size()])));
}
}
}
+ protected String[] resolveHosts(String[] src) {
+ String[] result = new String[src.length];
+ for (int i = 0; i < src.length; i++) {
+ if (isIP(src[i])) {
+ result[i] = resolveHost(src[i]);
+ } else {
+ result[i] = src[i];
+ }
+ }
+ return result;
+ }
+
+ protected String resolveHost(String src) {
+ String result = src; // Fallback in case of failure.
+ try {
+ InetAddress addr = InetAddress.getByName(src);
+ result = addr.getHostName();
+ } catch (UnknownHostException e) {
+ LOG.warn("Failed to resolve address: " + src
+ + ". Continuing to use the same.");
+ }
+ return result;
+ }
+
+ private static final Pattern ipPattern = // Pattern for matching ip
+ Pattern.compile("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}");
+
+ protected boolean isIP(String src) {
+ return ipPattern.matcher(src).matches();
+ }
+
private static class ContainerAssignedTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings({ "unchecked" })
@@ -1278,15 +1315,11 @@ public abstract class TaskAttemptImpl im
TaskAttemptEvent event) {
//set the finish time
taskAttempt.setFinishTime();
- String taskType =
- TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()).toString();
- LOG.info("In TaskAttemptImpl taskType: " + taskType);
long slotMillis = computeSlotMillis(taskAttempt);
- JobCounterUpdateEvent jce =
- new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
- .getJobId());
+ TaskId taskId = taskAttempt.attemptId.getTaskId();
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
jce.addCounterUpdate(
- taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ?
+ taskId.getTaskType() == TaskType.MAP ?
JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
slotMillis);
taskAttempt.eventHandler.handle(jce);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Feb 10 01:49:08 2012
@@ -81,7 +81,7 @@ import org.apache.hadoop.yarn.state.Stat
/**
* Implementation of Task interface.
*/
-@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+@SuppressWarnings({ "rawtypes", "unchecked" })
public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private static final Log LOG = LogFactory.getLog(TaskImpl.class);
@@ -505,7 +505,9 @@ public abstract class TaskImpl implement
// This is always called in the Write Lock
private void addAndScheduleAttempt() {
TaskAttempt attempt = createAttempt();
- LOG.info("Created attempt " + attempt.getID());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created attempt " + attempt.getID());
+ }
switch (attempts.size()) {
case 0:
attempts = Collections.singletonMap(attempt.getID(), attempt);
@@ -537,7 +539,10 @@ public abstract class TaskImpl implement
@Override
public void handle(TaskEvent event) {
- LOG.debug("Processing " + event.getTaskID() + " of type " + event.getType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing " + event.getTaskID() + " of type "
+ + event.getType());
+ }
try {
writeLock.lock();
TaskState oldState = getState();
@@ -559,6 +564,7 @@ public abstract class TaskImpl implement
}
private void internalError(TaskEventType type) {
+ LOG.error("Invalid event " + type + " on Task " + this.taskId);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
this.taskId.getJobId(), "Invalid event " + type +
" on Task " + this.taskId));
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Feb 10 01:49:08 2012
@@ -103,6 +103,7 @@ public class LocalContainerAllocator ext
// This can happen when the connection to the RM has gone down. Keep
// re-trying until the retryInterval has expired.
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+ LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.INTERNAL_ERROR));
throw new YarnException("Could not contact RM after " +
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Fri Feb 10 01:49:08 2012
@@ -32,8 +32,10 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
@@ -191,6 +193,11 @@ public class RecoveryService extends Com
in = fc.open(historyFile);
JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse();
+ Exception parseException = parser.getParseException();
+ if (parseException != null) {
+ LOG.info("Got an error parsing job-history file " + historyFile +
+ ", ignoring incomplete events.", parseException);
+ }
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
.getAllTasks();
for (TaskInfo taskInfo : taskInfos.values()) {
@@ -353,16 +360,24 @@ public class RecoveryService extends Com
//recover the task output
TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
attInfo.getAttemptId());
- try {
- committer.recoverTask(taskContext);
+ try {
+ TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
+ int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
+ if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
+ committer.recoverTask(taskContext);
+ LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
+ } else {
+ LOG.info("Will not try to recover output for "
+ + taskContext.getTaskAttemptID());
+ }
} catch (IOException e) {
+ LOG.error("Caught an exception while trying to recover task "+aId, e);
actualHandler.handle(new JobDiagnosticsUpdateEvent(
aId.getTaskId().getJobId(), "Error in recovering task output " +
e.getMessage()));
actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
JobEventType.INTERNAL_ERROR));
}
- LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
// send the done event
LOG.info("Sending done event to " + aId);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Feb 10 01:49:08 2012
@@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.JobID
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -46,9 +45,9 @@ import org.apache.hadoop.yarn.api.Applic
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -149,7 +148,7 @@ public abstract class RMCommunicator ext
LOG.info("minContainerCapability: " + minContainerCapability.getMemory());
LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
} catch (Exception are) {
- LOG.info("Exception while registering", are);
+ LOG.error("Exception while registering", are);
throw new YarnException(are);
}
}
@@ -183,7 +182,7 @@ public abstract class RMCommunicator ext
request.setTrackingUrl(historyUrl);
scheduler.finishApplicationMaster(request);
} catch(Exception are) {
- LOG.info("Exception while unregistering ", are);
+ LOG.error("Exception while unregistering ", are);
}
}
@@ -205,7 +204,7 @@ public abstract class RMCommunicator ext
try {
allocatorThread.join();
} catch (InterruptedException ie) {
- LOG.info("InterruptedException while stopping", ie);
+ LOG.warn("InterruptedException while stopping", ie);
}
unregister();
super.stop();
@@ -228,7 +227,7 @@ public abstract class RMCommunicator ext
// TODO: for other exceptions
}
} catch (InterruptedException e) {
- LOG.info("Allocated thread interrupted. Returning.");
+ LOG.warn("Allocated thread interrupted. Returning.");
return;
}
}
@@ -255,7 +254,9 @@ public abstract class RMCommunicator ext
if (UserGroupInformation.isSecurityEnabled()) {
String tokenURLEncodedStr = System.getenv().get(
ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
- LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
+ }
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
try {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Feb 10 01:49:08 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.JobCo
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -254,28 +255,30 @@ public class RMContainerAllocator extend
@SuppressWarnings({ "unchecked" })
protected synchronized void handleEvent(ContainerAllocatorEvent event) {
- LOG.info("Processing the event " + event.toString());
recalculateReduceSchedule = true;
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
+ JobId jobId = getJob().getID();
+ int supportedMaxContainerCapability =
+ getMaxContainerCapability().getMemory();
if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
if (mapResourceReqt == 0) {
mapResourceReqt = reqEvent.getCapability().getMemory();
int minSlotMemSize = getMinContainerCapability().getMemory();
mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize)
* minSlotMemSize;
- eventHandler.handle(new JobHistoryEvent(getJob().getID(),
+ eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
mapResourceReqt)));
LOG.info("mapResourceReqt:"+mapResourceReqt);
- if (mapResourceReqt > getMaxContainerCapability().getMemory()) {
+ if (mapResourceReqt > supportedMaxContainerCapability) {
String diagMsg = "MAP capability required is more than the supported " +
"max container capability in the cluster. Killing the Job. mapResourceReqt: " +
- mapResourceReqt + " maxContainerCapability:" + getMaxContainerCapability().getMemory();
+ mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
- getJob().getID(), diagMsg));
- eventHandler.handle(new JobEvent(getJob().getID(), JobEventType.JOB_KILL));
+ jobId, diagMsg));
+ eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
}
}
//set the rounded off memory
@@ -288,20 +291,20 @@ public class RMContainerAllocator extend
//round off on slotsize
reduceResourceReqt = (int) Math.ceil((float)
reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
- eventHandler.handle(new JobHistoryEvent(getJob().getID(),
+ eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
reduceResourceReqt)));
LOG.info("reduceResourceReqt:"+reduceResourceReqt);
- if (reduceResourceReqt > getMaxContainerCapability().getMemory()) {
+ if (reduceResourceReqt > supportedMaxContainerCapability) {
String diagMsg = "REDUCE capability required is more than the " +
"supported max container capability in the cluster. Killing the " +
"Job. reduceResourceReqt: " + reduceResourceReqt +
- " maxContainerCapability:" + getMaxContainerCapability().getMemory();
+ " maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
- getJob().getID(), diagMsg));
- eventHandler.handle(new JobEvent(getJob().getID(), JobEventType.JOB_KILL));
+ jobId, diagMsg));
+ eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
}
}
//set the rounded off memory
@@ -317,6 +320,9 @@ public class RMContainerAllocator extend
} else if (
event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
+
+ LOG.info("Processing the event " + event.toString());
+
TaskAttemptId aId = event.getAttemptID();
boolean removed = scheduledRequests.remove(aId);
@@ -543,6 +549,7 @@ public class RMContainerAllocator extend
// This can happen when the connection to the RM has gone down. Keep
// re-trying until the retryInterval has expired.
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+ LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.INTERNAL_ERROR));
throw new YarnException("Could not contact RM after " +
@@ -578,7 +585,7 @@ public class RMContainerAllocator extend
computeIgnoreBlacklisting();
for (ContainerStatus cont : finishedContainers) {
- LOG.info("Received completed container " + cont);
+ LOG.info("Received completed container " + cont.getContainerId());
TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
if (attemptID == null) {
LOG.error("Container complete event for unknown container id "
@@ -663,7 +670,9 @@ public class RMContainerAllocator extend
mapsHostMapping.put(host, list);
}
list.add(event.getAttemptID());
- LOG.info("Added attempt req to host " + host);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added attempt req to host " + host);
+ }
}
for (String rack: event.getRacks()) {
LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
@@ -672,7 +681,9 @@ public class RMContainerAllocator extend
mapsRackMapping.put(rack, list);
}
list.add(event.getAttemptID());
- LOG.info("Added attempt req to rack " + rack);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added attempt req to rack " + rack);
+ }
}
request = new ContainerRequest(event, PRIORITY_MAP);
}
@@ -693,18 +704,21 @@ public class RMContainerAllocator extend
containersAllocated += allocatedContainers.size();
while (it.hasNext()) {
Container allocated = it.next();
- LOG.info("Assigning container " + allocated.getId() +
- " with priority " + allocated.getPriority() +
- " to NM " + allocated.getNodeId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assigning container " + allocated.getId()
+ + " with priority " + allocated.getPriority() + " to NM "
+ + allocated.getNodeId());
+ }
// check if allocated container meets memory requirements
// and whether we have any scheduled tasks that need
// a container to be assigned
boolean isAssignable = true;
Priority priority = allocated.getPriority();
+ int allocatedMemory = allocated.getResource().getMemory();
if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|| PRIORITY_MAP.equals(priority)) {
- if (allocated.getResource().getMemory() < mapResourceReqt
+ if (allocatedMemory < mapResourceReqt
|| maps.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a map as either "
@@ -715,7 +729,7 @@ public class RMContainerAllocator extend
}
}
else if (PRIORITY_REDUCE.equals(priority)) {
- if (allocated.getResource().getMemory() < reduceResourceReqt
+ if (allocatedMemory < reduceResourceReqt
|| reduces.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a reduce as either "
@@ -729,15 +743,17 @@ public class RMContainerAllocator extend
boolean blackListed = false;
ContainerRequest assigned = null;
+ ContainerId allocatedContainerId = allocated.getId();
if (isAssignable) {
// do not assign if allocated container is on a
// blacklisted host
- blackListed = isNodeBlacklisted(allocated.getNodeId().getHost());
+ String allocatedHost = allocated.getNodeId().getHost();
+ blackListed = isNodeBlacklisted(allocatedHost);
if (blackListed) {
// we need to request for a new container
// and release the current one
LOG.info("Got allocated container on a blacklisted "
- + " host "+allocated.getNodeId().getHost()
+ + " host "+allocatedHost
+". Releasing container " + allocated);
// find the request matching this allocated container
@@ -774,11 +790,13 @@ public class RMContainerAllocator extend
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
assigned.attemptID, allocated, applicationACLs));
- assignedRequests.add(allocated.getId(), assigned.attemptID);
+ assignedRequests.add(allocatedContainerId, assigned.attemptID);
- LOG.info("Assigned container (" + allocated + ") " +
- " to task " + assigned.attemptID +
- " on node " + allocated.getNodeId().toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Assigned container (" + allocated + ") "
+ + " to task " + assigned.attemptID + " on node "
+ + allocated.getNodeId().toString());
+ }
}
else {
//not assigned to any request, release the container
@@ -793,7 +811,7 @@ public class RMContainerAllocator extend
// or if we could not assign it
if (blackListed || assigned == null) {
containersReleased++;
- release(allocated.getId());
+ release(allocatedContainerId);
}
}
}
@@ -806,10 +824,14 @@ public class RMContainerAllocator extend
LOG.info("Assigning container " + allocated + " to fast fail map");
assigned = assignToFailedMap(allocated);
} else if (PRIORITY_REDUCE.equals(priority)) {
- LOG.info("Assigning container " + allocated + " to reduce");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assigning container " + allocated + " to reduce");
+ }
assigned = assignToReduce(allocated);
} else if (PRIORITY_MAP.equals(priority)) {
- LOG.info("Assigning container " + allocated + " to map");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assigning container " + allocated + " to map");
+ }
assigned = assignToMap(allocated);
} else {
LOG.warn("Container allocated at unwanted priority: " + priority +
@@ -896,7 +918,9 @@ public class RMContainerAllocator extend
String host = allocated.getNodeId().getHost();
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
while (list != null && list.size() > 0) {
- LOG.info("Host matched to the request list " + host);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Host matched to the request list " + host);
+ }
TaskAttemptId tId = list.removeFirst();
if (maps.containsKey(tId)) {
assigned = maps.remove(tId);
@@ -905,7 +929,9 @@ public class RMContainerAllocator extend
jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
eventHandler.handle(jce);
hostLocalAssigned++;
- LOG.info("Assigned based on host match " + host);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assigned based on host match " + host);
+ }
break;
}
}
@@ -921,7 +947,9 @@ public class RMContainerAllocator extend
jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
eventHandler.handle(jce);
rackLocalAssigned++;
- LOG.info("Assigned based on rack match " + rack);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assigned based on rack match " + rack);
+ }
break;
}
}
@@ -932,7 +960,9 @@ public class RMContainerAllocator extend
new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
eventHandler.handle(jce);
- LOG.info("Assigned based on * match");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assigned based on * match");
+ }
break;
}
}
@@ -952,8 +982,7 @@ public class RMContainerAllocator extend
new HashSet<TaskAttemptId>();
void add(ContainerId containerId, TaskAttemptId tId) {
- LOG.info("Assigned container " + containerId.toString()
- + " to " + tId);
+ LOG.info("Assigned container " + containerId.toString() + " to " + tId);
containerToAttemptMap.put(containerId, tId);
if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
maps.put(tId, containerId);
@@ -962,6 +991,7 @@ public class RMContainerAllocator extend
}
}
+ @SuppressWarnings("unchecked")
void preemptReduce(int toPreempt) {
List<TaskAttemptId> reduceList = new ArrayList<TaskAttemptId>
(reduces.keySet());
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Fri Feb 10 01:49:08 2012
@@ -155,13 +155,14 @@ public abstract class RMContainerRequest
lastClusterNmCount = clusterNmCount;
clusterNmCount = allocateResponse.getNumClusterNodes();
- LOG.info("getResources() for " + applicationId + ":" + " ask="
- + ask.size() + " release= " + release.size() +
- " newContainers=" + response.getAllocatedContainers().size() +
- " finishedContainers=" +
- response.getCompletedContainersStatuses().size() +
- " resourcelimit=" + availableResources +
- " knownNMs=" + clusterNmCount);
+ if (ask.size() > 0 || release.size() > 0) {
+ LOG.info("getResources() for " + applicationId + ":" + " ask="
+ + ask.size() + " release= " + release.size() + " newContainers="
+ + response.getAllocatedContainers().size() + " finishedContainers="
+ + response.getCompletedContainersStatuses().size()
+ + " resourcelimit=" + availableResources + " knownNMs="
+ + clusterNmCount);
+ }
ask.clear();
release.clear();
@@ -172,6 +173,9 @@ public abstract class RMContainerRequest
// knownNodeCount is based on node managers, not hosts. blacklisting is
// currently based on hosts.
protected void computeIgnoreBlacklisting() {
+ if (!nodeBlacklistingEnabled) {
+ return;
+ }
if (blacklistDisablePercent != -1
&& (blacklistedNodeCount != blacklistedNodes.size() ||
clusterNmCount != lastClusterNmCount)) {
@@ -200,7 +204,9 @@ public abstract class RMContainerRequest
return;
}
if (blacklistedNodes.contains(hostName)) {
- LOG.info("Host " + hostName + " is already blacklisted.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Host " + hostName + " is already blacklisted.");
+ }
return; //already blacklisted
}
Integer failures = nodeFailures.remove(hostName);
@@ -293,7 +299,9 @@ public abstract class RMContainerRequest
if (remoteRequests == null) {
remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
this.remoteRequestsTable.put(priority, remoteRequests);
- LOG.info("Added priority=" + priority);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added priority=" + priority);
+ }
}
Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
if (reqMap == null) {
@@ -313,10 +321,12 @@ public abstract class RMContainerRequest
// Note this down for next interaction with ResourceManager
ask.add(remoteRequest);
- LOG.info("addResourceRequest:" + " applicationId=" + applicationId.getId()
- + " priority=" + priority.getPriority() + " resourceName=" + resourceName
- + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
- + ask.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addResourceRequest:" + " applicationId="
+ + applicationId.getId() + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+ }
}
private void decResourceRequest(Priority priority, String resourceName,
@@ -328,16 +338,20 @@ public abstract class RMContainerRequest
// as we modify the resource requests by filtering out blacklisted hosts
// when they are added, this value may be null when being
// decremented
- LOG.debug("Not decrementing resource as " + resourceName
- + " is not present in request table");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not decrementing resource as " + resourceName
+ + " is not present in request table");
+ }
return;
}
ResourceRequest remoteRequest = reqMap.get(capability);
- LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
- + " priority=" + priority.getPriority() + " resourceName=" + resourceName
- + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
- + ask.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+ + applicationId.getId() + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+ }
remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
if (remoteRequest.getNumContainers() == 0) {
@@ -355,10 +369,12 @@ public abstract class RMContainerRequest
//already have it.
}
- LOG.info("AFTER decResourceRequest:" + " applicationId="
- + applicationId.getId() + " priority=" + priority.getPriority()
- + " resourceName=" + resourceName + " numContainers="
- + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.info("AFTER decResourceRequest:" + " applicationId="
+ + applicationId.getId() + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+ }
}
protected void release(ContainerId containerId) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java Fri Feb 10 01:49:08 2012
@@ -95,7 +95,13 @@ public class AppController extends Contr
* Render the /job page
*/
public void job() {
- requireJob();
+ try {
+ requireJob();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
render(jobPage());
}
@@ -110,7 +116,13 @@ public class AppController extends Contr
* Render the /jobcounters page
*/
public void jobCounters() {
- requireJob();
+ try {
+ requireJob();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
if (app.getJob() != null) {
setTitle(join("Counters for ", $(JOB_ID)));
}
@@ -121,7 +133,13 @@ public class AppController extends Contr
* Display a page showing a task's counters
*/
public void taskCounters() {
- requireTask();
+ try {
+ requireTask();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
if (app.getTask() != null) {
setTitle(StringHelper.join("Counters for ", $(TASK_ID)));
}
@@ -140,7 +158,13 @@ public class AppController extends Contr
* @throws IOException on any error.
*/
public void singleJobCounter() throws IOException{
- requireJob();
+ try {
+ requireJob();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
set(COUNTER_GROUP, URLDecoder.decode($(COUNTER_GROUP), "UTF-8"));
set(COUNTER_NAME, URLDecoder.decode($(COUNTER_NAME), "UTF-8"));
if (app.getJob() != null) {
@@ -155,7 +179,13 @@ public class AppController extends Contr
* @throws IOException on any error.
*/
public void singleTaskCounter() throws IOException{
- requireTask();
+ try {
+ requireTask();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
set(COUNTER_GROUP, URLDecoder.decode($(COUNTER_GROUP), "UTF-8"));
set(COUNTER_NAME, URLDecoder.decode($(COUNTER_NAME), "UTF-8"));
if (app.getTask() != null) {
@@ -176,7 +206,13 @@ public class AppController extends Contr
* Render the /tasks page
*/
public void tasks() {
- requireJob();
+ try {
+ requireJob();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
if (app.getJob() != null) {
try {
String tt = $(TASK_TYPE);
@@ -201,7 +237,13 @@ public class AppController extends Contr
* Render the /task page
*/
public void task() {
- requireTask();
+ try {
+ requireTask();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
if (app.getTask() != null) {
setTitle(join("Attempts for ", $(TASK_ID)));
}
@@ -219,7 +261,13 @@ public class AppController extends Contr
* Render the attempts page
*/
public void attempts() {
- requireJob();
+ try {
+ requireJob();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
if (app.getJob() != null) {
try {
String taskType = $(TASK_TYPE);
@@ -252,6 +300,13 @@ public class AppController extends Contr
*/
public void conf() {
requireJob();
+ try {
+ requireJob();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
render(confPage());
}
@@ -280,41 +335,43 @@ public class AppController extends Contr
void accessDenied(String s) {
setStatus(HttpServletResponse.SC_FORBIDDEN);
setTitle(join("Access denied: ", s));
- throw new RuntimeException("Access denied: " + s);
}
/**
* check for job access.
* @param job the job that is being accessed
+ * @return True if the requesting user has permission to view the job
*/
- void checkAccess(Job job) {
+ boolean checkAccess(Job job) {
UserGroupInformation callerUgi = UserGroupInformation.createRemoteUser(
request().getRemoteUser());
- if (!job.checkAccess(callerUgi, JobACL.VIEW_JOB)) {
- accessDenied("User " + request().getRemoteUser() + " does not have " +
- " permissions.");
- }
+ return job.checkAccess(callerUgi, JobACL.VIEW_JOB);
}
/**
* Ensure that a JOB_ID was passed into the page.
*/
public void requireJob() {
- try {
- if ($(JOB_ID).isEmpty()) {
- throw new RuntimeException("missing job ID");
- }
- JobId jobID = MRApps.toJobID($(JOB_ID));
- app.setJob(app.context.getJob(jobID));
- if (app.getJob() == null) {
- notFound($(JOB_ID));
- }
- /* check for acl access */
- Job job = app.context.getJob(jobID);
- checkAccess(job);
- } catch (Exception e) {
- badRequest(e.getMessage() == null ?
- e.getClass().getName() : e.getMessage());
+ if ($(JOB_ID).isEmpty()) {
+ badRequest("missing job ID");
+ throw new RuntimeException("Bad Request: Missing job ID");
+ }
+
+ JobId jobID = MRApps.toJobID($(JOB_ID));
+ app.setJob(app.context.getJob(jobID));
+ if (app.getJob() == null) {
+ notFound($(JOB_ID));
+ throw new RuntimeException("Not Found: " + $(JOB_ID));
+ }
+
+ /* check for acl access */
+ Job job = app.context.getJob(jobID);
+ if (!checkAccess(job)) {
+ accessDenied("User " + request().getRemoteUser() + " does not have " +
+ " permission to view job " + $(JOB_ID));
+ throw new RuntimeException("Access denied: User " +
+ request().getRemoteUser() + " does not have permission to view job " +
+ $(JOB_ID));
}
}
@@ -322,24 +379,30 @@ public class AppController extends Contr
* Ensure that a TASK_ID was passed into the page.
*/
public void requireTask() {
- try {
- if ($(TASK_ID).isEmpty()) {
- throw new RuntimeException("missing task ID");
- }
- TaskId taskID = MRApps.toTaskID($(TASK_ID));
- Job job = app.context.getJob(taskID.getJobId());
- app.setJob(job);
- if (app.getJob() == null) {
- notFound(MRApps.toString(taskID.getJobId()));
- } else {
- app.setTask(app.getJob().getTask(taskID));
- if (app.getTask() == null) {
- notFound($(TASK_ID));
- }
+ if ($(TASK_ID).isEmpty()) {
+ badRequest("missing task ID");
+ throw new RuntimeException("missing task ID");
+ }
+
+ TaskId taskID = MRApps.toTaskID($(TASK_ID));
+ Job job = app.context.getJob(taskID.getJobId());
+ app.setJob(job);
+ if (app.getJob() == null) {
+ notFound(MRApps.toString(taskID.getJobId()));
+ throw new RuntimeException("Not Found: " + $(JOB_ID));
+ } else {
+ app.setTask(app.getJob().getTask(taskID));
+ if (app.getTask() == null) {
+ notFound($(TASK_ID));
+ throw new RuntimeException("Not Found: " + $(TASK_ID));
}
- checkAccess(job);
- } catch (Exception e) {
- badRequest(e.getMessage());
+ }
+ if (!checkAccess(job)) {
+ accessDenied("User " + request().getRemoteUser() + " does not have " +
+ " permission to view job " + $(JOB_ID));
+ throw new RuntimeException("Access denied: User " +
+ request().getRemoteUser() + " does not have permission to view job " +
+ $(JOB_ID));
}
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppView.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppView.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppView.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppView.java Fri Feb 10 01:49:08 2012
@@ -33,7 +33,6 @@ public class AppView extends TwoColumnLa
}
protected void commonPreHead(Page.HTML<_> html) {
- html.meta_http("refresh", "10");
set(ACCORDION_ID, "nav");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:1}");
set(THEMESWITCHER_ID, "themeswitcher");
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersPage.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersPage.java Fri Feb 10 01:49:08 2012
@@ -27,6 +27,10 @@ public class CountersPage extends AppVie
@Override protected void preHead(Page.HTML<_> html) {
commonPreHead(html);
+
+ // Counters page is a summary. Helps to refresh automatically.
+ html.meta_http("refresh", "10");
+
String tid = $(TASK_ID);
String activeNav = "3";
if(tid == null || tid.isEmpty()) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobPage.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobPage.java Fri Feb 10 01:49:08 2012
@@ -32,6 +32,10 @@ public class JobPage extends AppView {
set(TITLE, jobID.isEmpty() ? "Bad request: missing job ID"
: join("MapReduce Job ", $(JOB_ID)));
commonPreHead(html);
+
+ // This is a job-summary page. Helps to refresh automatically.
+ html.meta_http("refresh", "10");
+
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:2}");
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Fri Feb 10 01:49:08 2012
@@ -116,6 +116,11 @@ public class TaskPage extends AppView {
@Override protected void preHead(Page.HTML<_> html) {
commonPreHead(html);
+
+ // This page is a list of all attempts which are limited in number. Okay to
+ // refresh automatically.
+ html.meta_http("refresh", "10");
+
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:3}");
set(DATATABLES_ID, "attempts");
set(initID(DATATABLES, "attempts"), attemptsTableInit());
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Feb 10 01:49:08 2012
@@ -436,7 +436,7 @@ public class MRApp extends MRAppMaster {
return new ClientService(){
@Override
public InetSocketAddress getBindAddress() {
- return null;
+ return NetUtils.createSocketAddr("localhost:9876");
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Fri Feb 10 01:49:08 2012
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.app;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -29,16 +31,30 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.junit.Test;
public class MRAppBenchmark {
@@ -167,17 +183,89 @@ public class MRAppBenchmark {
}
}
+ @Test
public void benchmark1() throws Exception {
- int maps = 100000;
- int reduces = 100;
+ int maps = 100; // Adjust for benchmarking. Start with thousands.
+ int reduces = 0;
System.out.println("Running benchmark with maps:"+maps +
" reduces:"+reduces);
- run(new MRApp(maps, reduces, true, this.getClass().getName(), true));
+ run(new MRApp(maps, reduces, true, this.getClass().getName(), true) {
+
+ @Override
+ protected ContainerAllocator createContainerAllocator(
+ ClientService clientService, AppContext context) {
+ return new RMContainerAllocator(clientService, context) {
+ @Override
+ protected AMRMProtocol createSchedulerProxy() {
+ return new AMRMProtocol() {
+
+ @Override
+ public RegisterApplicationMasterResponse
+ registerApplicationMaster(
+ RegisterApplicationMasterRequest request)
+ throws YarnRemoteException {
+ RegisterApplicationMasterResponse response =
+ Records.newRecord(RegisterApplicationMasterResponse.class);
+ response.setMinimumResourceCapability(BuilderUtils
+ .newResource(1024));
+ response.setMaximumResourceCapability(BuilderUtils
+ .newResource(10240));
+ return response;
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request)
+ throws YarnRemoteException {
+ FinishApplicationMasterResponse response =
+ Records.newRecord(FinishApplicationMasterResponse.class);
+ return response;
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnRemoteException {
+
+ AllocateResponse response =
+ Records.newRecord(AllocateResponse.class);
+ List<ResourceRequest> askList = request.getAskList();
+ List<Container> containers = new ArrayList<Container>();
+ for (ResourceRequest req : askList) {
+ if (req.getHostName() != "*") {
+ continue;
+ }
+ int numContainers = req.getNumContainers();
+ for (int i = 0; i < numContainers; i++) {
+ ContainerId containerId =
+ BuilderUtils.newContainerId(
+ request.getApplicationAttemptId(),
+ request.getResponseId() + i);
+ containers.add(BuilderUtils
+ .newContainer(containerId, BuilderUtils.newNodeId("host"
+ + containerId.getId(), 2345),
+ "host" + containerId.getId() + ":5678", req
+ .getCapability(), req.getPriority(), null));
+ }
+ }
+
+ AMResponse amResponse = Records.newRecord(AMResponse.class);
+ amResponse.setAllocatedContainers(containers);
+ amResponse.setResponseId(request.getResponseId() + 1);
+ response.setAMResponse(amResponse);
+ response.setNumClusterNodes(350);
+ return response;
+ }
+ };
+ }
+ };
+ }
+ });
}
+ @Test
public void benchmark2() throws Exception {
- int maps = 4000;
- int reduces = 1000;
+ int maps = 100; // Adjust for benchmarking, start with a couple of thousands
+ int reduces = 50;
int maxConcurrentRunningTasks = 500;
System.out.println("Running benchmark with throttled running tasks with " +
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Fri Feb 10 01:49:08 2012
@@ -18,6 +18,10 @@
package org.apache.hadoop.mapreduce.v2.app;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.spy;
+
import java.util.Iterator;
import junit.framework.Assert;
@@ -35,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.junit.Test;
/**
@@ -175,6 +180,41 @@ public class TestMRApp {
app.waitForState(job, JobState.ERROR);
}
+ private final class MRAppWithSpiedJob extends MRApp {
+ private JobImpl spiedJob;
+
+ private MRAppWithSpiedJob(int maps, int reduces, boolean autoComplete,
+ String testName, boolean cleanOnStart) {
+ super(maps, reduces, autoComplete, testName, cleanOnStart);
+ }
+
+ @Override
+ protected Job createJob(Configuration conf) {
+ spiedJob = spy((JobImpl) super.createJob(conf));
+ ((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
+ return spiedJob;
+ }
+
+ JobImpl getSpiedJob() {
+ return this.spiedJob;
+ }
+ }
+
+ @Test
+ public void testCountersOnJobFinish() throws Exception {
+ MRAppWithSpiedJob app =
+ new MRAppWithSpiedJob(1, 1, true, this.getClass().getName(), true);
+ JobImpl job = (JobImpl)app.submit(new Configuration());
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+ System.out.println(job.getAllCounters());
+ // Just call getCounters
+ job.getAllCounters();
+ job.getAllCounters();
+ // Should be called only once
+ verify(job, times(1)).constructFinalFullcounters();
+ }
+
@Test
public void checkJobStateTypeConversion() {
//verify that all states can be converted without
@@ -200,5 +240,6 @@ public class TestMRApp {
t.testCommitPending();
t.testCompletedMapsForReduceSlowstart();
t.testJobError();
+ t.testCountersOnJobFinish();
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Fri Feb 10 01:49:08 2012
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
+@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRecovery {
private static final Log LOG = LogFactory.getLog(TestRecovery.class);
@@ -112,7 +113,7 @@ public class TestRecovery {
Assert.assertEquals("Reduce Task state not correct",
TaskState.RUNNING, reduceTask.getReport().getTaskState());
- //send the fail signal to the 1st map task attempt
+ //send the fail signal to the 1st map task attempt
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
@@ -193,7 +194,7 @@ public class TestRecovery {
//RUNNING state
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
- //send the done signal to the 2nd map task
+ //send the done signal to the 2nd map task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapTask2.getAttempts().values().iterator().next().getID(),
@@ -349,6 +350,151 @@ public class TestRecovery {
validateOutput();
}
+ @Test
+ public void testOutputRecoveryMapsOnly() throws Exception {
+ int runCount = 0;
+ MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
+ true, ++runCount);
+ Configuration conf = new Configuration();
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask1 = it.next();
+ Task mapTask2 = it.next();
+ Task reduceTask1 = it.next();
+
+ // all maps must be running
+ app.waitForState(mapTask1, TaskState.RUNNING);
+
+ TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+ .next();
+
+ //before sending the TA_DONE, event make sure attempt has come to
+ //RUNNING state
+ app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+
+ // write output corresponding to map1 (This is just to validate that it is
+ //no included in the output)
+ writeBadOutput(task1Attempt1, conf);
+
+ //send the done signal to the map
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ task1Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait for map task to complete
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ // Verify the shuffle-port
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+ //stop the app before the job completes.
+ app.stop();
+
+ //rerun
+ //in rerun the map will be recovered from previous run
+ app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+ ++runCount);
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ it = job.getTasks().values().iterator();
+ mapTask1 = it.next();
+ mapTask2 = it.next();
+ reduceTask1 = it.next();
+
+ // map will be recovered, no need to send done
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ // Verify the shuffle-port after recovery
+ task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+ app.waitForState(mapTask2, TaskState.RUNNING);
+
+ TaskAttempt task2Attempt1 = mapTask2.getAttempts().values().iterator()
+ .next();
+
+ //before sending the TA_DONE, event make sure attempt has come to
+ //RUNNING state
+ app.waitForState(task2Attempt1, TaskAttemptState.RUNNING);
+
+ //send the done signal to the map
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ task2Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait for map task to complete
+ app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+ // Verify the shuffle-port
+ Assert.assertEquals(5467, task2Attempt1.getShufflePort());
+
+ app.waitForState(reduceTask1, TaskState.RUNNING);
+ TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+
+ // write output corresponding to reduce1
+ writeOutput(reduce1Attempt1, conf);
+
+ //send the done signal to the 1st reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ reduce1Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait for first reduce task to complete
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+ validateOutput();
+ }
+
+ private void writeBadOutput(TaskAttempt attempt, Configuration conf)
+ throws Exception {
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
+ TypeConverter.fromYarn(attempt.getID()));
+
+ TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+ RecordWriter theRecordWriter = theOutputFormat
+ .getRecordWriter(tContext);
+
+ NullWritable nullWritable = NullWritable.get();
+ try {
+ theRecordWriter.write(key2, val2);
+ theRecordWriter.write(null, nullWritable);
+ theRecordWriter.write(null, val2);
+ theRecordWriter.write(nullWritable, val1);
+ theRecordWriter.write(key1, nullWritable);
+ theRecordWriter.write(key2, null);
+ theRecordWriter.write(null, null);
+ theRecordWriter.write(key1, val1);
+ } finally {
+ theRecordWriter.close(tContext);
+ }
+
+ OutputFormat outputFormat = ReflectionUtils.newInstance(
+ tContext.getOutputFormatClass(), conf);
+ OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+ committer.commitTask(tContext);
+}
+
+
private void writeOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Feb 10 01:49:08 2012
@@ -18,42 +18,40 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.IOException;
-import java.util.Map;
import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
-import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
-import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.junit.Test;
import org.junit.Assert;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.any;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
+import org.junit.Test;
/**
* Tests various functions of the JobImpl class
*/
+@SuppressWarnings({"unchecked", "rawtypes"})
public class TestJobImpl {
@Test
@@ -100,7 +98,9 @@ public class TestJobImpl {
"for successful job",
JobImpl.checkJobCompleteSuccess(mockJob));
Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
- JobImpl.checkJobCompleteSuccess(mockJob), JobState.SUCCEEDED);
+ JobImpl.checkJobCompleteSuccess(mockJob), JobState.SUCCEEDED);
+
+
}
@Test
@@ -133,5 +133,63 @@ public class TestJobImpl {
t.testJobNoTasksTransition();
t.testCheckJobCompleteSuccess();
t.testCheckJobCompleteSuccessFailed();
+ t.testCheckAccess();
+ }
+
+ @Test
+ public void testCheckAccess() {
+ // Create two unique users
+ String user1 = System.getProperty("user.name");
+ String user2 = user1 + "1234";
+ UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser(user1);
+ UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser(user2);
+
+ // Create the job
+ JobID jobID = JobID.forName("job_1234567890000_0001");
+ JobId jobId = TypeConverter.toYarn(jobID);
+
+ // Setup configuration access only to user1 (owner)
+ Configuration conf1 = new Configuration();
+ conf1.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+ conf1.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+
+ // Verify access
+ JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
+ null, null, null, true, null, 0, null);
+ Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
+ Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+ // Setup configuration access to the user1 (owner) and user2
+ Configuration conf2 = new Configuration();
+ conf2.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+ conf2.set(MRJobConfig.JOB_ACL_VIEW_JOB, user2);
+
+ // Verify access
+ JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
+ null, null, null, true, null, 0, null);
+ Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
+ Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+ // Setup configuration access with security enabled and access to all
+ Configuration conf3 = new Configuration();
+ conf3.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+ conf3.set(MRJobConfig.JOB_ACL_VIEW_JOB, "*");
+
+ // Verify access
+ JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
+ null, null, null, true, null, 0, null);
+ Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
+ Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+ // Setup configuration access without security enabled
+ Configuration conf4 = new Configuration();
+ conf4.setBoolean(MRConfig.MR_ACLS_ENABLED, false);
+ conf4.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+
+ // Verify access
+ JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
+ null, null, null, true, null, 0, null);
+ Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
+ Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Fri Feb 10 01:49:08 2012
@@ -18,30 +18,54 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
@SuppressWarnings("unchecked")
public class TestTaskAttempt{
@@ -58,6 +82,96 @@ public class TestTaskAttempt{
testMRAppHistory(app);
}
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testSingleRackRequest() throws Exception {
+ TaskAttemptImpl.RequestContainerTransition rct =
+ new TaskAttemptImpl.RequestContainerTransition(false);
+
+ EventHandler eventHandler = mock(EventHandler.class);
+ String[] hosts = new String[3];
+ hosts[0] = "host1";
+ hosts[1] = "host2";
+ hosts[2] = "host3";
+ TaskSplitMetaInfo splitInfo =
+ new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
+
+ TaskAttemptImpl mockTaskAttempt =
+ createMapTaskAttemptImplForTest(eventHandler, splitInfo);
+ TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
+
+ rct.transition(mockTaskAttempt, mockTAEvent);
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(2)).handle(arg.capture());
+ if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
+ Assert.fail("Second Event not of type ContainerRequestEvent");
+ }
+ ContainerRequestEvent cre =
+ (ContainerRequestEvent) arg.getAllValues().get(1);
+ String[] requestedRacks = cre.getRacks();
+ //Only a single occurance of /DefaultRack
+ assertEquals(1, requestedRacks.length);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testHostResolveAttempt() throws Exception {
+ TaskAttemptImpl.RequestContainerTransition rct =
+ new TaskAttemptImpl.RequestContainerTransition(false);
+
+ EventHandler eventHandler = mock(EventHandler.class);
+ String[] hosts = new String[3];
+ hosts[0] = "192.168.1.1";
+ hosts[1] = "host2";
+ hosts[2] = "host3";
+ TaskSplitMetaInfo splitInfo =
+ new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
+
+ TaskAttemptImpl mockTaskAttempt =
+ createMapTaskAttemptImplForTest(eventHandler, splitInfo);
+ TaskAttemptImpl spyTa = spy(mockTaskAttempt);
+ when(spyTa.resolveHost(hosts[0])).thenReturn("host1");
+
+ TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
+ rct.transition(spyTa, mockTAEvent);
+ verify(spyTa).resolveHost(hosts[0]);
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(2)).handle(arg.capture());
+ if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
+ Assert.fail("Second Event not of type ContainerRequestEvent");
+ }
+ Map<String, Boolean> expected = new HashMap<String, Boolean>();
+ expected.put("host1", true);
+ expected.put("host2", true);
+ expected.put("host3", true);
+ ContainerRequestEvent cre =
+ (ContainerRequestEvent) arg.getAllValues().get(1);
+ String[] requestedHosts = cre.getHosts();
+ for (String h : requestedHosts) {
+ expected.remove(h);
+ }
+ assertEquals(0, expected.size());
+ }
+
+ @SuppressWarnings("rawtypes")
+ private TaskAttemptImpl createMapTaskAttemptImplForTest(
+ EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ Path jobFile = mock(Path.class);
+ JobConf jobConf = new JobConf();
+ OutputCommitter outputCommitter = mock(OutputCommitter.class);
+ Clock clock = new SystemClock();
+ TaskAttemptImpl taImpl =
+ new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
+ null, clock);
+ return taImpl;
+ }
+
private void testMRAppHistory(MRApp app) throws Exception {
Configuration conf = new Configuration();
Job job = app.submit(conf);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Fri Feb 10 01:49:08 2012
@@ -59,7 +59,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings("rawtypes")
public class TestTaskImpl {
private static final Log LOG = LogFactory.getLog(TestTaskImpl.class);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java Fri Feb 10 01:49:08 2012
@@ -777,7 +777,7 @@ public class TestAMWebServicesJobs exten
assertTrue("name not set", (name != null && !name.isEmpty()));
JSONArray counters = counterGroup.getJSONArray("counter");
for (int j = 0; j < counters.length(); j++) {
- JSONObject counter = counters.getJSONObject(i);
+ JSONObject counter = counters.getJSONObject(j);
String counterName = counter.getString("name");
assertTrue("counter name not set",
(counterName != null && !counterName.isEmpty()));