You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to mapreduce-commits@hadoop.apache.org by ar...@apache.org on 2013/12/19 03:04:42 UTC
svn commit: r1552205 - in
/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java...
Author: arp
Date: Thu Dec 19 02:03:47 2013
New Revision: 1552205
URL: http://svn.apache.org/r1552205
Log:
Merge forward from trunk to branch HDFS-2832
Added:
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/
- copied from r1552204, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/
- copied from r1552204, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/
- copied from r1552204, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/
Modified:
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed)
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1549949-1552204
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt Thu Dec 19 02:03:47 2013
@@ -71,6 +71,12 @@ Trunk (Unreleased)
MAPREDUCE-5014. Extend Distcp to accept a custom CopyListing.
(Srikanth Sundarrajan via amareshwari)
+ MAPREDUCE-5197. Add a service for checkpointing task state.
+ (Carlo Curino via cdouglas)
+
+ MAPREDUCE-5189. Add policies and wiring to respond to preemption requests
+ from YARN. (Carlo Curino via cdouglas)
+
BUG FIXES
MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.
@@ -237,6 +243,15 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5656. bzip2 codec can drop records when reading data in splits
(jlowe)
+ MAPREDUCE-5623. TestJobCleanup fails because of RejectedExecutionException
+ and NPE. (jlowe)
+
+ MAPREDUCE-5679. TestJobHistoryParsing has race condition (Liyin Liang via
+ jlowe)
+
+ MAPREDUCE-5687. Fixed failure in TestYARNRunner caused by YARN-1446. (Jian He
+ via vinodkv)
+
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -290,6 +305,9 @@ Release 2.3.0 - UNRELEASED
event: TA_TOO_MANY_FETCH_FAILURE at KILLED for TaskAttemptImpl (Gera
Shegalov via jlowe)
+ MAPREDUCE-5674. Missing start and finish time in mapred.JobStatus.
+ (Chuan Liu via cnauroth)
+
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1549949-1552204
Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1535792-1536571,1536573-1552204
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Thu Dec 19 02:03:47 2013
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -84,14 +85,17 @@ public class TaskAttemptListenerImpl ext
.newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
private JobTokenSecretManager jobTokenSecretManager = null;
+ private AMPreemptionPolicy preemptionPolicy;
public TaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
- RMHeartbeatHandler rmHeartbeatHandler) {
+ RMHeartbeatHandler rmHeartbeatHandler,
+ AMPreemptionPolicy preemptionPolicy) {
super(TaskAttemptListenerImpl.class.getName());
this.context = context;
this.jobTokenSecretManager = jobTokenSecretManager;
this.rmHeartbeatHandler = rmHeartbeatHandler;
+ this.preemptionPolicy = preemptionPolicy;
}
@Override
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Dec 19 02:03:47 2013
@@ -102,6 +102,8 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
@@ -188,8 +190,8 @@ public class MRAppMaster extends Composi
private ContainerLauncher containerLauncher;
private EventHandler<CommitterEvent> committerEventHandler;
private Speculator speculator;
- private TaskAttemptListener taskAttemptListener;
- private JobTokenSecretManager jobTokenSecretManager =
+ protected TaskAttemptListener taskAttemptListener;
+ protected JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
private JobId jobId;
private boolean newApiCommitter;
@@ -197,6 +199,7 @@ public class MRAppMaster extends Composi
private JobEventDispatcher jobEventDispatcher;
private JobHistoryEventHandler jobHistoryEventHandler;
private SpeculatorEventDispatcher speculatorEventDispatcher;
+ private AMPreemptionPolicy preemptionPolicy;
private Job job;
private Credentials jobCredentials = new Credentials(); // Filled during init
@@ -383,8 +386,12 @@ public class MRAppMaster extends Composi
committerEventHandler = createCommitterEventHandler(context, committer);
addIfService(committerEventHandler);
+ //policy handling preemption requests from RM
+ preemptionPolicy = createPreemptionPolicy(conf);
+ preemptionPolicy.init(context);
+
//service to handle requests to TaskUmbilicalProtocol
- taskAttemptListener = createTaskAttemptListener(context);
+ taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy);
addIfService(taskAttemptListener);
//service to log job history events
@@ -475,6 +482,12 @@ public class MRAppMaster extends Composi
return committer;
}
+ protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) {
+ return ReflectionUtils.newInstance(conf.getClass(
+ MRJobConfig.MR_AM_PREEMPTION_POLICY,
+ NoopAMPreemptionPolicy.class, AMPreemptionPolicy.class), conf);
+ }
+
protected boolean keepJobFiles(JobConf conf) {
return (conf.getKeepTaskFilesPattern() != null || conf
.getKeepFailedTaskFiles());
@@ -692,10 +705,11 @@ public class MRAppMaster extends Composi
}
}
- protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+ protected TaskAttemptListener createTaskAttemptListener(AppContext context,
+ AMPreemptionPolicy preemptionPolicy) {
TaskAttemptListener lis =
new TaskAttemptListenerImpl(context, jobTokenSecretManager,
- getRMHeartbeatHandler());
+ getRMHeartbeatHandler(), preemptionPolicy);
return lis;
}
@@ -805,7 +819,7 @@ public class MRAppMaster extends Composi
, containerID);
} else {
this.containerAllocator = new RMContainerAllocator(
- this.clientService, this.context);
+ this.clientService, this.context, preemptionPolicy);
}
((Service)this.containerAllocator).init(getConfig());
((Service)this.containerAllocator).start();
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Dec 19 02:03:47 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -147,13 +149,17 @@ public class RMContainerAllocator extend
private long retryInterval;
private long retrystartTime;
+ private final AMPreemptionPolicy preemptionPolicy;
+
BlockingQueue<ContainerAllocatorEvent> eventQueue
= new LinkedBlockingQueue<ContainerAllocatorEvent>();
private ScheduleStats scheduleStats = new ScheduleStats();
- public RMContainerAllocator(ClientService clientService, AppContext context) {
+ public RMContainerAllocator(ClientService clientService, AppContext context,
+ AMPreemptionPolicy preemptionPolicy) {
super(clientService, context);
+ this.preemptionPolicy = preemptionPolicy;
this.stopped = new AtomicBoolean(false);
}
@@ -361,11 +367,15 @@ public class RMContainerAllocator extend
LOG.error("Could not deallocate container for task attemptId " +
aId);
}
+ preemptionPolicy.handleCompletedContainer(event.getAttemptID());
} else if (
event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) {
ContainerFailedEvent fEv = (ContainerFailedEvent) event;
String host = getHost(fEv.getContMgrAddress());
containerFailedOnHost(host);
+ // propagate failures to preemption policy to discard checkpoints for
+ // failed tasks
+ preemptionPolicy.handleFailedContainer(event.getAttemptID());
}
}
@@ -399,7 +409,7 @@ public class RMContainerAllocator extend
}
scheduledRequests.reduces.clear();
- //preempt for making space for atleast one map
+ //preempt for making space for at least one map
int premeptionLimit = Math.max(mapResourceReqt,
(int) (maxReducePreemptionLimit * memLimit));
@@ -409,7 +419,7 @@ public class RMContainerAllocator extend
int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
- LOG.info("Going to preempt " + toPreempt);
+ LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
assignedRequests.preemptReduce(toPreempt);
}
}
@@ -595,6 +605,14 @@ public class RMContainerAllocator extend
}
List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
+
+ // propagate preemption requests
+ final PreemptionMessage preemptReq = response.getPreemptionMessage();
+ if (preemptReq != null) {
+ preemptionPolicy.preempt(
+ new PreemptionContext(assignedRequests), preemptReq);
+ }
+
if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) {
//something changed
recalculateReduceSchedule = true;
@@ -630,7 +648,9 @@ public class RMContainerAllocator extend
String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
diagnostics));
- }
+
+ preemptionPolicy.handleCompletedContainer(attemptID);
+ }
}
return newContainers;
}
@@ -1232,4 +1252,27 @@ public class RMContainerAllocator extend
" RackLocal:" + rackLocalAssigned);
}
}
+
+ static class PreemptionContext extends AMPreemptionPolicy.Context {
+ final AssignedRequests reqs;
+
+ PreemptionContext(AssignedRequests reqs) {
+ this.reqs = reqs;
+ }
+ @Override
+ public TaskAttemptId getTaskAttempt(ContainerId container) {
+ return reqs.get(container);
+ }
+
+ @Override
+ public List<Container> getContainers(TaskType t){
+ if(TaskType.REDUCE.equals(t))
+ return new ArrayList<Container>(reqs.reduces.values());
+ if(TaskType.MAP.equals(t))
+ return new ArrayList<Container>(reqs.maps.values());
+ return null;
+ }
+
+ }
+
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Thu Dec 19 02:03:47 2013
@@ -60,7 +60,7 @@ public class TestTaskAttemptListenerImpl
JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler,
TaskHeartbeatHandler hbHandler) {
- super(context, jobTokenSecretManager, rmHeartbeatHandler);
+ super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
this.taskHeartbeatHandler = hbHandler;
}
@@ -191,7 +191,7 @@ public class TestTaskAttemptListenerImpl
mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptListenerImpl listener =
- new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+ new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
@@ -245,7 +245,7 @@ public class TestTaskAttemptListenerImpl
mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptListenerImpl listener =
- new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+ new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Thu Dec 19 02:03:47 2013
@@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
@@ -467,7 +468,8 @@ public class MRApp extends MRAppMaster {
}
@Override
- protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+ protected TaskAttemptListener createTaskAttemptListener(
+ AppContext context, AMPreemptionPolicy policy) {
return new TaskAttemptListener(){
@Override
public InetSocketAddress getAddress() {
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Thu Dec 19 02:03:47 2013
@@ -33,6 +33,8 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -61,6 +63,8 @@ public class MRAppBenchmark {
/**
* Runs memory and time benchmark with Mock MRApp.
+ * @param app Application to submit
+ * @throws Exception On application failure
*/
public void run(MRApp app) throws Exception {
Logger rootLogger = LogManager.getRootLogger();
@@ -133,6 +137,7 @@ public class MRAppBenchmark {
protected void serviceStart() throws Exception {
thread = new Thread(new Runnable() {
@Override
+ @SuppressWarnings("unchecked")
public void run() {
ContainerAllocatorEvent event = null;
while (!Thread.currentThread().isInterrupted()) {
@@ -192,7 +197,9 @@ public class MRAppBenchmark {
@Override
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) {
- return new RMContainerAllocator(clientService, context) {
+
+ AMPreemptionPolicy policy = new NoopAMPreemptionPolicy();
+ return new RMContainerAllocator(clientService, context, policy) {
@Override
protected ApplicationMasterProtocol createSchedulerProxy() {
return new ApplicationMasterProtocol() {
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Thu Dec 19 02:03:47 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -247,13 +248,14 @@ public class TestFail {
super(maps, reduces, false, "TimeOutTaskMRApp", true);
}
@Override
- protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+ protected TaskAttemptListener createTaskAttemptListener(
+ AppContext context, AMPreemptionPolicy policy) {
//This will create the TaskAttemptListener with TaskHeartbeatHandler
//RPC servers are not started
//task time out is reduced
//when attempt times out, heartbeat handler will send the lost event
//leading to Attempt failure
- return new TaskAttemptListenerImpl(getContext(), null, null) {
+ return new TaskAttemptListenerImpl(getContext(), null, null, policy) {
@Override
public void startRpcServer(){};
@Override
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Dec 19 02:03:47 2013
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.app;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
+
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.isA;
@@ -1428,14 +1430,15 @@ public class TestRMContainerAllocator {
// Use this constructor when using a real job.
MyContainerAllocator(MyResourceManager rm,
ApplicationAttemptId appAttemptId, AppContext context) {
- super(createMockClientService(), context);
+ super(createMockClientService(), context, new NoopAMPreemptionPolicy());
this.rm = rm;
}
// Use this constructor when you are using a mocked job.
public MyContainerAllocator(MyResourceManager rm, Configuration conf,
ApplicationAttemptId appAttemptId, Job job) {
- super(createMockClientService(), createAppContext(appAttemptId, job));
+ super(createMockClientService(), createAppContext(appAttemptId, job),
+ new NoopAMPreemptionPolicy());
this.rm = rm;
super.init(conf);
super.start();
@@ -1444,7 +1447,8 @@ public class TestRMContainerAllocator {
public MyContainerAllocator(MyResourceManager rm, Configuration conf,
ApplicationAttemptId appAttemptId, Job job, Clock clock) {
super(createMockClientService(),
- createAppContext(appAttemptId, job, clock));
+ createAppContext(appAttemptId, job, clock),
+ new NoopAMPreemptionPolicy());
this.rm = rm;
super.init(conf);
super.start();
@@ -1671,7 +1675,8 @@ public class TestRMContainerAllocator {
ApplicationId.newInstance(1, 1));
RMContainerAllocator allocator = new RMContainerAllocator(
- mock(ClientService.class), appContext) {
+ mock(ClientService.class), appContext,
+ new NoopAMPreemptionPolicy()) {
@Override
protected void register() {
}
@@ -1721,7 +1726,8 @@ public class TestRMContainerAllocator {
@Test
public void testCompletedContainerEvent() {
RMContainerAllocator allocator = new RMContainerAllocator(
- mock(ClientService.class), mock(AppContext.class));
+ mock(ClientService.class), mock(AppContext.class),
+ new NoopAMPreemptionPolicy());
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
MRBuilderUtils.newTaskId(
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Thu Dec 19 02:03:47 2013
@@ -300,6 +300,8 @@ public class TypeConverter {
.getCleanupProgress(), fromYarn(jobreport.getJobState()),
jobPriority, jobreport.getUser(), jobreport.getJobName(), jobreport
.getJobFile(), trackingUrl, jobreport.isUber());
+ jobStatus.setStartTime(jobreport.getStartTime());
+ jobStatus.setFinishTime(jobreport.getFinishTime());
jobStatus.setFailureInfo(jobreport.getDiagnostics());
return jobStatus;
}
@@ -441,6 +443,7 @@ public class TypeConverter {
);
jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url
jobStatus.setStartTime(application.getStartTime());
+ jobStatus.setFinishTime(application.getFinishTime());
jobStatus.setFailureInfo(application.getDiagnostics());
jobStatus.setNeededMem(application.getApplicationResourceUsageReport().getNeededResources().getMemory());
jobStatus.setNumReservedSlots(application.getApplicationResourceUsageReport().getNumReservedContainers());
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java Thu Dec 19 02:03:47 2013
@@ -27,6 +27,8 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -71,6 +73,7 @@ public class TestTypeConverter {
@Test
public void testFromYarn() throws Exception {
int appStartTime = 612354;
+ int appFinishTime = 612355;
YarnApplicationState state = YarnApplicationState.RUNNING;
ApplicationId applicationId = ApplicationId.newInstance(0, 0);
ApplicationReport applicationReport = Records
@@ -78,6 +81,7 @@ public class TestTypeConverter {
applicationReport.setApplicationId(applicationId);
applicationReport.setYarnApplicationState(state);
applicationReport.setStartTime(appStartTime);
+ applicationReport.setFinishTime(appFinishTime);
applicationReport.setUser("TestTypeConverter-user");
ApplicationResourceUsageReport appUsageRpt = Records
.newRecord(ApplicationResourceUsageReport.class);
@@ -91,6 +95,7 @@ public class TestTypeConverter {
applicationReport.setApplicationResourceUsageReport(appUsageRpt);
JobStatus jobStatus = TypeConverter.fromYarn(applicationReport, "dummy-jobfile");
Assert.assertEquals(appStartTime, jobStatus.getStartTime());
+ Assert.assertEquals(appFinishTime, jobStatus.getFinishTime());
Assert.assertEquals(state.toString(), jobStatus.getState().toString());
}
@@ -172,4 +177,25 @@ public class TestTypeConverter {
Assert.assertEquals("QueueInfo children weren't properly converted",
returned.getQueueChildren().size(), 1);
}
+
+ @Test
+ public void testFromYarnJobReport() throws Exception {
+ int jobStartTime = 612354;
+ int jobFinishTime = 612355;
+ JobState state = JobState.RUNNING;
+ JobId jobId = Records.newRecord(JobId.class);
+ JobReport jobReport = Records.newRecord(JobReport.class);
+ ApplicationId applicationId = ApplicationId.newInstance(0, 0);
+ jobId.setAppId(applicationId);
+ jobId.setId(0);
+ jobReport.setJobId(jobId);
+ jobReport.setJobState(state);
+ jobReport.setStartTime(jobStartTime);
+ jobReport.setFinishTime(jobFinishTime);
+ jobReport.setUser("TestTypeConverter-user");
+ JobStatus jobStatus = TypeConverter.fromYarn(jobReport, "dummy-jobfile");
+ Assert.assertEquals(jobStartTime, jobStatus.getStartTime());
+ Assert.assertEquals(jobFinishTime, jobStatus.getFinishTime());
+ Assert.assertEquals(state.toString(), jobStatus.getState().toString());
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java Thu Dec 19 02:03:47 2013
@@ -45,5 +45,9 @@ public enum JobCounter {
TOTAL_LAUNCHED_UBERTASKS,
NUM_UBER_SUBMAPS,
NUM_UBER_SUBREDUCES,
- NUM_FAILED_UBERTASKS
+ NUM_FAILED_UBERTASKS,
+ TASKS_REQ_PREEMPT,
+ CHECKPOINTS,
+ CHECKPOINT_BYTES,
+ CHECKPOINT_TIME
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Dec 19 02:03:47 2013
@@ -459,7 +459,13 @@ public interface MRJobConfig {
public static final String MR_AM_JOB_REDUCE_PREEMPTION_LIMIT =
MR_AM_PREFIX + "job.reduce.preemption.limit";
public static final float DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 0.5f;
-
+
+ /**
+ * Policy class encoding responses to preemption requests.
+ */
+ public static final String MR_AM_PREEMPTION_POLICY =
+ MR_AM_PREFIX + "preemption.policy";
+
/** AM ACL disabled. **/
public static final String JOB_AM_ACCESS_DISABLED =
"mapreduce.job.am-access-disabled";
@@ -708,4 +714,7 @@ public interface MRJobConfig {
public static final String MR_APPLICATION_TYPE = "MAPREDUCE";
+ public static final String TASK_PREEMPTION =
+ "mapreduce.job.preemption";
+
}
Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1542123-1552204
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties Thu Dec 19 02:03:47 2013
@@ -27,3 +27,7 @@ SLOTS_MILLIS_MAPS.name= Total
SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces in occupied slots (ms)
FALLOW_SLOTS_MILLIS_MAPS.name= Total time spent by all maps waiting after reserving slots (ms)
FALLOW_SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces waiting after reserving slots (ms)
+TASKS_REQ_PREEMPT.name= Tasks that have been asked to preempt
+CHECKPOINTS.name= Number of checkpoints reported
+CHECKPOINT_BYTES.name= Total amount of bytes in checkpoints
+CHECKPOINT_TIME.name= Total time spent checkpointing (ms)
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Thu Dec 19 02:03:47 2013
@@ -339,8 +339,11 @@ public class TestJobHistoryParsing {
PrintStream stdps = System.out;
try {
System.setOut(new PrintStream(outContent));
- HistoryViewer viewer = new HistoryViewer(fc.makeQualified(
- fileInfo.getHistoryFile()).toString(), conf, true);
+ HistoryViewer viewer;
+ synchronized (fileInfo) {
+ viewer = new HistoryViewer(fc.makeQualified(
+ fileInfo.getHistoryFile()).toString(), conf, true);
+ }
viewer.print();
for (TaskInfo taskInfo : allTasks.values()) {
@@ -397,29 +400,27 @@ public class TestJobHistoryParsing {
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
- String jobhistoryDir = JobHistoryUtils
- .getHistoryIntermediateDoneDirForUser(conf);
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
+ HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
+
+ JobHistoryParser parser;
+ JobInfo jobInfo;
+ synchronized (fileInfo) {
+ Path historyFilePath = fileInfo.getHistoryFile();
+ FSDataInputStream in = null;
+ FileContext fc = null;
+ try {
+ fc = FileContext.getFileContext(conf);
+ in = fc.open(fc.makeQualified(historyFilePath));
+ } catch (IOException ioe) {
+ LOG.info("Can not open history file: " + historyFilePath, ioe);
+ throw (new Exception("Can not open History File"));
+ }
- JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
- .getJobIndexInfo();
- String jobhistoryFileName = FileNameIndexUtils
- .getDoneFileName(jobIndexInfo);
-
- Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
- FSDataInputStream in = null;
- FileContext fc = null;
- try {
- fc = FileContext.getFileContext(conf);
- in = fc.open(fc.makeQualified(historyFilePath));
- } catch (IOException ioe) {
- LOG.info("Can not open history file: " + historyFilePath, ioe);
- throw (new Exception("Can not open History File"));
+ parser = new JobHistoryParser(in);
+ jobInfo = parser.parse();
}
-
- JobHistoryParser parser = new JobHistoryParser(in);
- JobInfo jobInfo = parser.parse();
Exception parseException = parser.getParseException();
Assert.assertNull("Caught an expected exception " + parseException,
parseException);
@@ -464,29 +465,28 @@ public class TestJobHistoryParsing {
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
- String jobhistoryDir = JobHistoryUtils
- .getHistoryIntermediateDoneDirForUser(conf);
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
- JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
- .getJobIndexInfo();
- String jobhistoryFileName = FileNameIndexUtils
- .getDoneFileName(jobIndexInfo);
-
- Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
- FSDataInputStream in = null;
- FileContext fc = null;
- try {
- fc = FileContext.getFileContext(conf);
- in = fc.open(fc.makeQualified(historyFilePath));
- } catch (IOException ioe) {
- LOG.info("Can not open history file: " + historyFilePath, ioe);
- throw (new Exception("Can not open History File"));
- }
+ HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
+
+ JobHistoryParser parser;
+ JobInfo jobInfo;
+ synchronized (fileInfo) {
+ Path historyFilePath = fileInfo.getHistoryFile();
+ FSDataInputStream in = null;
+ FileContext fc = null;
+ try {
+ fc = FileContext.getFileContext(conf);
+ in = fc.open(fc.makeQualified(historyFilePath));
+ } catch (IOException ioe) {
+ LOG.info("Can not open history file: " + historyFilePath, ioe);
+ throw (new Exception("Can not open History File"));
+ }
- JobHistoryParser parser = new JobHistoryParser(in);
- JobInfo jobInfo = parser.parse();
+ parser = new JobHistoryParser(in);
+ jobInfo = parser.parse();
+ }
Exception parseException = parser.getParseException();
Assert.assertNull("Caught an expected exception " + parseException,
parseException);
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Thu Dec 19 02:03:47 2013
@@ -304,7 +304,7 @@ public class TestClientRedirect {
@Override
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws IOException {
- return recordFactory.newRecordInstance(KillApplicationResponse.class);
+ return KillApplicationResponse.newInstance(true);
}
@Override
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java Thu Dec 19 02:03:47 2013
@@ -195,8 +195,7 @@ public class TestJobCleanup {
RunningJob job = jobClient.submitJob(jc);
JobID id = job.getID();
job.waitForCompletion();
- Counters counters = job.getCounters();
- assertTrue("No. of failed maps should be 1",counters.getCounter(JobCounter.NUM_FAILED_MAPS) == 1);
+ assertEquals("Job did not fail", JobStatus.FAILED, job.getJobState());
if (fileName != null) {
Path testFile = new Path(outDir, fileName);
@@ -242,9 +241,7 @@ public class TestJobCleanup {
job.killJob(); // kill the job
job.waitForCompletion(); // wait for the job to complete
-
- counters = job.getCounters();
- assertTrue("No. of killed maps should be 1", counters.getCounter(JobCounter.NUM_KILLED_MAPS) == 1);
+ assertEquals("Job was not killed", JobStatus.KILLED, job.getJobState());
if (fileName != null) {
Path testFile = new Path(outDir, fileName);
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java Thu Dec 19 02:03:47 2013
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -208,7 +209,7 @@ public class TestYARNRunner extends Test
};
/* make sure kill calls finish application master */
when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
- .thenReturn(null);
+ .thenReturn(KillApplicationResponse.newInstance(true));
delegate.killApplication(appId);
verify(clientRMProtocol).forceKillApplication(any(KillApplicationRequest.class));