You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2014/07/12 04:25:03 UTC
svn commit: r1609878 [1/2] - in
/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project: ./ dev-support/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-c...
Author: subru
Date: Sat Jul 12 02:24:40 2014
New Revision: 1609878
URL: http://svn.apache.org/r1609878
Log:
syncing YARN-1051 branch with trunk
Added:
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/
- copied from r1605891, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java
- copied unchanged from r1605891, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java
- copied unchanged from r1605891, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java
Removed:
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Modified:
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (contents, props changed)
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java
Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1603348-1609877
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt Sat Jul 12 02:24:40 2014
@@ -145,6 +145,24 @@ Trunk (Unreleased)
MAPREDUCE-5867. Fix NPE in KillAMPreemptionPolicy related to
ProportionalCapacityPreemptionPolicy (Sunil G via devaraj)
+Release 2.6.0 - UNRELEASED
+
+ INCOMPATIBLE CHANGES
+
+ NEW FEATURES
+
+ IMPROVEMENTS
+
+ OPTIMIZATIONS
+
+ BUG FIXES
+
+ MAPREDUCE-5866. TestFixedLengthInputFormat fails in windows.
+ (Varun Vasudev via cnauroth)
+
+ MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current
+ attempt is the last retry. (Wangda Tan via zjshen)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -213,6 +231,12 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5834. Increased test-timeouts in TestGridMixClasses to avoid
occassional failures. (Mit Desai via vinodkv)
+ MAPREDUCE-5896. InputSplits should indicate which locations have the block
+ cached in memory. (Sandy Ryza via kasha)
+
+ MAPREDUCE-5844. Add a configurable delay to reducer-preemption.
+ (Maysam Yabandeh via kasha)
+
OPTIMIZATIONS
BUG FIXES
@@ -267,6 +291,19 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5924. Changed TaskAttemptImpl to ignore TA_COMMIT_PENDING event
at COMMIT_PENDING state. (Zhijie Shen via jianhe)
+ MAPREDUCE-5939. StartTime showing up as the epoch time in JHS UI after
+ upgrade (Chen He via jlowe)
+
+ MAPREDUCE-5900. Changed to the interpret container preemption exit code as a
+ task attempt killing event. (Mayank Bansal via zjshen)
+
+ MAPREDUCE-5868. Fixed an issue with TestPipeApplication that was causing the
+ nightly builds to fail. (Akira Ajisaka via vinodkv)
+
+ MAPREDUCE-5517. Fixed MapReduce ApplicationMaster to not validate reduce side
+ resource configuration for deciding uber-mode on map-only jobs. (Siqi Li via
+ vinodkv)
+
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES
@@ -275,6 +312,10 @@ Release 2.4.1 - 2014-06-23
IMPROVEMENTS
+ MAPREDUCE-5830. Added back the private API HostUtil.getTaskLogUrl(..) for
+ binary compatibility with older clients like Hive 0.13. (Akira Ajisaka via
+ vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1603348-1609877
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Sat Jul 12 02:24:40 2014
@@ -475,8 +475,8 @@
<Match>
<Class name="org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator" />
<Or>
- <Field name="mapResourceReqt" />
- <Field name="reduceResourceReqt" />
+ <Field name="mapResourceRequest" />
+ <Field name="reduceResourceRequest" />
<Field name="maxReduceRampupLimit" />
<Field name="reduceSlowStart" />
</Or>
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Sat Jul 12 02:24:40 2014
@@ -186,7 +186,6 @@ public class MRAppMaster extends Composi
private final int nmPort;
private final int nmHttpPort;
protected final MRAppMetrics metrics;
- private final int maxAppAttempts;
private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
private List<AMInfo> amInfos;
private AppContext context;
@@ -227,14 +226,14 @@ public class MRAppMaster extends Composi
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
- long appSubmitTime, int maxAppAttempts) {
+ long appSubmitTime) {
this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
- new SystemClock(), appSubmitTime, maxAppAttempts);
+ new SystemClock(), appSubmitTime);
}
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
- Clock clock, long appSubmitTime, int maxAppAttempts) {
+ Clock clock, long appSubmitTime) {
super(MRAppMaster.class.getName());
this.clock = clock;
this.startTime = clock.getTime();
@@ -245,7 +244,6 @@ public class MRAppMaster extends Composi
this.nmPort = nmPort;
this.nmHttpPort = nmHttpPort;
this.metrics = MRAppMetrics.create();
- this.maxAppAttempts = maxAppAttempts;
logSyncer = TaskLog.createLogSyncer();
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
@@ -258,12 +256,6 @@ public class MRAppMaster extends Composi
context = new RunningAppContext(conf);
- ((RunningAppContext)context).computeIsLastAMRetry();
- LOG.info("The specific max attempts: " + maxAppAttempts +
- " for application: " + appAttemptID.getApplicationId().getId() +
- ". Attempt num: " + appAttemptID.getAttemptId() +
- " is last retry: " + isLastAMRetry);
-
// Job name is the same as the app name util we support DAG of jobs
// for an app later
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
@@ -1007,8 +999,8 @@ public class MRAppMaster extends Composi
successfullyUnregistered.set(true);
}
- public void computeIsLastAMRetry() {
- isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
+ public void resetIsLastAMRetry() {
+ isLastAMRetry = false;
}
}
@@ -1388,8 +1380,6 @@ public class MRAppMaster extends Composi
System.getenv(Environment.NM_HTTP_PORT.name());
String appSubmitTimeStr =
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
- String maxAppAttempts =
- System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
validateInputParam(containerIdStr,
Environment.CONTAINER_ID.name());
@@ -1399,8 +1389,6 @@ public class MRAppMaster extends Composi
Environment.NM_HTTP_PORT.name());
validateInputParam(appSubmitTimeStr,
ApplicationConstants.APP_SUBMIT_TIME_ENV);
- validateInputParam(maxAppAttempts,
- ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
ApplicationAttemptId applicationAttemptId =
@@ -1411,8 +1399,7 @@ public class MRAppMaster extends Composi
MRAppMaster appMaster =
new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
Integer.parseInt(nodePortString),
- Integer.parseInt(nodeHttpPortString), appSubmitTime,
- Integer.parseInt(maxAppAttempts));
+ Integer.parseInt(nodeHttpPortString), appSubmitTime);
ShutdownHookManager.get().addShutdownHook(
new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
JobConf conf = new JobConf(new YarnConfiguration());
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Sat Jul 12 02:24:40 2014
@@ -1218,22 +1218,25 @@ public class JobImpl implements org.apac
boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
boolean smallInput = (dataInputLength <= sysMaxBytes);
// ignoring overhead due to UberAM and statics as negligible here:
+ long requiredMapMB = conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0);
+ long requiredReduceMB = conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0);
+ long requiredMB = Math.max(requiredMapMB, requiredReduceMB);
+ int requiredMapCores = conf.getInt(
+ MRJobConfig.MAP_CPU_VCORES,
+ MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+ int requiredReduceCores = conf.getInt(
+ MRJobConfig.REDUCE_CPU_VCORES,
+ MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+ int requiredCores = Math.max(requiredMapCores, requiredReduceCores);
+ if (numReduceTasks == 0) {
+ requiredMB = requiredMapMB;
+ requiredCores = requiredMapCores;
+ }
boolean smallMemory =
- ( (Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0),
- conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
- <= sysMemSizeForUberSlot)
- || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
- boolean smallCpu =
- (
- Math.max(
- conf.getInt(
- MRJobConfig.MAP_CPU_VCORES,
- MRJobConfig.DEFAULT_MAP_CPU_VCORES),
- conf.getInt(
- MRJobConfig.REDUCE_CPU_VCORES,
- MRJobConfig.DEFAULT_REDUCE_CPU_VCORES))
- <= sysCPUSizeForUberSlot
- );
+ (requiredMB <= sysMemSizeForUberSlot)
+ || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);
+
+ boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot;
boolean notChainJob = !isChainJob(conf);
// User has overall veto power over uberization, or user can modify
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Sat Jul 12 02:24:40 2014
@@ -185,7 +185,7 @@ public abstract class RMCommunicator ext
// if unregistration failed, isLastAMRetry needs to be recalculated
// to see whether AM really has the chance to retry
RunningAppContext raContext = (RunningAppContext) context;
- raContext.computeIsLastAMRetry();
+ raContext.resetIsLastAMRetry();
}
}
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Sat Jul 12 02:24:40 2014
@@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting;
@@ -143,15 +144,21 @@ public class RMContainerAllocator extend
private int lastCompletedTasks = 0;
private boolean recalculateReduceSchedule = false;
- private int mapResourceReqt;//memory
- private int reduceResourceReqt;//memory
+ private int mapResourceRequest;//memory
+ private int reduceResourceRequest;//memory
private boolean reduceStarted = false;
private float maxReduceRampupLimit = 0;
private float maxReducePreemptionLimit = 0;
+ /**
+ * after this threshold, if the container request is not allocated, it is
+ * considered delayed.
+ */
+ private long allocationDelayThresholdMs = 0;
private float reduceSlowStart = 0;
private long retryInterval;
private long retrystartTime;
+ private Clock clock;
private final AMPreemptionPolicy preemptionPolicy;
@@ -166,6 +173,7 @@ public class RMContainerAllocator extend
super(clientService, context);
this.preemptionPolicy = preemptionPolicy;
this.stopped = new AtomicBoolean(false);
+ this.clock = context.getClock();
}
@Override
@@ -180,6 +188,9 @@ public class RMContainerAllocator extend
maxReducePreemptionLimit = conf.getFloat(
MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
+ allocationDelayThresholdMs = conf.getInt(
+ MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
+ MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
RackResolver.init(conf);
retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
@@ -246,7 +257,7 @@ public class RMContainerAllocator extend
getJob().getTotalMaps(), completedMaps,
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(),
- mapResourceReqt, reduceResourceReqt,
+ mapResourceRequest, reduceResourceRequest,
pendingReduces.size(),
maxReduceRampupLimit, reduceSlowStart);
recalculateReduceSchedule = false;
@@ -268,6 +279,18 @@ public class RMContainerAllocator extend
scheduleStats.log("Final Stats: ");
}
+ @Private
+ @VisibleForTesting
+ AssignedRequests getAssignedRequests() {
+ return assignedRequests;
+ }
+
+ @Private
+ @VisibleForTesting
+ ScheduledRequests getScheduledRequests() {
+ return scheduledRequests;
+ }
+
public boolean getIsReduceStarted() {
return reduceStarted;
}
@@ -303,16 +326,16 @@ public class RMContainerAllocator extend
int supportedMaxContainerCapability =
getMaxContainerCapability().getMemory();
if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
- if (mapResourceReqt == 0) {
- mapResourceReqt = reqEvent.getCapability().getMemory();
+ if (mapResourceRequest == 0) {
+ mapResourceRequest = reqEvent.getCapability().getMemory();
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
- mapResourceReqt)));
- LOG.info("mapResourceReqt:"+mapResourceReqt);
- if (mapResourceReqt > supportedMaxContainerCapability) {
+ mapResourceRequest)));
+ LOG.info("mapResourceRequest:"+ mapResourceRequest);
+ if (mapResourceRequest > supportedMaxContainerCapability) {
String diagMsg = "MAP capability required is more than the supported " +
- "max container capability in the cluster. Killing the Job. mapResourceReqt: " +
- mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
+ "max container capability in the cluster. Killing the Job. mapResourceRequest: " +
+ mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
jobId, diagMsg));
@@ -320,20 +343,20 @@ public class RMContainerAllocator extend
}
}
//set the rounded off memory
- reqEvent.getCapability().setMemory(mapResourceReqt);
+ reqEvent.getCapability().setMemory(mapResourceRequest);
scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
} else {
- if (reduceResourceReqt == 0) {
- reduceResourceReqt = reqEvent.getCapability().getMemory();
+ if (reduceResourceRequest == 0) {
+ reduceResourceRequest = reqEvent.getCapability().getMemory();
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
- reduceResourceReqt)));
- LOG.info("reduceResourceReqt:"+reduceResourceReqt);
- if (reduceResourceReqt > supportedMaxContainerCapability) {
+ reduceResourceRequest)));
+ LOG.info("reduceResourceRequest:"+ reduceResourceRequest);
+ if (reduceResourceRequest > supportedMaxContainerCapability) {
String diagMsg = "REDUCE capability required is more than the " +
"supported max container capability in the cluster. Killing the " +
- "Job. reduceResourceReqt: " + reduceResourceReqt +
+ "Job. reduceResourceRequest: " + reduceResourceRequest +
" maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
@@ -342,7 +365,7 @@ public class RMContainerAllocator extend
}
}
//set the rounded off memory
- reqEvent.getCapability().setMemory(reduceResourceReqt);
+ reqEvent.getCapability().setMemory(reduceResourceRequest);
if (reqEvent.getEarlierAttemptFailed()) {
//add to the front of queue for fail fast
pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
@@ -394,8 +417,22 @@ public class RMContainerAllocator extend
return host;
}
- private void preemptReducesIfNeeded() {
- if (reduceResourceReqt == 0) {
+ @Private
+ @VisibleForTesting
+ synchronized void setReduceResourceRequest(int mem) {
+ this.reduceResourceRequest = mem;
+ }
+
+ @Private
+ @VisibleForTesting
+ synchronized void setMapResourceRequest(int mem) {
+ this.mapResourceRequest = mem;
+ }
+
+ @Private
+ @VisibleForTesting
+ void preemptReducesIfNeeded() {
+ if (reduceResourceRequest == 0) {
return; //no reduces
}
//check if reduces have taken over the whole cluster and there are
@@ -403,9 +440,9 @@ public class RMContainerAllocator extend
if (scheduledRequests.maps.size() > 0) {
int memLimit = getMemLimit();
int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
- assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt);
+ assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest);
//availableMemForMap must be sufficient to run atleast 1 map
- if (availableMemForMap < mapResourceReqt) {
+ if (availableMemForMap < mapResourceRequest) {
//to make sure new containers are given to maps and not reduces
//ramp down all scheduled reduces if any
//(since reduces are scheduled at higher priority than maps)
@@ -414,22 +451,40 @@ public class RMContainerAllocator extend
pendingReduces.add(req);
}
scheduledRequests.reduces.clear();
-
- //preempt for making space for at least one map
- int premeptionLimit = Math.max(mapResourceReqt,
- (int) (maxReducePreemptionLimit * memLimit));
-
- int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt,
- premeptionLimit);
-
- int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
- toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
-
- LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
- assignedRequests.preemptReduce(toPreempt);
+
+ //do further checking to find the number of map requests that were
+ //hanging around for a while
+ int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
+ if (hangingMapRequests > 0) {
+ //preempt for making space for at least one map
+ int premeptionLimit = Math.max(mapResourceRequest,
+ (int) (maxReducePreemptionLimit * memLimit));
+
+ int preemptMem = Math.min(hangingMapRequests * mapResourceRequest,
+ premeptionLimit);
+
+ int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest);
+ toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
+
+ LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
+ assignedRequests.preemptReduce(toPreempt);
+ }
}
}
}
+
+ private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
+ if (allocationDelayThresholdMs <= 0)
+ return requestMap.size();
+ int hangingRequests = 0;
+ long currTime = clock.getTime();
+ for (ContainerRequest request: requestMap.values()) {
+ long delay = currTime - request.requestTimeMs;
+ if (delay > allocationDelayThresholdMs)
+ hangingRequests++;
+ }
+ return hangingRequests;
+ }
@Private
public void scheduleReduces(
@@ -664,7 +719,8 @@ public class RMContainerAllocator extend
@VisibleForTesting
public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
TaskAttemptId attemptID) {
- if (cont.getExitStatus() == ContainerExitStatus.ABORTED) {
+ if (cont.getExitStatus() == ContainerExitStatus.ABORTED
+ || cont.getExitStatus() == ContainerExitStatus.PREEMPTED) {
// killed by framework
return new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_KILL);
@@ -715,11 +771,13 @@ public class RMContainerAllocator extend
@Private
public int getMemLimit() {
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
- return headRoom + assignedRequests.maps.size() * mapResourceReqt +
- assignedRequests.reduces.size() * reduceResourceReqt;
+ return headRoom + assignedRequests.maps.size() * mapResourceRequest +
+ assignedRequests.reduces.size() * reduceResourceRequest;
}
-
- private class ScheduledRequests {
+
+ @Private
+ @VisibleForTesting
+ class ScheduledRequests {
private final LinkedList<TaskAttemptId> earlierFailedMaps =
new LinkedList<TaskAttemptId>();
@@ -729,7 +787,8 @@ public class RMContainerAllocator extend
new HashMap<String, LinkedList<TaskAttemptId>>();
private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping =
new HashMap<String, LinkedList<TaskAttemptId>>();
- private final Map<TaskAttemptId, ContainerRequest> maps =
+ @VisibleForTesting
+ final Map<TaskAttemptId, ContainerRequest> maps =
new LinkedHashMap<TaskAttemptId, ContainerRequest>();
private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
@@ -825,22 +884,22 @@ public class RMContainerAllocator extend
int allocatedMemory = allocated.getResource().getMemory();
if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|| PRIORITY_MAP.equals(priority)) {
- if (allocatedMemory < mapResourceReqt
+ if (allocatedMemory < mapResourceRequest
|| maps.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a map as either "
- + " container memory less than required " + mapResourceReqt
+ + " container memory less than required " + mapResourceRequest
+ " or no pending map tasks - maps.isEmpty="
+ maps.isEmpty());
isAssignable = false;
}
}
else if (PRIORITY_REDUCE.equals(priority)) {
- if (allocatedMemory < reduceResourceReqt
+ if (allocatedMemory < reduceResourceRequest
|| reduces.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a reduce as either "
- + " container memory less than required " + reduceResourceReqt
+ + " container memory less than required " + reduceResourceRequest
+ " or no pending reduce tasks - reduces.isEmpty="
+ reduces.isEmpty());
isAssignable = false;
@@ -1119,14 +1178,18 @@ public class RMContainerAllocator extend
}
}
- private class AssignedRequests {
+ @Private
+ @VisibleForTesting
+ class AssignedRequests {
private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
new HashMap<ContainerId, TaskAttemptId>();
private final LinkedHashMap<TaskAttemptId, Container> maps =
new LinkedHashMap<TaskAttemptId, Container>();
- private final LinkedHashMap<TaskAttemptId, Container> reduces =
+ @VisibleForTesting
+ final LinkedHashMap<TaskAttemptId, Container> reduces =
new LinkedHashMap<TaskAttemptId, Container>();
- private final Set<TaskAttemptId> preemptionWaitingReduces =
+ @VisibleForTesting
+ final Set<TaskAttemptId> preemptionWaitingReduces =
new HashSet<TaskAttemptId>();
void add(Container container, TaskAttemptId tId) {
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Sat Jul 12 02:24:40 2014
@@ -29,8 +29,10 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -96,6 +98,8 @@ public abstract class RMContainerRequest
super(clientService, context);
}
+ @Private
+ @VisibleForTesting
static class ContainerRequest {
final TaskAttemptId attemptID;
final Resource capability;
@@ -103,20 +107,39 @@ public abstract class RMContainerRequest
final String[] racks;
//final boolean earlierAttemptFailed;
final Priority priority;
-
+ /**
+ * the time when this request object was formed; can be used to avoid
+ * aggressive preemption for recently placed requests
+ */
+ final long requestTimeMs;
+
public ContainerRequest(ContainerRequestEvent event, Priority priority) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(),
event.getRacks(), priority);
}
-
+
+ public ContainerRequest(ContainerRequestEvent event, Priority priority,
+ long requestTimeMs) {
+ this(event.getAttemptID(), event.getCapability(), event.getHosts(),
+ event.getRacks(), priority, requestTimeMs);
+ }
+
+ public ContainerRequest(TaskAttemptId attemptID,
+ Resource capability, String[] hosts, String[] racks,
+ Priority priority) {
+ this(attemptID, capability, hosts, racks, priority,
+ System.currentTimeMillis());
+ }
+
public ContainerRequest(TaskAttemptId attemptID,
- Resource capability, String[] hosts, String[] racks,
- Priority priority) {
+ Resource capability, String[] hosts, String[] racks,
+ Priority priority, long requestTimeMs) {
this.attemptID = attemptID;
this.capability = capability;
this.hosts = hosts;
this.racks = racks;
this.priority = priority;
+ this.requestTimeMs = requestTimeMs;
}
public String toString() {
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Sat Jul 12 02:24:40 2014
@@ -24,8 +24,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.EnumSet;
-import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -92,6 +90,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -101,6 +100,7 @@ import org.apache.hadoop.yarn.state.Stat
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Assert;
/**
@@ -228,8 +228,8 @@ public class MRApp extends MRAppMaster {
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock, boolean unregistered,
String assignedQueue) {
- super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
- .currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+ super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock,
+ System.currentTimeMillis());
this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath());
LOG.info("PathUsed: " + testAbsPath);
@@ -573,7 +573,8 @@ public class MRApp extends MRAppMaster {
Resource resource = Resource.newInstance(1234, 2);
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(cId, nodeId.toString(), "user",
- resource, System.currentTimeMillis() + 10000, 42, 42);
+ resource, System.currentTimeMillis() + 10000, 42, 42,
+ Priority.newInstance(0), 0);
Token containerToken = newContainerToken(nodeId, "password".getBytes(),
containerTokenIdentifier);
Container container = Container.newInstance(cId, nodeId,
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java Sat Jul 12 02:24:40 2014
@@ -253,6 +253,12 @@ public class TestJobEndNotifier extends
HttpServer2 server = startHttpServer();
MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false,
this.getClass().getName(), true, 2, false));
+ // Currently, we will have isLastRetry always equals to false at beginning
+ // of MRAppMaster, except staging area exists or commit already started at
+ // the beginning.
+ // Now manually set isLastRetry to true and this should reset to false when
+ // unregister failed.
+ app.isLastAMRetry = true;
doNothing().when(app).sysexit();
JobConf conf = new JobConf();
conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
@@ -265,12 +271,11 @@ public class TestJobEndNotifier extends
// Now shutdown. User should see FAILED state.
// Unregistration fails: isLastAMRetry is recalculated, this is
app.shutDownJob();
- Assert.assertTrue(app.isLastAMRetry());
- Assert.assertEquals(1, JobEndServlet.calledTimes);
- Assert.assertEquals("jobid=" + job.getID() + "&status=FAILED",
- JobEndServlet.requestUri.getQuery());
- Assert.assertEquals(JobState.FAILED.toString(),
- JobEndServlet.foundJobState);
+ Assert.assertFalse(app.isLastAMRetry());
+ // Since it's not last retry, JobEndServlet didn't called
+ Assert.assertEquals(0, JobEndServlet.calledTimes);
+ Assert.assertNull(JobEndServlet.requestUri);
+ Assert.assertNull(JobEndServlet.foundJobState);
server.stop();
}
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Sat Jul 12 02:24:40 2014
@@ -118,7 +118,7 @@ public class TestMRAppMaster {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMasterTest appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
- System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+ System.currentTimeMillis());
JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -147,8 +147,7 @@ public class TestMRAppMaster {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
- System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
- false, false);
+ System.currentTimeMillis(), false, false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -186,8 +185,7 @@ public class TestMRAppMaster {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
- System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
- false, false);
+ System.currentTimeMillis(), false, false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -225,8 +223,7 @@ public class TestMRAppMaster {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
- System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
- false, false);
+ System.currentTimeMillis(), false, false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -264,8 +261,7 @@ public class TestMRAppMaster {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
- System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
- false, false);
+ System.currentTimeMillis(), false, false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -285,8 +281,9 @@ public class TestMRAppMaster {
@Test (timeout = 30000)
public void testMRAppMasterMaxAppAttempts() throws IOException,
InterruptedException {
- int[] maxAppAttemtps = new int[] { 1, 2, 3 };
- Boolean[] expectedBools = new Boolean[]{ true, true, false };
+ // No matter what's the maxAppAttempt or attempt id, the isLastRetry always
+ // equals to false
+ Boolean[] expectedBools = new Boolean[]{ false, false, false };
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1";
@@ -301,10 +298,10 @@ public class TestMRAppMaster {
File stagingDir =
new File(MRApps.getStagingAreaDir(conf, userName).toString());
stagingDir.mkdirs();
- for (int i = 0; i < maxAppAttemtps.length; ++i) {
+ for (int i = 0; i < expectedBools.length; ++i) {
MRAppMasterTest appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
- System.currentTimeMillis(), maxAppAttemtps[i], false, true);
+ System.currentTimeMillis(), false, true);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
assertEquals("isLastAMRetry is correctly computed.", expectedBools[i],
appMaster.isLastAMRetry());
@@ -399,7 +396,7 @@ public class TestMRAppMaster {
MRAppMasterTest appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
- System.currentTimeMillis(), 1, false, true);
+ System.currentTimeMillis(), false, true);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
// Now validate the task credentials
@@ -466,16 +463,15 @@ class MRAppMasterTest extends MRAppMaste
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort,
- long submitTime, int maxAppAttempts) {
+ long submitTime) {
this(applicationAttemptId, containerId, host, port, httpPort,
- submitTime, maxAppAttempts, true, true);
+ submitTime, true, true);
}
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort,
- long submitTime, int maxAppAttempts, boolean overrideInit,
+ long submitTime, boolean overrideInit,
boolean overrideStart) {
- super(applicationAttemptId, containerId, host, port, httpPort, submitTime,
- maxAppAttempts);
+ super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
this.overrideInit = overrideInit;
this.overrideStart = overrideStart;
mockContainerAllocator = mock(ContainerAllocator.class);
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Sat Jul 12 02:24:40 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
@@ -28,9 +29,6 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
-import org.junit.Assert;
-import junit.framework.TestCase;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -62,13 +60,14 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.junit.Assert;
import org.junit.Test;
/**
* Make sure that the job staging directory clean up happens.
*/
- public class TestStagingCleanup extends TestCase {
+ public class TestStagingCleanup {
private Configuration conf = new Configuration();
private FileSystem fs;
@@ -81,7 +80,7 @@ import org.junit.Test;
public void testDeletionofStagingOnUnregistrationFailure()
throws IOException {
testDeletionofStagingOnUnregistrationFailure(2, false);
- testDeletionofStagingOnUnregistrationFailure(1, true);
+ testDeletionofStagingOnUnregistrationFailure(1, false);
}
@SuppressWarnings("resource")
@@ -104,7 +103,7 @@ import org.junit.Test;
appMaster.init(conf);
appMaster.start();
appMaster.shutDownJob();
- ((RunningAppContext) appMaster.getContext()).computeIsLastAMRetry();
+ ((RunningAppContext) appMaster.getContext()).resetIsLastAMRetry();
if (shouldHaveDeleted) {
Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry());
verify(fs).delete(stagingJobPath, true);
@@ -164,7 +163,11 @@ import org.junit.Test;
verify(fs, times(0)).delete(stagingJobPath, true);
}
- @Test (timeout = 30000)
+ // FIXME:
+ // Disabled this test because currently, when job state=REBOOT at shutdown
+ // when lastRetry = true in RM view, cleanup will not do.
+ // This will be supported after YARN-2261 completed
+// @Test (timeout = 30000)
public void testDeletionofStagingOnReboot() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
fs = mock(FileSystem.class);
@@ -202,7 +205,7 @@ import org.junit.Test;
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
- MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 4);
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
appMaster.init(conf);
//simulate the process being killed
MRAppMaster.MRAppMasterShutdownHook hook =
@@ -210,8 +213,12 @@ import org.junit.Test;
hook.run();
verify(fs, times(0)).delete(stagingJobPath, true);
}
-
- @Test (timeout = 30000)
+
+ // FIXME:
+ // Disabled this test because currently, when shutdown hook triggered at
+ // lastRetry in RM view, cleanup will not do. This should be supported after
+ // YARN-2261 completed
+// @Test (timeout = 30000)
public void testDeletionofStagingOnKillLastTry() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
fs = mock(FileSystem.class);
@@ -226,7 +233,7 @@ import org.junit.Test;
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
- MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 1); //no retry
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc); //no retry
appMaster.init(conf);
assertTrue("appMaster.isLastAMRetry() is false", appMaster.isLastAMRetry());
//simulate the process being killed
@@ -245,10 +252,10 @@ import org.junit.Test;
boolean crushUnregistration = false;
public TestMRApp(ApplicationAttemptId applicationAttemptId,
- ContainerAllocator allocator, int maxAppAttempts) {
+ ContainerAllocator allocator) {
super(applicationAttemptId, ContainerId.newInstance(
applicationAttemptId, 1), "testhost", 2222, 3333,
- System.currentTimeMillis(), maxAppAttempts);
+ System.currentTimeMillis());
this.allocator = allocator;
this.successfullyUnregistered.set(true);
}
@@ -256,7 +263,7 @@ import org.junit.Test;
public TestMRApp(ApplicationAttemptId applicationAttemptId,
ContainerAllocator allocator, JobStateInternal jobStateInternal,
int maxAppAttempts) {
- this(applicationAttemptId, allocator, maxAppAttempts);
+ this(applicationAttemptId, allocator);
this.jobStateInternal = jobStateInternal;
}
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Sat Jul 12 02:24:40 2014
@@ -657,6 +657,15 @@ public class TestJobImpl {
conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 1);
isUber = testUberDecision(conf);
Assert.assertFalse(isUber);
+
+ // enable uber mode of 0 reducer no matter how much memory assigned to reducer
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+ conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+ conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
+ conf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 10);
+ isUber = testUberDecision(conf);
+ Assert.assertTrue(isUber);
}
private boolean testUberDecision(Configuration conf) {
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Sat Jul 12 02:24:40 2014
@@ -65,6 +65,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
@@ -795,6 +796,178 @@ public class TestTaskAttempt{
finishTime, Long.valueOf(taImpl.getFinishTime()));
}
+ @Test
+ public void testContainerKillAfterAssigned() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+ 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+ jobFile, 1, splits, jobConf, taListener, new Token(),
+ new Credentials(), new SystemClock(), appCtx);
+
+ NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+ mock(Map.class)));
+ assertEquals("Task attempt is not in assinged state",
+ taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_KILL));
+ assertEquals("Task should be in KILLED state",
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+ taImpl.getInternalState());
+ }
+
+ @Test
+ public void testContainerKillWhileRunning() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+ 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+ jobFile, 1, splits, jobConf, taListener, new Token(),
+ new Credentials(), new SystemClock(), appCtx);
+
+ NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+ mock(Map.class)));
+ taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+ assertEquals("Task attempt is not in running state", taImpl.getState(),
+ TaskAttemptState.RUNNING);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_KILL));
+ assertFalse("InternalError occurred trying to handle TA_KILL",
+ eventHandler.internalError);
+ assertEquals("Task should be in KILLED state",
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+ taImpl.getInternalState());
+ }
+
+ @Test
+ public void testContainerKillWhileCommitPending() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+ 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+ jobFile, 1, splits, jobConf, taListener, new Token(),
+ new Credentials(), new SystemClock(), appCtx);
+
+ NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+ mock(Map.class)));
+ taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+ assertEquals("Task attempt is not in running state", taImpl.getState(),
+ TaskAttemptState.RUNNING);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_COMMIT_PENDING));
+ assertEquals("Task should be in COMMIT_PENDING state",
+ TaskAttemptStateInternal.COMMIT_PENDING, taImpl.getInternalState());
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_KILL));
+ assertFalse("InternalError occurred trying to handle TA_KILL",
+ eventHandler.internalError);
+ assertEquals("Task should be in KILLED state",
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+ taImpl.getInternalState());
+ }
+
public static class MockEventHandler implements EventHandler {
public boolean internalError;
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Sat Jul 12 02:24:40 2014
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
@@ -402,7 +403,7 @@ public class TestContainerLauncherImpl {
1234), "password".getBytes(), new ContainerTokenIdentifier(
contId, containerManagerAddr, "user",
Resource.newInstance(1024, 1),
- currentTime + 10000L, 123, currentTime));
+ currentTime + 10000L, 123, currentTime, Priority.newInstance(0), 0));
}
private static class ContainerManagerForTest implements ContainerManagementProtocol {
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java?rev=1609878&r1=1605891&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java Sat Jul 12 02:24:40 2014
@@ -1959,6 +1959,22 @@ public class TestRMContainerAllocator {
TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
abortedStatus, attemptId);
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
+
+ ContainerId containerId2 = ContainerId.newInstance(applicationAttemptId, 2);
+ ContainerStatus status2 = ContainerStatus.newInstance(containerId2,
+ ContainerState.RUNNING, "", 0);
+
+ ContainerStatus preemptedStatus = ContainerStatus.newInstance(containerId2,
+ ContainerState.RUNNING, "", ContainerExitStatus.PREEMPTED);
+
+ TaskAttemptEvent event2 = allocator.createContainerFinishedEvent(status2,
+ attemptId);
+ Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+ event2.getType());
+
+ TaskAttemptEvent abortedEvent2 = allocator.createContainerFinishedEvent(
+ preemptedStatus, attemptId);
+ Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType());
}
@Test
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java Sat Jul 12 02:24:40 2014
@@ -168,10 +168,14 @@ public class FileNameIndexUtils {
decodeJobHistoryFileName(jobDetails[QUEUE_NAME_INDEX]));
try{
- indexInfo.setJobStartTime(
- Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
+ if (jobDetails.length <= JOB_START_TIME_INDEX) {
+ indexInfo.setJobStartTime(indexInfo.getSubmitTime());
+ } else {
+ indexInfo.setJobStartTime(
+ Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
+ }
} catch (NumberFormatException e){
- LOG.warn("Unable to parse launch time from job history file "
+ LOG.warn("Unable to parse start time from job history file "
+ jhFileName + " : " + e);
}
} catch (IndexOutOfBoundsException e) {
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java Sat Jul 12 02:24:40 2014
@@ -39,6 +39,17 @@ public class TestFileNameIndexUtils {
+ FileNameIndexUtils.DELIMITER + "%s"
+ JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
+ private static final String OLD_FORMAT_BEFORE_ADD_START_TIME = "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
+
private static final String JOB_HISTORY_FILE_FORMATTER = "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
@@ -236,6 +247,22 @@ public class TestFileNameIndexUtils {
}
@Test
+ public void testJobStartTimeBackwardsCompatible() throws IOException{
+ String jobHistoryFile = String.format(OLD_FORMAT_BEFORE_ADD_START_TIME,
+ JOB_ID,
+ SUBMIT_TIME,
+ USER_NAME,
+ JOB_NAME_WITH_DELIMITER_ESCAPE,
+ FINISH_TIME,
+ NUM_MAPS,
+ NUM_REDUCES,
+ JOB_STATUS,
+ QUEUE_NAME );
+ JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
+ Assert.assertEquals(info.getJobStartTime(), info.getSubmitTime());
+ }
+
+ @Test
public void testJobHistoryFileNameBackwardsCompatible() throws IOException {
JobID oldJobId = JobID.forName(JOB_ID);
JobId jobId = TypeConverter.toYarn(oldJobId);
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Sat Jul 12 02:24:40 2014
@@ -295,6 +295,15 @@ public abstract class FileInputFormat<K,
String[] hosts) {
return new FileSplit(file, start, length, hosts);
}
+
+ /**
+ * A factory that makes the split for this class. It can be overridden
+ * by sub-classes to make sub-types
+ */
+ protected FileSplit makeSplit(Path file, long start, long length,
+ String[] hosts, String[] inMemoryHosts) {
+ return new FileSplit(file, start, length, hosts, inMemoryHosts);
+ }
/** Splits files returned by {@link #listStatus(JobConf)} when
* they're too big.*/
@@ -337,22 +346,22 @@ public abstract class FileInputFormat<K,
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
- String[] splitHosts = getSplitHosts(blkLocations,
+ String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
- splitHosts));
+ splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
- String[] splitHosts = getSplitHosts(blkLocations, length
+ String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
- splitHosts));
+ splitHosts[0], splitHosts[1]));
}
} else {
- String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
- splits.add(makeSplit(path, 0, length, splitHosts));
+ String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
+ splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
@@ -538,10 +547,30 @@ public abstract class FileInputFormat<K,
* @param blkLocations The list of block locations
* @param offset
* @param splitSize
- * @return array of hosts that contribute most to this split
+ * @return an array of hosts that contribute most to this split
* @throws IOException
*/
protected String[] getSplitHosts(BlockLocation[] blkLocations,
+ long offset, long splitSize, NetworkTopology clusterMap) throws IOException {
+ return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize,
+ clusterMap)[0];
+ }
+
+ /**
+ * This function identifies and returns the hosts that contribute
+ * most for a given split. For calculating the contribution, rack
+ * locality is treated on par with host locality, so hosts from racks
+ * that contribute the most are preferred over hosts on racks that
+ * contribute less
+ * @param blkLocations The list of block locations
+ * @param offset
+ * @param splitSize
+ * @return two arrays - one of hosts that contribute most to this split, and
+ * one of hosts that contribute most to this split that have the data
+ * cached on them
+ * @throws IOException
+ */
+ private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations,
long offset, long splitSize, NetworkTopology clusterMap)
throws IOException {
@@ -552,7 +581,8 @@ public abstract class FileInputFormat<K,
//If this is the only block, just return
if (bytesInThisBlock >= splitSize) {
- return blkLocations[startIndex].getHosts();
+ return new String[][] { blkLocations[startIndex].getHosts(),
+ blkLocations[startIndex].getCachedHosts() };
}
long bytesInFirstBlock = bytesInThisBlock;
@@ -639,7 +669,9 @@ public abstract class FileInputFormat<K,
} // for all indices
- return identifyHosts(allTopos.length, racksMap);
+ // We don't yet support cached hosts when bytesInThisBlock > splitSize
+ return new String[][] { identifyHosts(allTopos.length, racksMap),
+ new String[0]};
}
private String[] identifyHosts(int replicationFactor,
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java Sat Jul 12 02:24:40 2014
@@ -24,6 +24,7 @@ import java.io.DataOutput;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.fs.Path;
/** A section of an input file. Returned by {@link
@@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit
- implements InputSplit {
+ implements InputSplitWithLocationInfo {
org.apache.hadoop.mapreduce.lib.input.FileSplit fs;
protected FileSplit() {
fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit();
@@ -62,6 +63,20 @@ public class FileSplit extends org.apach
length, hosts);
}
+ /** Constructs a split with host information
+ *
+ * @param file the file name
+ * @param start the position of the first byte in the file to process
+ * @param length the number of bytes in the file to process
+ * @param hosts the list of hosts containing the block, possibly null
+ * @param inMemoryHosts the list of hosts containing the block in memory
+ */
+ public FileSplit(Path file, long start, long length, String[] hosts,
+ String[] inMemoryHosts) {
+ fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start,
+ length, hosts, inMemoryHosts);
+ }
+
public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) {
this.fs = fs;
}
@@ -92,4 +107,9 @@ public class FileSplit extends org.apach
return fs.getLocations();
}
+ @Override
+ @Evolving
+ public SplitLocationInfo[] getLocationInfo() throws IOException {
+ return fs.getLocationInfo();
+ }
}
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java Sat Jul 12 02:24:40 2014
@@ -22,6 +22,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -51,10 +53,25 @@ public abstract class InputSplit {
/**
* Get the list of nodes by name where the data for the split would be local.
* The locations do not need to be serialized.
+ *
* @return a new array of the node nodes.
* @throws IOException
* @throws InterruptedException
*/
public abstract
String[] getLocations() throws IOException, InterruptedException;
+
+ /**
+ * Gets info about which nodes the input split is stored on and how it is
+ * stored at each location.
+ *
+ * @return list of <code>SplitLocationInfo</code>s describing how the split
+ * data is stored at each location. A null value indicates that all the
+ * locations have the data stored on disk.
+ * @throws IOException
+ */
+ @Evolving
+ public SplitLocationInfo[] getLocationInfo() throws IOException {
+ return null;
+ }
}
\ No newline at end of file
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Sat Jul 12 02:24:40 2014
@@ -579,7 +579,17 @@ public interface MRJobConfig {
MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
50;
-
+
+ /**
+ * The threshold in terms of seconds after which an unsatisfied mapper request
+ * triggers reducer preemption to free space. Default 0 implies that the reduces
+ * should be preempted immediately after allocation if there is currently no
+ * room for newly allocated mappers.
+ */
+ public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
+ "mapreduce.job.reducer.preempt.delay.sec";
+ public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0;
+
public static final String MR_AM_ENV =
MR_AM_PREFIX + "env";