You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2013/01/22 20:33:25 UTC
svn commit: r1437113 [2/5] - in
/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/
dev-support/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduc...
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Jan 22 19:33:02 2013
@@ -165,6 +165,7 @@ public abstract class TaskAttemptImpl im
private Token<JobTokenIdentifier> jobToken;
private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
private static String initialClasspath = null;
+ private static String initialAppClasspath = null;
private static Object commonContainerSpecLock = new Object();
private static ContainerLaunchContext commonContainerSpec = null;
private static final Object classpathLock = new Object();
@@ -527,7 +528,10 @@ public abstract class TaskAttemptImpl im
//TODO:create the resource reqt for this Task attempt
this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
- this.resourceCapability.setMemory(getMemoryRequired(conf, taskId.getTaskType()));
+ this.resourceCapability.setMemory(
+ getMemoryRequired(conf, taskId.getTaskType()));
+ this.resourceCapability.setVirtualCores(
+ getCpuRequired(conf, taskId.getTaskType()));
this.dataLocalHosts = dataLocalHosts;
RackResolver.init(conf);
@@ -551,6 +555,21 @@ public abstract class TaskAttemptImpl im
return memory;
}
+ private int getCpuRequired(Configuration conf, TaskType taskType) {
+ int vcores = 1;
+ if (taskType == TaskType.MAP) {
+ vcores =
+ conf.getInt(MRJobConfig.MAP_CPU_VCORES,
+ MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+ } else if (taskType == TaskType.REDUCE) {
+ vcores =
+ conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
+ MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+ }
+
+ return vcores;
+ }
+
/**
* Create a {@link LocalResource} record with all the given parameters.
*/
@@ -581,6 +600,7 @@ public abstract class TaskAttemptImpl im
Map<String, String> env = new HashMap<String, String>();
MRApps.setClasspath(env, conf);
initialClasspath = env.get(Environment.CLASSPATH.name());
+ initialAppClasspath = env.get(Environment.APP_CLASSPATH.name());
initialClasspathFlag.set(true);
return initialClasspath;
}
@@ -679,6 +699,13 @@ public abstract class TaskAttemptImpl im
environment,
Environment.CLASSPATH.name(),
getInitialClasspath(conf));
+
+ if (initialAppClasspath != null) {
+ Apps.addToEnvironment(
+ environment,
+ Environment.APP_CLASSPATH.name(),
+ initialAppClasspath);
+ }
} catch (IOException e) {
throw new YarnException(e);
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue Jan 22 19:33:02 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -100,6 +101,7 @@ public class RecoveryService extends Com
private final ApplicationAttemptId applicationAttemptId;
private final OutputCommitter committer;
+ private final boolean newApiCommitter;
private final Dispatcher dispatcher;
private final ControlledClock clock;
@@ -113,10 +115,11 @@ public class RecoveryService extends Com
private volatile boolean recoveryMode = false;
public RecoveryService(ApplicationAttemptId applicationAttemptId,
- Clock clock, OutputCommitter committer) {
+ Clock clock, OutputCommitter committer, boolean newApiCommitter) {
super("RecoveringDispatcher");
this.applicationAttemptId = applicationAttemptId;
this.committer = committer;
+ this.newApiCommitter = newApiCommitter;
this.dispatcher = createRecoveryDispatcher();
this.clock = new ControlledClock(clock);
addService((Service) dispatcher);
@@ -205,18 +208,18 @@ public class RecoveryService extends Com
throws IOException {
FSDataInputStream in = null;
Path historyFile = null;
- String jobName =
+ String jobId =
TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
.toString();
String jobhistoryDir =
- JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+ JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
Path histDirPath =
FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
// read the previous history file
historyFile =
fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
- jobName, (applicationAttemptId.getAttemptId() - 1)));
+ jobId, (applicationAttemptId.getAttemptId() - 1)));
LOG.info("History file is at " + historyFile);
in = fc.open(historyFile);
return in;
@@ -360,8 +363,17 @@ public class RecoveryService extends Com
switch (state) {
case SUCCEEDED:
//recover the task output
- TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
- attInfo.getAttemptId());
+
+ // check the committer type and construct corresponding context
+ TaskAttemptContext taskContext = null;
+ if(newApiCommitter) {
+ taskContext = new TaskAttemptContextImpl(getConfig(),
+ attInfo.getAttemptId());
+ } else {
+ taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()),
+ TypeConverter.fromYarn(aId));
+ }
+
try {
TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Tue Jan 22 19:33:02 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -62,7 +63,8 @@ import org.apache.hadoop.yarn.service.Ab
/**
* Registers/unregisters to RM and sends heartbeats to RM.
*/
-public abstract class RMCommunicator extends AbstractService {
+public abstract class RMCommunicator extends AbstractService
+ implements RMHeartbeatHandler {
private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
private int rmPollInterval;//millis
protected ApplicationId applicationId;
@@ -77,6 +79,8 @@ public abstract class RMCommunicator ext
private Resource minContainerCapability;
private Resource maxContainerCapability;
protected Map<ApplicationAccessType, String> applicationACLs;
+ private volatile long lastHeartbeatTime;
+ private ConcurrentLinkedQueue<Runnable> heartbeatCallbacks;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -95,6 +99,7 @@ public abstract class RMCommunicator ext
this.applicationId = context.getApplicationID();
this.applicationAttemptId = context.getApplicationAttemptId();
this.stopped = new AtomicBoolean(false);
+ this.heartbeatCallbacks = new ConcurrentLinkedQueue<Runnable>();
}
@Override
@@ -136,14 +141,19 @@ public abstract class RMCommunicator ext
protected void register() {
//Register
- InetSocketAddress serviceAddr = clientService.getBindAddress();
+ InetSocketAddress serviceAddr = null;
+ if (clientService != null ) {
+ serviceAddr = clientService.getBindAddress();
+ }
try {
RegisterApplicationMasterRequest request =
recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
- request.setHost(serviceAddr.getHostName());
- request.setRpcPort(serviceAddr.getPort());
- request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+ if (serviceAddr != null) {
+ request.setHost(serviceAddr.getHostName());
+ request.setRpcPort(serviceAddr.getPort());
+ request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+ }
RegisterApplicationMasterResponse response =
scheduler.registerApplicationMaster(request);
minContainerCapability = response.getMinimumResourceCapability();
@@ -236,8 +246,12 @@ public abstract class RMCommunicator ext
return;
} catch (Exception e) {
LOG.error("ERROR IN CONTACTING RM. ", e);
+ continue;
// TODO: for other exceptions
}
+
+ lastHeartbeatTime = context.getClock().getTime();
+ executeHeartbeatCallbacks();
} catch (InterruptedException e) {
if (!stopped.get()) {
LOG.warn("Allocated thread interrupted. Returning.");
@@ -295,6 +309,23 @@ public abstract class RMCommunicator ext
protected abstract void heartbeat() throws Exception;
+ private void executeHeartbeatCallbacks() {
+ Runnable callback = null;
+ while ((callback = heartbeatCallbacks.poll()) != null) {
+ callback.run();
+ }
+ }
+
+ @Override
+ public long getLastHeartbeatTime() {
+ return lastHeartbeatTime;
+ }
+
+ @Override
+ public void runOnNextHeartbeat(Runnable callback) {
+ heartbeatCallbacks.add(callback);
+ }
+
public void setShouldUnregister(boolean shouldUnregister) {
this.shouldUnregister = shouldUnregister;
LOG.info("RMCommunicator notified that shouldUnregistered is: "
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo Tue Jan 22 19:33:02 2013
@@ -1 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
org.apache.hadoop.mapreduce.v2.app.MRClientSecurityInfo
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Tue Jan 22 19:33:02 2013
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertNul
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -32,6 +34,7 @@ import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@@ -41,7 +44,9 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Test;
@@ -51,8 +56,9 @@ public class TestTaskAttemptListenerImpl
public MockTaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
+ RMHeartbeatHandler rmHeartbeatHandler,
TaskHeartbeatHandler hbHandler) {
- super(context, jobTokenSecretManager);
+ super(context, jobTokenSecretManager, rmHeartbeatHandler);
this.taskHeartbeatHandler = hbHandler;
}
@@ -76,9 +82,12 @@ public class TestTaskAttemptListenerImpl
public void testGetTask() throws IOException {
AppContext appCtx = mock(AppContext.class);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ RMHeartbeatHandler rmHeartbeatHandler =
+ mock(RMHeartbeatHandler.class);
TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
MockTaskAttemptListenerImpl listener =
- new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler);
+ new MockTaskAttemptListenerImpl(appCtx, secret,
+ rmHeartbeatHandler, hbHandler);
Configuration conf = new Configuration();
listener.init(conf);
listener.start();
@@ -145,16 +154,21 @@ public class TestTaskAttemptListenerImpl
.thenReturn(Arrays.copyOfRange(taskEvents, 0, 2));
when(mockJob.getTaskAttemptCompletionEvents(2, 100))
.thenReturn(Arrays.copyOfRange(taskEvents, 2, 4));
- when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(mapEvents);
- when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(mapEvents);
- when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(empty);
+ when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(
+ TypeConverter.fromYarn(mapEvents));
+ when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(
+ TypeConverter.fromYarn(mapEvents));
+ when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(
+ TypeConverter.fromYarn(empty));
AppContext appCtx = mock(AppContext.class);
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ RMHeartbeatHandler rmHeartbeatHandler =
+ mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptListenerImpl listener =
- new TaskAttemptListenerImpl(appCtx, secret) {
+ new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
@@ -191,4 +205,46 @@ public class TestTaskAttemptListenerImpl
return tce;
}
+ @Test
+ public void testCommitWindow() throws IOException {
+ SystemClock clock = new SystemClock();
+
+ org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
+ mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
+ when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
+ AppContext appCtx = mock(AppContext.class);
+ when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
+ when(appCtx.getClock()).thenReturn(clock);
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ RMHeartbeatHandler rmHeartbeatHandler =
+ mock(RMHeartbeatHandler.class);
+ final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+ TaskAttemptListenerImpl listener =
+ new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+ @Override
+ protected void registerHeartbeatHandler(Configuration conf) {
+ taskHeartbeatHandler = hbHandler;
+ }
+ };
+
+ Configuration conf = new Configuration();
+ listener.init(conf);
+ listener.start();
+
+ // verify commit not allowed when RM heartbeat has not occurred recently
+ TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
+ boolean canCommit = listener.canCommit(tid);
+ assertFalse(canCommit);
+ verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
+
+ // verify commit allowed when RM heartbeat is recent
+ when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime());
+ canCommit = listener.canCommit(tid);
+ assertTrue(canCommit);
+ verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
+
+ listener.stop();
+ }
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Tue Jan 22 19:33:02 2013
@@ -18,14 +18,9 @@
package org.apache.hadoop.mapreduce.jobhistory;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static junit.framework.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
import java.io.File;
import java.io.IOException;
@@ -51,7 +46,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.verification.VerificationMode;
public class TestJobHistoryEventHandler {
@@ -260,13 +254,15 @@ public class TestJobHistoryEventHandler
}
}
- private AppContext mockAppContext(JobId jobId) {
+ private AppContext mockAppContext(ApplicationId appId) {
+ JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appId));
AppContext mockContext = mock(AppContext.class);
Job mockJob = mock(Job.class);
when(mockJob.getTotalMaps()).thenReturn(10);
when(mockJob.getTotalReduces()).thenReturn(10);
when(mockJob.getName()).thenReturn("mockjob");
when(mockContext.getJob(jobId)).thenReturn(mockJob);
+ when(mockContext.getApplicationID()).thenReturn(appId);
return mockContext;
}
@@ -279,7 +275,7 @@ public class TestJobHistoryEventHandler
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
- AppContext mockAppContext = mockAppContext(jobId);
+ AppContext mockAppContext = mockAppContext(appId);
}
private JobHistoryEvent getEventToEnqueue(JobId jobId) {
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Jan 22 19:33:02 2013
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.JobContext;
@@ -74,6 +75,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
@@ -207,6 +209,16 @@ public class MRApp extends MRAppMaster {
@Override
public void init(Configuration conf) {
+ try {
+ //Create the staging directory if it does not exist
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ FileSystem fs = getFileSystem(conf);
+ fs.mkdirs(stagingDir);
+ } catch (Exception e) {
+ throw new YarnException("Error creating staging dir", e);
+ }
+
super.init(conf);
if (this.clusterInfo != null) {
getContext().getClusterInfo().setMinContainerCapability(
@@ -215,9 +227,9 @@ public class MRApp extends MRAppMaster {
this.clusterInfo.getMaxContainerCapability());
} else {
getContext().getClusterInfo().setMinContainerCapability(
- BuilderUtils.newResource(1024));
+ BuilderUtils.newResource(1024, 1));
getContext().getClusterInfo().setMaxContainerCapability(
- BuilderUtils.newResource(10240));
+ BuilderUtils.newResource(10240, 1));
}
}
@@ -387,7 +399,8 @@ public class MRApp extends MRAppMaster {
}
@Override
- protected Job createJob(Configuration conf) {
+ protected Job createJob(Configuration conf, JobStateInternal forcedState,
+ String diagnostic) {
UserGroupInformation currentUser = null;
try {
currentUser = UserGroupInformation.getCurrentUser();
@@ -397,7 +410,8 @@ public class MRApp extends MRAppMaster {
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
- isNewApiCommitter(), currentUser.getUserName(), getContext());
+ isNewApiCommitter(), currentUser.getUserName(), getContext(),
+ forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
@@ -489,7 +503,8 @@ public class MRApp extends MRAppMaster {
return new MRAppContainerAllocator();
}
- protected class MRAppContainerAllocator implements ContainerAllocator {
+ protected class MRAppContainerAllocator
+ implements ContainerAllocator, RMHeartbeatHandler {
private int containerCount;
@Override
@@ -514,6 +529,16 @@ public class MRApp extends MRAppMaster {
new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
container, null));
}
+
+ @Override
+ public long getLastHeartbeatTime() {
+ return getContext().getClock().getTime();
+ }
+
+ @Override
+ public void runOnNextHeartbeat(Runnable callback) {
+ callback.run();
+ }
}
@Override
@@ -566,7 +591,8 @@ public class MRApp extends MRAppMaster {
}
};
- return new CommitterEventHandler(context, stubbedCommitter);
+ return new CommitterEventHandler(context, stubbedCommitter,
+ getRMHeartbeatHandler());
}
@Override
@@ -618,13 +644,14 @@ public class MRApp extends MRAppMaster {
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
- boolean newApiCommitter, String user, AppContext appContext) {
+ boolean newApiCommitter, String user, AppContext appContext,
+ JobStateInternal forcedState, String diagnostic) {
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics,
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
- appContext);
+ appContext, forcedState, diagnostic);
// This "this leak" is okay because the retained pointer is in an
// instance variable.
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Tue Jan 22 19:33:02 2013
@@ -207,9 +207,9 @@ public class MRAppBenchmark {
RegisterApplicationMasterResponse response =
Records.newRecord(RegisterApplicationMasterResponse.class);
response.setMinimumResourceCapability(BuilderUtils
- .newResource(1024));
+ .newResource(1024, 1));
response.setMaximumResourceCapability(BuilderUtils
- .newResource(10240));
+ .newResource(10240, 1));
return response;
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Tue Jan 22 19:33:02 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.JobACL;
@@ -556,7 +557,7 @@ public class MockJobs extends MockApps {
}
@Override
- public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+ public TaskCompletionEvent[] getMapAttemptCompletionEvents(
int startIndex, int maxEvents) {
return null;
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Tue Jan 22 19:33:02 2013
@@ -252,7 +252,7 @@ public class TestFail {
//task time out is reduced
//when attempt times out, heartbeat handler will send the lost event
//leading to Attempt failure
- return new TaskAttemptListenerImpl(getContext(), null) {
+ return new TaskAttemptListenerImpl(getContext(), null, null) {
@Override
public void startRpcServer(){};
@Override
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Tue Jan 22 19:33:02 2013
@@ -25,8 +25,10 @@ import java.util.Arrays;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -150,14 +152,16 @@ public class TestFetchFailure {
Assert.assertEquals("Event status not correct for reduce attempt1",
TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
- TaskAttemptCompletionEvent mapEvents[] =
+ TaskCompletionEvent mapEvents[] =
job.getMapAttemptCompletionEvents(0, 2);
+ TaskCompletionEvent convertedEvents[] = TypeConverter.fromYarn(events);
Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
Assert.assertArrayEquals("Unexpected map events",
- Arrays.copyOfRange(events, 0, 2), mapEvents);
+ Arrays.copyOfRange(convertedEvents, 0, 2), mapEvents);
mapEvents = job.getMapAttemptCompletionEvents(2, 200);
Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
- Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
+ Assert.assertEquals("Unexpected map event", convertedEvents[2],
+ mapEvents[0]);
}
/**
@@ -395,14 +399,16 @@ public class TestFetchFailure {
Assert.assertEquals("Event status not correct for reduce attempt1",
TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
- TaskAttemptCompletionEvent mapEvents[] =
+ TaskCompletionEvent mapEvents[] =
job.getMapAttemptCompletionEvents(0, 2);
+ TaskCompletionEvent convertedEvents[] = TypeConverter.fromYarn(events);
Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
Assert.assertArrayEquals("Unexpected map events",
- Arrays.copyOfRange(events, 0, 2), mapEvents);
+ Arrays.copyOfRange(convertedEvents, 0, 2), mapEvents);
mapEvents = job.getMapAttemptCompletionEvents(2, 200);
Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
- Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
+ Assert.assertEquals("Unexpected map event", convertedEvents[2],
+ mapEvents[0]);
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Tue Jan 22 19:33:02 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
@@ -370,8 +371,9 @@ public class TestMRApp {
}
@Override
- protected Job createJob(Configuration conf) {
- spiedJob = spy((JobImpl) super.createJob(conf));
+ protected Job createJob(Configuration conf, JobStateInternal forcedState,
+ String diagnostic) {
+ spiedJob = spy((JobImpl) super.createJob(conf, forcedState, diagnostic));
((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
return spiedJob;
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Tue Jan 22 19:33:02 2013
@@ -17,28 +17,63 @@
*/
package org.apache.hadoop.mapreduce.v2.app;
-import java.io.IOException;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
-import junit.framework.Assert;
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestMRAppMaster {
+ private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
+ static String stagingDir = "staging/";
+
+ @BeforeClass
+ public static void setup() {
+ //Do not error out if metrics are initialized multiple times
+ DefaultMetricsSystem.setMiniClusterMode(true);
+ File dir = new File(stagingDir);
+ stagingDir = dir.getAbsolutePath();
+ }
+
+ @Before
+ public void cleanup() throws IOException {
+ File dir = new File(stagingDir);
+ if(dir.exists()) {
+ FileUtils.deleteDirectory(dir);
+ }
+ dir.mkdirs();
+ }
+
@Test
public void testMRAppMasterForDifferentUser() throws IOException,
InterruptedException {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000001";
String containerIdStr = "container_1317529182569_0004_000001_1";
- String stagingDir = "/tmp/staging";
+
String userName = "TestAppMasterUser";
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
@@ -49,34 +84,208 @@ public class TestMRAppMaster {
YarnConfiguration conf = new YarnConfiguration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
- Assert.assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
+ assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
+ ".staging", appMaster.stagingDirPath.toString());
}
+
+ @Test
+ public void testMRAppMasterMidLock() throws IOException,
+ InterruptedException {
+ String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+ String containerIdStr = "container_1317529182569_0004_000002_1";
+ String userName = "TestAppMasterUser";
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ ApplicationAttemptId applicationAttemptId = ConverterUtils
+ .toApplicationAttemptId(applicationAttemptIdStr);
+ JobId jobId = TypeConverter.toYarn(
+ TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+ Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+ FileSystem fs = FileSystem.get(conf);
+ //Create the file, but no end file so we should unregister with an error.
+ fs.create(start).close();
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ MRAppMaster appMaster =
+ new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+ System.currentTimeMillis(), false);
+ boolean caught = false;
+ try {
+ MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+ } catch (IOException e) {
+ //The IO Exception is expected
+ LOG.info("Caught expected Exception", e);
+ caught = true;
+ }
+ assertTrue(caught);
+ assertTrue(appMaster.errorHappenedShutDown);
+ assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
+ appMaster.stop();
+ }
+
+ @Test
+ public void testMRAppMasterSuccessLock() throws IOException,
+ InterruptedException {
+ String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+ String containerIdStr = "container_1317529182569_0004_000002_1";
+ String userName = "TestAppMasterUser";
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ ApplicationAttemptId applicationAttemptId = ConverterUtils
+ .toApplicationAttemptId(applicationAttemptIdStr);
+ JobId jobId = TypeConverter.toYarn(
+ TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+ Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+ Path end = MRApps.getEndJobCommitSuccessFile(conf, userName, jobId);
+ FileSystem fs = FileSystem.get(conf);
+ fs.create(start).close();
+ fs.create(end).close();
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ MRAppMaster appMaster =
+ new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+ System.currentTimeMillis(), false);
+ boolean caught = false;
+ try {
+ MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+ } catch (IOException e) {
+ //The IO Exception is expected
+ LOG.info("Caught expected Exception", e);
+ caught = true;
+ }
+ assertTrue(caught);
+ assertTrue(appMaster.errorHappenedShutDown);
+ assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState);
+ appMaster.stop();
+ }
+
+ @Test
+ public void testMRAppMasterFailLock() throws IOException,
+ InterruptedException {
+ String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+ String containerIdStr = "container_1317529182569_0004_000002_1";
+ String userName = "TestAppMasterUser";
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ ApplicationAttemptId applicationAttemptId = ConverterUtils
+ .toApplicationAttemptId(applicationAttemptIdStr);
+ JobId jobId = TypeConverter.toYarn(
+ TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+ Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+ Path end = MRApps.getEndJobCommitFailureFile(conf, userName, jobId);
+ FileSystem fs = FileSystem.get(conf);
+ fs.create(start).close();
+ fs.create(end).close();
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ MRAppMaster appMaster =
+ new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+ System.currentTimeMillis(), false);
+ boolean caught = false;
+ try {
+ MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+ } catch (IOException e) {
+ //The IO Exception is expected
+ LOG.info("Caught expected Exception", e);
+ caught = true;
+ }
+ assertTrue(caught);
+ assertTrue(appMaster.errorHappenedShutDown);
+ assertEquals(JobStateInternal.FAILED, appMaster.forcedState);
+ appMaster.stop();
+ }
+
+ @Test
+ public void testMRAppMasterMissingStaging() throws IOException,
+ InterruptedException {
+ String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+ String containerIdStr = "container_1317529182569_0004_000002_1";
+ String userName = "TestAppMasterUser";
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ ApplicationAttemptId applicationAttemptId = ConverterUtils
+ .toApplicationAttemptId(applicationAttemptIdStr);
+
+ //Delete the staging directory
+ File dir = new File(stagingDir);
+ if(dir.exists()) {
+ FileUtils.deleteDirectory(dir);
+ }
+
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ MRAppMaster appMaster =
+ new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+ System.currentTimeMillis(), false);
+ boolean caught = false;
+ try {
+ MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+ } catch (IOException e) {
+ //The IO Exception is expected
+ LOG.info("Caught expected Exception", e);
+ caught = true;
+ }
+ assertTrue(caught);
+ assertTrue(appMaster.errorHappenedShutDown);
+ //Copying the history file is disabled, but it is not really visible from
+ //here
+ assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
+ appMaster.stop();
+ }
}
class MRAppMasterTest extends MRAppMaster {
Path stagingDirPath;
private Configuration conf;
+ private boolean overrideInitAndStart;
+ ContainerAllocator mockContainerAllocator;
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort,
long submitTime) {
+ this(applicationAttemptId, containerId, host, port, httpPort, submitTime,
+ true);
+ }
+ public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
+ ContainerId containerId, String host, int port, int httpPort,
+ long submitTime, boolean overrideInitAndStart) {
super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
+ this.overrideInitAndStart = overrideInitAndStart;
+ mockContainerAllocator = mock(ContainerAllocator.class);
}
@Override
public void init(Configuration conf) {
- this.conf = conf;
+ if (overrideInitAndStart) {
+ this.conf = conf;
+ } else {
+ super.init(conf);
+ }
+ }
+
+ @Override
+ protected void downloadTokensAndSetupUGI(Configuration conf) {
+ try {
+ this.currentUser = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ }
+
+ @Override
+ protected ContainerAllocator createContainerAllocator(
+ final ClientService clientService, final AppContext context) {
+ return mockContainerAllocator;
}
@Override
public void start() {
- try {
- String user = UserGroupInformation.getCurrentUser().getShortUserName();
- stagingDirPath = MRApps.getStagingAreaDir(conf, user);
- } catch (Exception e) {
- Assert.fail(e.getMessage());
+ if (overrideInitAndStart) {
+ try {
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ stagingDirPath = MRApps.getStagingAreaDir(conf, user);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ } else {
+ super.start();
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Jan 22 19:33:02 2013
@@ -38,6 +38,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
@@ -70,7 +71,9 @@ import org.apache.hadoop.mapreduce.v2.ut
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -1139,7 +1142,7 @@ public class TestRMContainerAllocator {
}
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
taskAttemptId);
- Resource containerNeed = BuilderUtils.newResource(memory);
+ Resource containerNeed = BuilderUtils.newResource(memory, 1);
if (earlierFailedAttempt) {
return ContainerRequestEvent
.createContainerRequestEventForFailedContainer(attemptId,
@@ -1222,8 +1225,8 @@ public class TestRMContainerAllocator {
when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
when(context.getJob(isA(JobId.class))).thenReturn(job);
when(context.getClusterInfo()).thenReturn(
- new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
- .newResource(10240)));
+ new ClusterInfo(BuilderUtils.newResource(1024, 1), BuilderUtils
+ .newResource(10240, 1)));
when(context.getEventHandler()).thenReturn(new EventHandler() {
@Override
public void handle(Event event) {
@@ -1240,6 +1243,13 @@ public class TestRMContainerAllocator {
return context;
}
+ private static AppContext createAppContext(
+ ApplicationAttemptId appAttemptId, Job job, Clock clock) {
+ AppContext context = createAppContext(appAttemptId, job);
+ when(context.getClock()).thenReturn(clock);
+ return context;
+ }
+
private static ClientService createMockClientService() {
ClientService service = mock(ClientService.class);
when(service.getBindAddress()).thenReturn(
@@ -1264,6 +1274,15 @@ public class TestRMContainerAllocator {
super.start();
}
+ public MyContainerAllocator(MyResourceManager rm, Configuration conf,
+ ApplicationAttemptId appAttemptId, Job job, Clock clock) {
+ super(createMockClientService(),
+ createAppContext(appAttemptId, job, clock));
+ this.rm = rm;
+ super.init(conf);
+ super.start();
+ }
+
@Override
protected AMRMProtocol createSchedulerProxy() {
return this.rm.getApplicationMasterService();
@@ -1280,12 +1299,12 @@ public class TestRMContainerAllocator {
@Override
protected Resource getMinContainerCapability() {
- return BuilderUtils.newResource(1024);
+ return BuilderUtils.newResource(1024, 1);
}
@Override
protected Resource getMaxContainerCapability() {
- return BuilderUtils.newResource(10240);
+ return BuilderUtils.newResource(10240, 1);
}
public void sendRequest(ContainerRequestEvent req) {
@@ -1465,6 +1484,66 @@ public class TestRMContainerAllocator {
allocator.recalculatedReduceSchedule);
}
+ @Test
+ public void testHeartbeatHandler() throws Exception {
+ LOG.info("Running testHeartbeatHandler");
+
+ Configuration conf = new Configuration();
+ conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1);
+ ControlledClock clock = new ControlledClock(new SystemClock());
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getClock()).thenReturn(clock);
+ when(appContext.getApplicationID()).thenReturn(
+ BuilderUtils.newApplicationId(1, 1));
+
+ RMContainerAllocator allocator = new RMContainerAllocator(
+ mock(ClientService.class), appContext) {
+ @Override
+ protected void register() {
+ }
+ @Override
+ protected AMRMProtocol createSchedulerProxy() {
+ return mock(AMRMProtocol.class);
+ }
+ @Override
+ protected synchronized void heartbeat() throws Exception {
+ }
+ };
+ allocator.init(conf);
+ allocator.start();
+
+ clock.setTime(5);
+ int timeToWaitMs = 5000;
+ while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) {
+ Thread.sleep(10);
+ timeToWaitMs -= 10;
+ }
+ Assert.assertEquals(5, allocator.getLastHeartbeatTime());
+ clock.setTime(7);
+ timeToWaitMs = 5000;
+ while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) {
+ Thread.sleep(10);
+ timeToWaitMs -= 10;
+ }
+ Assert.assertEquals(7, allocator.getLastHeartbeatTime());
+
+ final AtomicBoolean callbackCalled = new AtomicBoolean(false);
+ allocator.runOnNextHeartbeat(new Runnable() {
+ @Override
+ public void run() {
+ callbackCalled.set(true);
+ }
+ });
+ clock.setTime(8);
+ timeToWaitMs = 5000;
+ while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) {
+ Thread.sleep(10);
+ timeToWaitMs -= 10;
+ }
+ Assert.assertEquals(8, allocator.getLastHeartbeatTime());
+ Assert.assertTrue(callbackCalled.get());
+ }
+
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Jan 22 19:33:02 2013
@@ -626,6 +626,115 @@ public class TestRecovery {
validateOutput();
}
+ @Test
+ public void testRecoveryWithOldCommiter() throws Exception {
+ int runCount = 0;
+ MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+ true, ++runCount);
+ Configuration conf = new Configuration();
+ conf.setBoolean("mapred.mapper.new-api", false);
+ conf.setBoolean("mapred.reducer.new-api", false);
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask1 = it.next();
+ Task reduceTask1 = it.next();
+
+ // all maps must be running
+ app.waitForState(mapTask1, TaskState.RUNNING);
+
+ TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+ .next();
+
+ //before sending the TA_DONE event, make sure attempt has come to
+ //RUNNING state
+ app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+
+ //send the done signal to the map
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ task1Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait for map task to complete
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ // Verify the shuffle-port
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+ app.waitForState(reduceTask1, TaskState.RUNNING);
+ TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+
+ // write output corresponding to reduce1
+ writeOutput(reduce1Attempt1, conf);
+
+ //send the done signal to the 1st reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ reduce1Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait for first reduce task to complete
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+ //stop the app before the job completes.
+ app.stop();
+
+ //rerun
+ //in rerun the map will be recovered from previous run
+ app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+ ++runCount);
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ conf.setBoolean("mapred.mapper.new-api", false);
+ conf.setBoolean("mapred.reducer.new-api", false);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ it = job.getTasks().values().iterator();
+ mapTask1 = it.next();
+ reduceTask1 = it.next();
+ Task reduceTask2 = it.next();
+
+ // map will be recovered, no need to send done
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ // Verify the shuffle-port after recovery
+ task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+ // first reduce will be recovered, no need to send done
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+ app.waitForState(reduceTask2, TaskState.RUNNING);
+
+ TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+ .iterator().next();
+ //before sending the TA_DONE event, make sure attempt has come to
+ //RUNNING state
+ app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+
+ //send the done signal to the 2nd reduce task
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ reduce2Attempt.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait to get it completed
+ app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+ validateOutput();
+ }
+
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Tue Jan 22 19:33:02 2013
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
@@ -441,7 +442,7 @@ public class TestRuntimeEstimators {
}
@Override
- public TaskAttemptCompletionEvent[]
+ public TaskCompletionEvent[]
getMapAttemptCompletionEvents(int startIndex, int maxEvents) {
throw new UnsupportedOperationException("Not supported yet.");
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Tue Jan 22 19:33:02 2013
@@ -38,10 +38,13 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -71,6 +74,10 @@ import org.junit.Test;
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+ //Staging Dir exists
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
@@ -92,6 +99,10 @@ import org.junit.Test;
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+ //Staging Dir exists
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
@@ -117,6 +128,10 @@ import org.junit.Test;
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+ //Staging Dir exists
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(1);
@@ -166,6 +181,11 @@ import org.junit.Test;
}
@Override
+ public RMHeartbeatHandler getRMHeartbeatHandler() {
+ return getStubbedHeartbeatHandler(getContext());
+ }
+
+ @Override
protected void sysexit() {
}
@@ -177,6 +197,7 @@ import org.junit.Test;
@Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
}
+
}
private final class MRAppTestCleanup extends MRApp {
@@ -191,7 +212,8 @@ import org.junit.Test;
}
@Override
- protected Job createJob(Configuration conf) {
+ protected Job createJob(Configuration conf, JobStateInternal forcedState,
+ String diagnostic) {
UserGroupInformation currentUser = null;
try {
currentUser = UserGroupInformation.getCurrentUser();
@@ -201,7 +223,8 @@ import org.junit.Test;
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
- isNewApiCommitter(), currentUser.getUserName(), getContext());
+ isNewApiCommitter(), currentUser.getUserName(), getContext(),
+ forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
@@ -238,6 +261,11 @@ import org.junit.Test;
}
@Override
+ public RMHeartbeatHandler getRMHeartbeatHandler() {
+ return getStubbedHeartbeatHandler(getContext());
+ }
+
+ @Override
public void cleanupStagingDir() throws IOException {
cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
}
@@ -247,6 +275,20 @@ import org.junit.Test;
}
}
+ private static RMHeartbeatHandler getStubbedHeartbeatHandler(
+ final AppContext appContext) {
+ return new RMHeartbeatHandler() {
+ @Override
+ public long getLastHeartbeatTime() {
+ return appContext.getClock().getTime();
+ }
+ @Override
+ public void runOnNextHeartbeat(Runnable callback) {
+ callback.run();
+ }
+ };
+ }
+
@Test
public void testStagingCleanupOrder() throws Exception {
MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Jan 22 19:33:02 2013
@@ -23,11 +23,13 @@ import static org.mockito.Mockito.doThro
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
@@ -56,6 +58,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -66,8 +69,11 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
@@ -77,10 +83,28 @@ import org.junit.Test;
@SuppressWarnings({"rawtypes"})
public class TestJobImpl {
+ static String stagingDir = "target/test-staging/";
+
+ @BeforeClass
+ public static void setup() {
+ File dir = new File(stagingDir);
+ stagingDir = dir.getAbsolutePath();
+ }
+
+ @Before
+ public void cleanup() throws IOException {
+ File dir = new File(stagingDir);
+ if(dir.exists()) {
+ FileUtils.deleteDirectory(dir);
+ }
+ dir.mkdirs();
+ }
+
@Test
public void testJobNoTasks() {
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -102,6 +126,7 @@ public class TestJobImpl {
@Test(timeout=20000)
public void testCommitJobFailsJob() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -126,6 +151,7 @@ public class TestJobImpl {
@Test(timeout=20000)
public void testCheckJobCompleteSuccess() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -150,6 +176,7 @@ public class TestJobImpl {
@Test(timeout=20000)
public void testKilledDuringSetup() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -186,6 +213,7 @@ public class TestJobImpl {
@Test(timeout=20000)
public void testKilledDuringCommit() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -210,6 +238,7 @@ public class TestJobImpl {
@Test(timeout=20000)
public void testKilledDuringFailAbort() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -251,6 +280,7 @@ public class TestJobImpl {
@Test(timeout=20000)
public void testKilledDuringKillAbort() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -315,7 +345,7 @@ public class TestJobImpl {
// Verify access
JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
- null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -326,7 +356,7 @@ public class TestJobImpl {
// Verify access
JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
- null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -337,7 +367,7 @@ public class TestJobImpl {
// Verify access
JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
- null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -348,7 +378,7 @@ public class TestJobImpl {
// Verify access
JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
- null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -359,7 +389,7 @@ public class TestJobImpl {
// Verify access
JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
- null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}
@@ -377,7 +407,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
- mrAppMetrics, true, null, 0, null, null);
+ mrAppMetrics, true, null, 0, null, null, null, null);
job.handle(diagUpdateEvent);
String diagnostics = job.getReport().getDiagnostics();
Assert.assertNotNull(diagnostics);
@@ -388,7 +418,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
- mrAppMetrics, true, null, 0, null, null);
+ mrAppMetrics, true, null, 0, null, null, null, null);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
job.handle(diagUpdateEvent);
diagnostics = job.getReport().getDiagnostics();
@@ -443,7 +473,7 @@ public class TestJobImpl {
JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null, null, null,
- mrAppMetrics, true, null, 0, null, null);
+ mrAppMetrics, true, null, 0, null, null, null, null);
InitTransition initTransition = getInitTransition(2);
JobEvent mockJobEvent = mock(JobEvent.class);
initTransition.transition(job, mockJobEvent);
@@ -502,13 +532,27 @@ public class TestJobImpl {
private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) {
- SystemClock clock = new SystemClock();
+ final SystemClock clock = new SystemClock();
AppContext appContext = mock(AppContext.class);
when(appContext.getEventHandler()).thenReturn(
dispatcher.getEventHandler());
when(appContext.getClock()).thenReturn(clock);
+ RMHeartbeatHandler heartbeatHandler = new RMHeartbeatHandler() {
+ @Override
+ public long getLastHeartbeatTime() {
+ return clock.getTime();
+ }
+ @Override
+ public void runOnNextHeartbeat(Runnable callback) {
+ callback.run();
+ }
+ };
+ ApplicationAttemptId id =
+ ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+ when(appContext.getApplicationID()).thenReturn(id.getApplicationId());
+ when(appContext.getApplicationAttemptId()).thenReturn(id);
CommitterEventHandler handler =
- new CommitterEventHandler(appContext, committer);
+ new CommitterEventHandler(appContext, committer, heartbeatHandler);
dispatcher.register(CommitterEventType.class, handler);
return handler;
}
@@ -590,7 +634,8 @@ public class TestJobImpl {
super(jobId, applicationAttemptId, conf, eventHandler,
null, new JobTokenSecretManager(), new Credentials(),
new SystemClock(), null, MRAppMetrics.create(),
- newApiCommitter, user, System.currentTimeMillis(), null, null);
+ newApiCommitter, user, System.currentTimeMillis(), null, null, null,
+ null);
initTransition = getInitTransition(numSplits);
localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Tue Jan 22 19:33:02 2013
@@ -197,7 +197,7 @@ public class TestTaskAttempt{
conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb);
conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
app.setClusterInfo(new ClusterInfo(BuilderUtils
- .newResource(minContainerSize), BuilderUtils.newResource(10240)));
+ .newResource(minContainerSize, 1), BuilderUtils.newResource(10240, 1)));
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Tue Jan 22 19:33:02 2013
@@ -1,3 +1,20 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.hadoop.mapreduce.v2.app.launcher;
import static org.mockito.Matchers.any;
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java Tue Jan 22 19:33:02 2013
@@ -1,3 +1,20 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.hadoop.mapreduce.v2.app.local;
import static org.mockito.Matchers.isA;
@@ -99,8 +116,8 @@ public class TestLocalContainerAllocator
when(ctx.getApplicationAttemptId()).thenReturn(attemptId);
when(ctx.getJob(isA(JobId.class))).thenReturn(job);
when(ctx.getClusterInfo()).thenReturn(
- new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
- .newResource(10240)));
+ new ClusterInfo(BuilderUtils.newResource(1024, 1), BuilderUtils
+ .newResource(10240, 1)));
when(ctx.getEventHandler()).thenReturn(eventHandler);
return ctx;
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml Tue Jan 22 19:33:02 2013
@@ -81,6 +81,7 @@
<configuration>
<executable>protoc</executable>
<arguments>
+ <argument>-I../../../hadoop-common-project/hadoop-common/src/main/proto/</argument>
<argument>-I../../../hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/</argument>
<argument>-Isrc/main/proto/</argument>
<argument>--java_out=target/generated-sources/proto</argument>