You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2012/10/19 20:59:07 UTC
svn commit: r1400227 [1/5] - in
/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2:
./ job/impl/ webapp/
Author: sseth
Date: Fri Oct 19 18:59:06 2012
New Revision: 1400227
URL: http://svn.apache.org/viewvc?rev=1400227&view=rev
Log:
MAPREDUCE-4738. Part1. Uncomment disabled unit tests. (sseth)
Modified:
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebApp.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServices.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesAttempts.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesJobConf.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesJobs.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/webapp/TestAMWebServicesTasks.java
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java?rev=1400227&r1=1400226&r2=1400227&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java Fri Oct 19 18:59:06 2012
@@ -1,283 +1,283 @@
-///**
-//* Licensed to the Apache Software Foundation (ASF) under one
-//* or more contributor license agreements. See the NOTICE file
-//* distributed with this work for additional information
-//* regarding copyright ownership. The ASF licenses this file
-//* to you under the Apache License, Version 2.0 (the
-//* "License"); you may not use this file except in compliance
-//* with the License. You may obtain a copy of the License at
-//*
-//* http://www.apache.org/licenses/LICENSE-2.0
-//*
-//* Unless required by applicable law or agreed to in writing, software
-//* distributed under the License is distributed on an "AS IS" BASIS,
-//* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//* See the License for the specific language governing permissions and
-//* limitations under the License.
-//*/
-//
-//package org.apache.hadoop.mapreduce.v2.app2;
-//
-//import java.util.ArrayList;
-//import java.util.List;
-//import java.util.concurrent.BlockingQueue;
-//import java.util.concurrent.LinkedBlockingQueue;
-//
-//import org.apache.hadoop.conf.Configuration;
-//import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-//import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
-//import org.apache.hadoop.mapreduce.v2.app2.job.Job;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerAssignedEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
-//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocatorEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
-//import org.apache.hadoop.yarn.YarnException;
-//import org.apache.hadoop.yarn.api.AMRMProtocol;
-//import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-//import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-//import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-//import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-//import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-//import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-//import org.apache.hadoop.yarn.api.records.AMResponse;
-//import org.apache.hadoop.yarn.api.records.Container;
-//import org.apache.hadoop.yarn.api.records.ContainerId;
-//import org.apache.hadoop.yarn.api.records.NodeId;
-//import org.apache.hadoop.yarn.api.records.ResourceRequest;
-//import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-//import org.apache.hadoop.yarn.factories.RecordFactory;
-//import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-//import org.apache.hadoop.yarn.service.AbstractService;
-//import org.apache.hadoop.yarn.util.BuilderUtils;
-//import org.apache.hadoop.yarn.util.Records;
-//import org.apache.log4j.Level;
-//import org.apache.log4j.LogManager;
-//import org.apache.log4j.Logger;
-//import org.junit.Test;
-//
-//public class MRAppBenchmark {
-//
-// private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-//
-// /**
-// * Runs memory and time benchmark with Mock MRApp.
-// */
-// public void run(MRApp app) throws Exception {
-// Logger rootLogger = LogManager.getRootLogger();
-// rootLogger.setLevel(Level.WARN);
-// long startTime = System.currentTimeMillis();
-// Job job = app.submit(new Configuration());
-// while (!job.getReport().getJobState().equals(JobState.SUCCEEDED)) {
-// printStat(job, startTime);
-// Thread.sleep(2000);
-// }
-// printStat(job, startTime);
-// }
-//
-// private void printStat(Job job, long startTime) throws Exception {
-// long currentTime = System.currentTimeMillis();
-// Runtime.getRuntime().gc();
-// long mem = Runtime.getRuntime().totalMemory()
-// - Runtime.getRuntime().freeMemory();
-// System.out.println("JobState:" + job.getState() +
-// " CompletedMaps:" + job.getCompletedMaps() +
-// " CompletedReduces:" + job.getCompletedReduces() +
-// " Memory(total-free)(KB):" + mem/1024 +
-// " ElapsedTime(ms):" + (currentTime - startTime));
-// }
-//
-// //Throttles the maximum number of concurrent running tasks.
-// //This affects the memory requirement since
-// //org.apache.hadoop.mapred.MapTask/ReduceTask is loaded in memory for all
-// //running task and discarded once the task is launched.
-// static class ThrottledMRApp extends MRApp {
-//
-// int maxConcurrentRunningTasks;
-// volatile int concurrentRunningTasks;
-// ThrottledMRApp(int maps, int reduces, int maxConcurrentRunningTasks) {
-// super(maps, reduces, true, "ThrottledMRApp", true);
-// this.maxConcurrentRunningTasks = maxConcurrentRunningTasks;
-// }
-//
-// @Override
-// protected void attemptLaunched(TaskAttemptId attemptID) {
-// super.attemptLaunched(attemptID);
-// //the task is launched and sends done immediately
-// concurrentRunningTasks--;
-// }
-//
-// @Override
-// protected ContainerAllocator createContainerAllocator(
-// ClientService clientService, AppContext context) {
-// return new ThrottledContainerAllocator();
-// }
-//
-// class ThrottledContainerAllocator extends AbstractService
-// implements ContainerAllocator {
-// private int containerCount;
-// private Thread thread;
-// private BlockingQueue<ContainerAllocatorEvent> eventQueue =
-// new LinkedBlockingQueue<ContainerAllocatorEvent>();
-// public ThrottledContainerAllocator() {
-// super("ThrottledContainerAllocator");
-// }
-// @Override
-// public void handle(ContainerAllocatorEvent event) {
-// try {
-// eventQueue.put(event);
-// } catch (InterruptedException e) {
-// throw new YarnException(e);
-// }
-// }
-// @Override
-// public void start() {
-// thread = new Thread(new Runnable() {
-// @Override
-// public void run() {
-// ContainerAllocatorEvent event = null;
-// while (!Thread.currentThread().isInterrupted()) {
-// try {
-// if (concurrentRunningTasks < maxConcurrentRunningTasks) {
-// event = eventQueue.take();
-// ContainerId cId =
-// recordFactory.newRecordInstance(ContainerId.class);
-// cId.setApplicationAttemptId(
-// getContext().getApplicationAttemptId());
-// cId.setId(containerCount++);
-// //System.out.println("Allocating " + containerCount);
-//
-// Container container =
-// recordFactory.newRecordInstance(Container.class);
-// container.setId(cId);
-// NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
-// nodeId.setHost("dummy");
-// nodeId.setPort(1234);
-// container.setNodeId(nodeId);
-// container.setContainerToken(null);
-// container.setNodeHttpAddress("localhost:8042");
-// getContext().getEventHandler()
-// .handle(
-// new TaskAttemptContainerAssignedEvent(event
-// .getAttemptID(), container, null));
-// concurrentRunningTasks++;
-// } else {
-// Thread.sleep(1000);
-// }
-// } catch (InterruptedException e) {
-// System.out.println("Returning, interrupted");
-// return;
-// }
-// }
-// }
-// });
-// thread.start();
-// super.start();
-// }
-//
-// @Override
-// public void stop() {
-// thread.interrupt();
-// super.stop();
-// }
-// }
-// }
-//
-// @Test
-// public void benchmark1() throws Exception {
-// int maps = 100; // Adjust for benchmarking. Start with thousands.
-// int reduces = 0;
-// System.out.println("Running benchmark with maps:"+maps +
-// " reduces:"+reduces);
-// run(new MRApp(maps, reduces, true, this.getClass().getName(), true) {
-//
-// @Override
-// protected ContainerAllocator createContainerAllocator(
-// ClientService clientService, AppContext context) {
-// return new RMContainerAllocator(clientService, context) {
-// @Override
-// protected AMRMProtocol createSchedulerProxy() {
-// return new AMRMProtocol() {
-//
-// @Override
-// public RegisterApplicationMasterResponse
-// registerApplicationMaster(
-// RegisterApplicationMasterRequest request)
-// throws YarnRemoteException {
-// RegisterApplicationMasterResponse response =
-// Records.newRecord(RegisterApplicationMasterResponse.class);
-// response.setMinimumResourceCapability(BuilderUtils
-// .newResource(1024));
-// response.setMaximumResourceCapability(BuilderUtils
-// .newResource(10240));
-// return response;
-// }
-//
-// @Override
-// public FinishApplicationMasterResponse finishApplicationMaster(
-// FinishApplicationMasterRequest request)
-// throws YarnRemoteException {
-// FinishApplicationMasterResponse response =
-// Records.newRecord(FinishApplicationMasterResponse.class);
-// return response;
-// }
-//
-// @Override
-// public AllocateResponse allocate(AllocateRequest request)
-// throws YarnRemoteException {
-//
-// AllocateResponse response =
-// Records.newRecord(AllocateResponse.class);
-// List<ResourceRequest> askList = request.getAskList();
-// List<Container> containers = new ArrayList<Container>();
-// for (ResourceRequest req : askList) {
-// if (req.getHostName() != "*") {
-// continue;
-// }
-// int numContainers = req.getNumContainers();
-// for (int i = 0; i < numContainers; i++) {
-// ContainerId containerId =
-// BuilderUtils.newContainerId(
-// request.getApplicationAttemptId(),
-// request.getResponseId() + i);
-// containers.add(BuilderUtils
-// .newContainer(containerId, BuilderUtils.newNodeId("host"
-// + containerId.getId(), 2345),
-// "host" + containerId.getId() + ":5678", req
-// .getCapability(), req.getPriority(), null));
-// }
-// }
-//
-// AMResponse amResponse = Records.newRecord(AMResponse.class);
-// amResponse.setAllocatedContainers(containers);
-// amResponse.setResponseId(request.getResponseId() + 1);
-// response.setAMResponse(amResponse);
-// response.setNumClusterNodes(350);
-// return response;
-// }
-// };
-// }
-// };
-// }
-// });
-// }
-//
-// @Test
-// public void benchmark2() throws Exception {
-// int maps = 100; // Adjust for benchmarking, start with a couple of thousands
-// int reduces = 50;
-// int maxConcurrentRunningTasks = 500;
-//
-// System.out.println("Running benchmark with throttled running tasks with " +
-// "maxConcurrentRunningTasks:" + maxConcurrentRunningTasks +
-// " maps:" + maps + " reduces:" + reduces);
-// run(new ThrottledMRApp(maps, reduces, maxConcurrentRunningTasks));
-// }
-//
-// public static void main(String[] args) throws Exception {
-// MRAppBenchmark benchmark = new MRAppBenchmark();
-// benchmark.benchmark1();
-// benchmark.benchmark2();
-// }
-//
-//}
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+public class MRAppBenchmark {
+
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+ /**
+ * Runs memory and time benchmark with Mock MRApp.
+ */
+ public void run(MRApp app) throws Exception {
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.WARN);
+ long startTime = System.currentTimeMillis();
+ Job job = app.submit(new Configuration());
+ while (!job.getReport().getJobState().equals(JobState.SUCCEEDED)) {
+ printStat(job, startTime);
+ Thread.sleep(2000);
+ }
+ printStat(job, startTime);
+ }
+
+ private void printStat(Job job, long startTime) throws Exception {
+ long currentTime = System.currentTimeMillis();
+ Runtime.getRuntime().gc();
+ long mem = Runtime.getRuntime().totalMemory()
+ - Runtime.getRuntime().freeMemory();
+ System.out.println("JobState:" + job.getState() +
+ " CompletedMaps:" + job.getCompletedMaps() +
+ " CompletedReduces:" + job.getCompletedReduces() +
+ " Memory(total-free)(KB):" + mem/1024 +
+ " ElapsedTime(ms):" + (currentTime - startTime));
+ }
+
+ //Throttles the maximum number of concurrent running tasks.
+ //This affects the memory requirement since
+ //org.apache.hadoop.mapred.MapTask/ReduceTask is loaded in memory for all
+ //running task and discarded once the task is launched.
+ static class ThrottledMRApp extends MRApp {
+
+ int maxConcurrentRunningTasks;
+ volatile int concurrentRunningTasks;
+ ThrottledMRApp(int maps, int reduces, int maxConcurrentRunningTasks) {
+ super(maps, reduces, true, "ThrottledMRApp", true);
+ this.maxConcurrentRunningTasks = maxConcurrentRunningTasks;
+ }
+
+ @Override
+ protected void attemptLaunched(TaskAttemptId attemptID) {
+ super.attemptLaunched(attemptID);
+ //the task is launched and sends done immediately
+ concurrentRunningTasks--;
+ }
+
+ @Override
+ protected ContainerAllocator createContainerAllocator(
+ ClientService clientService, AppContext context) {
+ return new ThrottledContainerAllocator();
+ }
+
+ class ThrottledContainerAllocator extends AbstractService
+ implements ContainerAllocator {
+ private int containerCount;
+ private Thread thread;
+ private BlockingQueue<ContainerAllocatorEvent> eventQueue =
+ new LinkedBlockingQueue<ContainerAllocatorEvent>();
+ public ThrottledContainerAllocator() {
+ super("ThrottledContainerAllocator");
+ }
+ @Override
+ public void handle(ContainerAllocatorEvent event) {
+ try {
+ eventQueue.put(event);
+ } catch (InterruptedException e) {
+ throw new YarnException(e);
+ }
+ }
+ @Override
+ public void start() {
+ thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ ContainerAllocatorEvent event = null;
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ if (concurrentRunningTasks < maxConcurrentRunningTasks) {
+ event = eventQueue.take();
+ ContainerId cId =
+ recordFactory.newRecordInstance(ContainerId.class);
+ cId.setApplicationAttemptId(
+ getContext().getApplicationAttemptId());
+ cId.setId(containerCount++);
+ //System.out.println("Allocating " + containerCount);
+
+ Container container =
+ recordFactory.newRecordInstance(Container.class);
+ container.setId(cId);
+ NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
+ nodeId.setHost("dummy");
+ nodeId.setPort(1234);
+ container.setNodeId(nodeId);
+ container.setContainerToken(null);
+ container.setNodeHttpAddress("localhost:8042");
+ getContext().getEventHandler()
+ .handle(
+ new TaskAttemptContainerAssignedEvent(event
+ .getAttemptID(), container, null));
+ concurrentRunningTasks++;
+ } else {
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException e) {
+ System.out.println("Returning, interrupted");
+ return;
+ }
+ }
+ }
+ });
+ thread.start();
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ thread.interrupt();
+ super.stop();
+ }
+ }
+ }
+
+ @Test
+ public void benchmark1() throws Exception {
+ int maps = 100; // Adjust for benchmarking. Start with thousands.
+ int reduces = 0;
+ System.out.println("Running benchmark with maps:"+maps +
+ " reduces:"+reduces);
+ run(new MRApp(maps, reduces, true, this.getClass().getName(), true) {
+
+ @Override
+ protected ContainerAllocator createContainerAllocator(
+ ClientService clientService, AppContext context) {
+ return new RMContainerAllocator(clientService, context) {
+ @Override
+ protected AMRMProtocol createSchedulerProxy() {
+ return new AMRMProtocol() {
+
+ @Override
+ public RegisterApplicationMasterResponse
+ registerApplicationMaster(
+ RegisterApplicationMasterRequest request)
+ throws YarnRemoteException {
+ RegisterApplicationMasterResponse response =
+ Records.newRecord(RegisterApplicationMasterResponse.class);
+ response.setMinimumResourceCapability(BuilderUtils
+ .newResource(1024));
+ response.setMaximumResourceCapability(BuilderUtils
+ .newResource(10240));
+ return response;
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request)
+ throws YarnRemoteException {
+ FinishApplicationMasterResponse response =
+ Records.newRecord(FinishApplicationMasterResponse.class);
+ return response;
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnRemoteException {
+
+ AllocateResponse response =
+ Records.newRecord(AllocateResponse.class);
+ List<ResourceRequest> askList = request.getAskList();
+ List<Container> containers = new ArrayList<Container>();
+ for (ResourceRequest req : askList) {
+ if (req.getHostName() != "*") {
+ continue;
+ }
+ int numContainers = req.getNumContainers();
+ for (int i = 0; i < numContainers; i++) {
+ ContainerId containerId =
+ BuilderUtils.newContainerId(
+ request.getApplicationAttemptId(),
+ request.getResponseId() + i);
+ containers.add(BuilderUtils
+ .newContainer(containerId, BuilderUtils.newNodeId("host"
+ + containerId.getId(), 2345),
+ "host" + containerId.getId() + ":5678", req
+ .getCapability(), req.getPriority(), null));
+ }
+ }
+
+ AMResponse amResponse = Records.newRecord(AMResponse.class);
+ amResponse.setAllocatedContainers(containers);
+ amResponse.setResponseId(request.getResponseId() + 1);
+ response.setAMResponse(amResponse);
+ response.setNumClusterNodes(350);
+ return response;
+ }
+ };
+ }
+ };
+ }
+ });
+ }
+
+ @Test
+ public void benchmark2() throws Exception {
+ int maps = 100; // Adjust for benchmarking, start with a couple of thousands
+ int reduces = 50;
+ int maxConcurrentRunningTasks = 500;
+
+ System.out.println("Running benchmark with throttled running tasks with " +
+ "maxConcurrentRunningTasks:" + maxConcurrentRunningTasks +
+ " maps:" + maps + " reduces:" + reduces);
+ run(new ThrottledMRApp(maps, reduces, maxConcurrentRunningTasks));
+ }
+
+ public static void main(String[] args) throws Exception {
+ MRAppBenchmark benchmark = new MRAppBenchmark();
+ benchmark.benchmark1();
+ benchmark.benchmark2();
+ }
+
+}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java?rev=1400227&r1=1400226&r2=1400227&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java Fri Oct 19 18:59:06 2012
@@ -1,650 +1,650 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.hadoop.mapreduce.v2.app2.job.impl;
-//
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertFalse;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.spy;
-//import static org.mockito.Mockito.times;
-//import static org.mockito.Mockito.verify;
-//import static org.mockito.Mockito.when;
-//
-//import java.io.IOException;
-//import java.net.InetSocketAddress;
-//import java.util.Arrays;
-//import java.util.HashMap;
-//import java.util.Iterator;
-//import java.util.Map;
-//
-//import junit.framework.Assert;
-//
-//import org.apache.hadoop.conf.Configuration;
-//import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-//import org.apache.hadoop.fs.FileStatus;
-//import org.apache.hadoop.fs.FileSystem;
-//import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.fs.RawLocalFileSystem;
-//import org.apache.hadoop.io.DataInputByteBuffer;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapred.JobConf;
-//import org.apache.hadoop.mapred.MapTaskAttemptImpl;
-//import org.apache.hadoop.mapred.WrappedJvmID;
-//import org.apache.hadoop.mapreduce.JobCounter;
-//import org.apache.hadoop.mapreduce.MRJobConfig;
-//import org.apache.hadoop.mapreduce.OutputCommitter;
-//import org.apache.hadoop.mapreduce.TypeConverter;
-//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
-//import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
-//import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-//import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-//import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-//import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
-//import org.apache.hadoop.mapreduce.v2.app2.AppContext;
-//import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
-//import org.apache.hadoop.mapreduce.v2.app2.MRApp;
-//import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
-//import org.apache.hadoop.mapreduce.v2.app2.job.Job;
-//import org.apache.hadoop.mapreduce.v2.app2.job.Task;
-//import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerAssignedEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerLaunchedEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
-//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestEvent;
-//import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
-//import org.apache.hadoop.security.Credentials;
-//import org.apache.hadoop.security.UserGroupInformation;
-//import org.apache.hadoop.security.token.Token;
-//import org.apache.hadoop.security.token.TokenIdentifier;
-//import org.apache.hadoop.yarn.Clock;
-//import org.apache.hadoop.yarn.ClusterInfo;
-//import org.apache.hadoop.yarn.SystemClock;
-//import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-//import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-//import org.apache.hadoop.yarn.api.records.ApplicationId;
-//import org.apache.hadoop.yarn.api.records.Container;
-//import org.apache.hadoop.yarn.api.records.ContainerId;
-//import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-//import org.apache.hadoop.yarn.api.records.NodeId;
-//import org.apache.hadoop.yarn.api.records.Resource;
-//import org.apache.hadoop.yarn.event.Event;
-//import org.apache.hadoop.yarn.event.EventHandler;
-//import org.apache.hadoop.yarn.util.BuilderUtils;
-//import org.junit.Test;
-//import org.mockito.ArgumentCaptor;
-//
-//@SuppressWarnings({"unchecked", "rawtypes"})
-//public class TestTaskAttempt{
-// @Test
-// public void testAttemptContainerRequest() throws Exception {
-// //WARNING: This test must run first. This is because there is an
-// // optimization where the credentials passed in are cached statically so
-// // they do not need to be recomputed when creating a new
-// // ContainerLaunchContext. if other tests run first this code will cache
-// // their credentials and this test will fail trying to look for the
-// // credentials it inserted in.
-// final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
-// final byte[] SECRET_KEY = ("secretkey").getBytes();
-// Map<ApplicationAccessType, String> acls =
-// new HashMap<ApplicationAccessType, String>(1);
-// acls.put(ApplicationAccessType.VIEW_APP, "otheruser");
-// ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
-// JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-// TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
-// Path jobFile = mock(Path.class);
-//
-// EventHandler eventHandler = mock(EventHandler.class);
-// TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-// when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
-//
-// JobConf jobConf = new JobConf();
-// jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-// jobConf.setBoolean("fs.file.impl.disable.cache", true);
-// jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
-//
-// // setup UGI for security so tokens and keys are preserved
-// jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-// UserGroupInformation.setConfiguration(jobConf);
-//
-// Credentials credentials = new Credentials();
-// credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY);
-// Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
-// ("tokenid").getBytes(), ("tokenpw").getBytes(),
-// new Text("tokenkind"), new Text("tokenservice"));
-//
-// TaskAttemptImpl taImpl =
-// new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
-// mock(TaskSplitMetaInfo.class), jobConf, taListener,
-// mock(OutputCommitter.class), jobToken, credentials,
-// new SystemClock(), null);
-//
-// jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
-// ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
-//
-// ContainerLaunchContext launchCtx =
-// TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
-// jobConf, jobToken, taImpl.createRemoteTask(),
-// TypeConverter.fromYarn(jobId), mock(Resource.class),
-// mock(WrappedJvmID.class), taListener,
-// credentials);
-//
-// Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs());
-// Credentials launchCredentials = new Credentials();
-//
-// DataInputByteBuffer dibb = new DataInputByteBuffer();
-// dibb.reset(launchCtx.getContainerTokens());
-// launchCredentials.readTokenStorageStream(dibb);
-//
-// // verify all tokens specified for the task attempt are in the launch context
-// for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
-// Token<? extends TokenIdentifier> launchToken =
-// launchCredentials.getToken(token.getService());
-// Assert.assertNotNull("Token " + token.getService() + " is missing",
-// launchToken);
-// Assert.assertEquals("Token " + token.getService() + " mismatch",
-// token, launchToken);
-// }
-//
-// // verify the secret key is in the launch context
-// Assert.assertNotNull("Secret key missing",
-// launchCredentials.getSecretKey(SECRET_KEY_ALIAS));
-// Assert.assertTrue("Secret key mismatch", Arrays.equals(SECRET_KEY,
-// launchCredentials.getSecretKey(SECRET_KEY_ALIAS)));
-// }
-//
-// static public class StubbedFS extends RawLocalFileSystem {
-// @Override
-// public FileStatus getFileStatus(Path f) throws IOException {
-// return new FileStatus(1, false, 1, 1, 1, f);
-// }
-// }
-//
-// @Test
-// public void testMRAppHistoryForMap() throws Exception {
-// MRApp app = new FailingAttemptsMRApp(1, 0);
-// testMRAppHistory(app);
-// }
-//
-// @Test
-// public void testMRAppHistoryForReduce() throws Exception {
-// MRApp app = new FailingAttemptsMRApp(0, 1);
-// testMRAppHistory(app);
-// }
-//
-// @Test
-// public void testSingleRackRequest() throws Exception {
-// TaskAttemptImpl.RequestContainerTransition rct =
-// new TaskAttemptImpl.RequestContainerTransition(false);
-//
-// EventHandler eventHandler = mock(EventHandler.class);
-// String[] hosts = new String[3];
-// hosts[0] = "host1";
-// hosts[1] = "host2";
-// hosts[2] = "host3";
-// TaskSplitMetaInfo splitInfo =
-// new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
-//
-// TaskAttemptImpl mockTaskAttempt =
-// createMapTaskAttemptImplForTest(eventHandler, splitInfo);
-// TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
-//
-// rct.transition(mockTaskAttempt, mockTAEvent);
-//
-// ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
-// verify(eventHandler, times(2)).handle(arg.capture());
-// if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
-// Assert.fail("Second Event not of type ContainerRequestEvent");
-// }
-// ContainerRequestEvent cre =
-// (ContainerRequestEvent) arg.getAllValues().get(1);
-// String[] requestedRacks = cre.getRacks();
-// //Only a single occurrence of /DefaultRack
-// assertEquals(1, requestedRacks.length);
-// }
-//
-// @Test
-// public void testHostResolveAttempt() throws Exception {
-// TaskAttemptImpl.RequestContainerTransition rct =
-// new TaskAttemptImpl.RequestContainerTransition(false);
-//
-// EventHandler eventHandler = mock(EventHandler.class);
-// String[] hosts = new String[3];
-// hosts[0] = "192.168.1.1";
-// hosts[1] = "host2";
-// hosts[2] = "host3";
-// TaskSplitMetaInfo splitInfo =
-// new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
-//
-// TaskAttemptImpl mockTaskAttempt =
-// createMapTaskAttemptImplForTest(eventHandler, splitInfo);
-// TaskAttemptImpl spyTa = spy(mockTaskAttempt);
-// when(spyTa.resolveHost(hosts[0])).thenReturn("host1");
-//
-// TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
-// rct.transition(spyTa, mockTAEvent);
-// verify(spyTa).resolveHost(hosts[0]);
-// ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
-// verify(eventHandler, times(2)).handle(arg.capture());
-// if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
-// Assert.fail("Second Event not of type ContainerRequestEvent");
-// }
-// Map<String, Boolean> expected = new HashMap<String, Boolean>();
-// expected.put("host1", true);
-// expected.put("host2", true);
-// expected.put("host3", true);
-// ContainerRequestEvent cre =
-// (ContainerRequestEvent) arg.getAllValues().get(1);
-// String[] requestedHosts = cre.getHosts();
-// for (String h : requestedHosts) {
-// expected.remove(h);
-// }
-// assertEquals(0, expected.size());
-// }
-//
-// @Test
-// public void testSlotMillisCounterUpdate() throws Exception {
-// verifySlotMillis(2048, 2048, 1024);
-// verifySlotMillis(2048, 1024, 1024);
-// verifySlotMillis(10240, 1024, 2048);
-// }
-//
-// public void verifySlotMillis(int mapMemMb, int reduceMemMb,
-// int minContainerSize) throws Exception {
-// Clock actualClock = new SystemClock();
-// ControlledClock clock = new ControlledClock(actualClock);
-// clock.setTime(10);
-// MRApp app =
-// new MRApp(1, 1, false, "testSlotMillisCounterUpdate", true, clock);
-// Configuration conf = new Configuration();
-// conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb);
-// conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
-// app.setClusterInfo(new ClusterInfo(BuilderUtils
-// .newResource(minContainerSize), BuilderUtils.newResource(10240)));
-//
-// Job job = app.submit(conf);
-// app.waitForState(job, JobState.RUNNING);
-// Map<TaskId, Task> tasks = job.getTasks();
-// Assert.assertEquals("Num tasks is not correct", 2, tasks.size());
-// Iterator<Task> taskIter = tasks.values().iterator();
-// Task mTask = taskIter.next();
-// app.waitForState(mTask, TaskState.RUNNING);
-// Task rTask = taskIter.next();
-// app.waitForState(rTask, TaskState.RUNNING);
-// Map<TaskAttemptId, TaskAttempt> mAttempts = mTask.getAttempts();
-// Assert.assertEquals("Num attempts is not correct", 1, mAttempts.size());
-// Map<TaskAttemptId, TaskAttempt> rAttempts = rTask.getAttempts();
-// Assert.assertEquals("Num attempts is not correct", 1, rAttempts.size());
-// TaskAttempt mta = mAttempts.values().iterator().next();
-// TaskAttempt rta = rAttempts.values().iterator().next();
-// app.waitForState(mta, TaskAttemptState.RUNNING);
-// app.waitForState(rta, TaskAttemptState.RUNNING);
-//
-// clock.setTime(11);
-// app.getContext()
-// .getEventHandler()
-// .handle(new TaskAttemptEvent(mta.getID(), TaskAttemptEventType.TA_DONE));
-// app.getContext()
-// .getEventHandler()
-// .handle(new TaskAttemptEvent(rta.getID(), TaskAttemptEventType.TA_DONE));
-// app.waitForState(job, JobState.SUCCEEDED);
-// Assert.assertEquals(mta.getFinishTime(), 11);
-// Assert.assertEquals(mta.getLaunchTime(), 10);
-// Assert.assertEquals(rta.getFinishTime(), 11);
-// Assert.assertEquals(rta.getLaunchTime(), 10);
-// Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize),
-// job.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_MAPS)
-// .getValue());
-// Assert.assertEquals(
-// (int) Math.ceil((float) reduceMemMb / minContainerSize), job
-// .getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES)
-// .getValue());
-// }
-//
-// private TaskAttemptImpl createMapTaskAttemptImplForTest(
-// EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
-// Clock clock = new SystemClock();
-// return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
-// }
-//
-// private TaskAttemptImpl createMapTaskAttemptImplForTest(
-// EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
-// ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
-// JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-// TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
-// TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-// Path jobFile = mock(Path.class);
-// JobConf jobConf = new JobConf();
-// OutputCommitter outputCommitter = mock(OutputCommitter.class);
-// TaskAttemptImpl taImpl =
-// new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
-// taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
-// null, clock, null);
-// return taImpl;
-// }
-//
-// private void testMRAppHistory(MRApp app) throws Exception {
-// Configuration conf = new Configuration();
-// Job job = app.submit(conf);
-// app.waitForState(job, JobState.FAILED);
-// Map<TaskId, Task> tasks = job.getTasks();
-//
-// Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
-// Task task = tasks.values().iterator().next();
-// Assert.assertEquals("Task state not correct", TaskState.FAILED, task
-// .getReport().getTaskState());
-// Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next()
-// .getAttempts();
-// Assert.assertEquals("Num attempts is not correct", 4, attempts.size());
-//
-// Iterator<TaskAttempt> it = attempts.values().iterator();
-// TaskAttemptReport report = it.next().getReport();
-// Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
-// report.getTaskAttemptState());
-// Assert.assertEquals("Diagnostic Information is not Correct",
-// "Test Diagnostic Event", report.getDiagnosticInfo());
-// report = it.next().getReport();
-// Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
-// report.getTaskAttemptState());
-// }
-//
-// static class FailingAttemptsMRApp extends MRApp {
-// FailingAttemptsMRApp(int maps, int reduces) {
-// super(maps, reduces, true, "FailingAttemptsMRApp", true);
-// }
-//
-// @Override
-// protected void attemptLaunched(TaskAttemptId attemptID) {
-// getContext().getEventHandler().handle(
-// new TaskAttemptDiagnosticsUpdateEvent(attemptID,
-// "Test Diagnostic Event"));
-// getContext().getEventHandler().handle(
-// new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
-// }
-//
-// protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
-// AppContext context) {
-// return new EventHandler<JobHistoryEvent>() {
-// @Override
-// public void handle(JobHistoryEvent event) {
-// if (event.getType() == org.apache.hadoop.mapreduce.jobhistory.EventType.MAP_ATTEMPT_FAILED) {
-// TaskAttemptUnsuccessfulCompletion datum = (TaskAttemptUnsuccessfulCompletion) event
-// .getHistoryEvent().getDatum();
-// Assert.assertEquals("Diagnostic Information is not Correct",
-// "Test Diagnostic Event", datum.get(8).toString());
-// }
-// }
-// };
-// }
-// }
-//
-// @Test
-// public void testLaunchFailedWhileKilling() throws Exception {
-// ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
-// ApplicationAttemptId appAttemptId =
-// BuilderUtils.newApplicationAttemptId(appId, 0);
-// JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-// TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
-// TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
-// Path jobFile = mock(Path.class);
-//
-// MockEventHandler eventHandler = new MockEventHandler();
-// TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-// when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
-//
-// JobConf jobConf = new JobConf();
-// jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-// jobConf.setBoolean("fs.file.impl.disable.cache", true);
-// jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
-// jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
-//
-// TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
-// when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
-//
-// TaskAttemptImpl taImpl =
-// new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
-// splits, jobConf, taListener,
-// mock(OutputCommitter.class), mock(Token.class), new Credentials(),
-// new SystemClock(), null);
-//
-// NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
-// ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
-// Container container = mock(Container.class);
-// when(container.getId()).thenReturn(contId);
-// when(container.getNodeId()).thenReturn(nid);
-//
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_SCHEDULE));
-// taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
-// container, mock(Map.class)));
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_KILL));
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_CONTAINER_CLEANED));
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
-// assertFalse(eventHandler.internalError);
-// }
-//
-// @Test
-// public void testContainerCleanedWhileRunning() throws Exception {
-// ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
-// ApplicationAttemptId appAttemptId =
-// BuilderUtils.newApplicationAttemptId(appId, 0);
-// JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-// TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
-// TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
-// Path jobFile = mock(Path.class);
-//
-// MockEventHandler eventHandler = new MockEventHandler();
-// TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-// when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
-//
-// JobConf jobConf = new JobConf();
-// jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-// jobConf.setBoolean("fs.file.impl.disable.cache", true);
-// jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
-// jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
-//
-// TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
-// when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
-//
-// AppContext appCtx = mock(AppContext.class);
-// ClusterInfo clusterInfo = mock(ClusterInfo.class);
-// Resource resource = mock(Resource.class);
-// when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
-// when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
-// when(resource.getMemory()).thenReturn(1024);
-//
-// TaskAttemptImpl taImpl =
-// new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
-// splits, jobConf, taListener,
-// mock(OutputCommitter.class), mock(Token.class), new Credentials(),
-// new SystemClock(), appCtx);
-//
-// NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
-// ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
-// Container container = mock(Container.class);
-// when(container.getId()).thenReturn(contId);
-// when(container.getNodeId()).thenReturn(nid);
-// when(container.getNodeHttpAddress()).thenReturn("localhost:0");
-//
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_SCHEDULE));
-// taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
-// container, mock(Map.class)));
-// taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
-// assertEquals("Task attempt is not in running state", taImpl.getState(),
-// TaskAttemptState.RUNNING);
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_CONTAINER_CLEANED));
-// assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
-// eventHandler.internalError);
-// }
-//
-// @Test
-// public void testContainerCleanedWhileCommitting() throws Exception {
-// ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
-// ApplicationAttemptId appAttemptId =
-// BuilderUtils.newApplicationAttemptId(appId, 0);
-// JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-// TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
-// TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
-// Path jobFile = mock(Path.class);
-//
-// MockEventHandler eventHandler = new MockEventHandler();
-// TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-// when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
-//
-// JobConf jobConf = new JobConf();
-// jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-// jobConf.setBoolean("fs.file.impl.disable.cache", true);
-// jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
-// jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
-//
-// TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
-// when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
-//
-// AppContext appCtx = mock(AppContext.class);
-// ClusterInfo clusterInfo = mock(ClusterInfo.class);
-// Resource resource = mock(Resource.class);
-// when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
-// when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
-// when(resource.getMemory()).thenReturn(1024);
-//
-// TaskAttemptImpl taImpl =
-// new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
-// splits, jobConf, taListener,
-// mock(OutputCommitter.class), mock(Token.class), new Credentials(),
-// new SystemClock(), appCtx);
-//
-// NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
-// ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
-// Container container = mock(Container.class);
-// when(container.getId()).thenReturn(contId);
-// when(container.getNodeId()).thenReturn(nid);
-// when(container.getNodeHttpAddress()).thenReturn("localhost:0");
-//
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_SCHEDULE));
-// taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
-// container, mock(Map.class)));
-// taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_COMMIT_PENDING));
-//
-// assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
-// TaskAttemptState.COMMIT_PENDING);
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_CONTAINER_CLEANED));
-// assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
-// eventHandler.internalError);
-// }
-//
-// @Test
-// public void testDoubleTooManyFetchFailure() throws Exception {
-// ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
-// ApplicationAttemptId appAttemptId =
-// BuilderUtils.newApplicationAttemptId(appId, 0);
-// JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-// TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
-// TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
-// Path jobFile = mock(Path.class);
-//
-// MockEventHandler eventHandler = new MockEventHandler();
-// TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-// when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
-//
-// JobConf jobConf = new JobConf();
-// jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-// jobConf.setBoolean("fs.file.impl.disable.cache", true);
-// jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
-// jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
-//
-// TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
-// when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
-//
-// AppContext appCtx = mock(AppContext.class);
-// ClusterInfo clusterInfo = mock(ClusterInfo.class);
-// Resource resource = mock(Resource.class);
-// when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
-// when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
-// when(resource.getMemory()).thenReturn(1024);
-//
-// TaskAttemptImpl taImpl =
-// new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
-// splits, jobConf, taListener,
-// mock(OutputCommitter.class), mock(Token.class), new Credentials(),
-// new SystemClock(), appCtx);
-//
-// NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
-// ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
-// Container container = mock(Container.class);
-// when(container.getId()).thenReturn(contId);
-// when(container.getNodeId()).thenReturn(nid);
-// when(container.getNodeHttpAddress()).thenReturn("localhost:0");
-//
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_SCHEDULE));
-// taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
-// container, mock(Map.class)));
-// taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_DONE));
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_CONTAINER_CLEANED));
-//
-// assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
-// TaskAttemptState.SUCCEEDED);
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
-// assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
-// TaskAttemptState.FAILED);
-// taImpl.handle(new TaskAttemptEvent(attemptId,
-// TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
-// assertEquals("Task attempt is not in FAILED state, still", taImpl.getState(),
-// TaskAttemptState.FAILED);
-// assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
-// eventHandler.internalError);
-// }
-//
-// public static class MockEventHandler implements EventHandler {
-// public boolean internalError;
-//
-// @Override
-// public void handle(Event event) {
-// if (event instanceof JobEvent) {
-// JobEvent je = ((JobEvent) event);
-// if (JobEventType.INTERNAL_ERROR == je.getType()) {
-// internalError = true;
-// }
-// }
-// }
-//
-// };
-//}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.job.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
+import org.apache.hadoop.mapreduce.v2.app2.MRApp;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerLaunchedEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TestTaskAttempt{
+ @Test
+ public void testAttemptContainerRequest() throws Exception {
+ //WARNING: This test must run first. This is because there is an
+ // optimization where the credentials passed in are cached statically so
+ // they do not need to be recomputed when creating a new
+ // ContainerLaunchContext. if other tests run first this code will cache
+ // their credentials and this test will fail trying to look for the
+ // credentials it inserted in.
+ final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
+ final byte[] SECRET_KEY = ("secretkey").getBytes();
+ Map<ApplicationAccessType, String> acls =
+ new HashMap<ApplicationAccessType, String>(1);
+ acls.put(ApplicationAccessType.VIEW_APP, "otheruser");
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ Path jobFile = mock(Path.class);
+
+ EventHandler eventHandler = mock(EventHandler.class);
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+
+ // setup UGI for security so tokens and keys are preserved
+ jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(jobConf);
+
+ Credentials credentials = new Credentials();
+ credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY);
+ Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
+ ("tokenid").getBytes(), ("tokenpw").getBytes(),
+ new Text("tokenkind"), new Text("tokenservice"));
+
+ TaskAttemptImpl taImpl =
+ new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ mock(TaskSplitMetaInfo.class), jobConf, taListener,
+ mock(OutputCommitter.class), jobToken, credentials,
+ new SystemClock(), null);
+
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
+ ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
+
+ ContainerLaunchContext launchCtx =
+ TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
+ jobConf, jobToken, taImpl.createRemoteTask(),
+ TypeConverter.fromYarn(jobId), mock(Resource.class),
+ mock(WrappedJvmID.class), taListener,
+ credentials);
+
+ Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs());
+ Credentials launchCredentials = new Credentials();
+
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(launchCtx.getContainerTokens());
+ launchCredentials.readTokenStorageStream(dibb);
+
+ // verify all tokens specified for the task attempt are in the launch context
+ for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
+ Token<? extends TokenIdentifier> launchToken =
+ launchCredentials.getToken(token.getService());
+ Assert.assertNotNull("Token " + token.getService() + " is missing",
+ launchToken);
+ Assert.assertEquals("Token " + token.getService() + " mismatch",
+ token, launchToken);
+ }
+
+ // verify the secret key is in the launch context
+ Assert.assertNotNull("Secret key missing",
+ launchCredentials.getSecretKey(SECRET_KEY_ALIAS));
+ Assert.assertTrue("Secret key mismatch", Arrays.equals(SECRET_KEY,
+ launchCredentials.getSecretKey(SECRET_KEY_ALIAS)));
+ }
+
+ static public class StubbedFS extends RawLocalFileSystem {
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ return new FileStatus(1, false, 1, 1, 1, f);
+ }
+ }
+
+ @Test
+ public void testMRAppHistoryForMap() throws Exception {
+ MRApp app = new FailingAttemptsMRApp(1, 0);
+ testMRAppHistory(app);
+ }
+
+ @Test
+ public void testMRAppHistoryForReduce() throws Exception {
+ MRApp app = new FailingAttemptsMRApp(0, 1);
+ testMRAppHistory(app);
+ }
+
+ @Test
+ public void testSingleRackRequest() throws Exception {
+ TaskAttemptImpl.RequestContainerTransition rct =
+ new TaskAttemptImpl.RequestContainerTransition(false);
+
+ EventHandler eventHandler = mock(EventHandler.class);
+ String[] hosts = new String[3];
+ hosts[0] = "host1";
+ hosts[1] = "host2";
+ hosts[2] = "host3";
+ TaskSplitMetaInfo splitInfo =
+ new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
+
+ TaskAttemptImpl mockTaskAttempt =
+ createMapTaskAttemptImplForTest(eventHandler, splitInfo);
+ TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
+
+ rct.transition(mockTaskAttempt, mockTAEvent);
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(2)).handle(arg.capture());
+ if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
+ Assert.fail("Second Event not of type ContainerRequestEvent");
+ }
+ ContainerRequestEvent cre =
+ (ContainerRequestEvent) arg.getAllValues().get(1);
+ String[] requestedRacks = cre.getRacks();
+ //Only a single occurrence of /DefaultRack
+ assertEquals(1, requestedRacks.length);
+ }
+
+ @Test
+ public void testHostResolveAttempt() throws Exception {
+ TaskAttemptImpl.RequestContainerTransition rct =
+ new TaskAttemptImpl.RequestContainerTransition(false);
+
+ EventHandler eventHandler = mock(EventHandler.class);
+ String[] hosts = new String[3];
+ hosts[0] = "192.168.1.1";
+ hosts[1] = "host2";
+ hosts[2] = "host3";
+ TaskSplitMetaInfo splitInfo =
+ new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
+
+ TaskAttemptImpl mockTaskAttempt =
+ createMapTaskAttemptImplForTest(eventHandler, splitInfo);
+ TaskAttemptImpl spyTa = spy(mockTaskAttempt);
+ when(spyTa.resolveHost(hosts[0])).thenReturn("host1");
+
+ TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
+ rct.transition(spyTa, mockTAEvent);
+ verify(spyTa).resolveHost(hosts[0]);
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(2)).handle(arg.capture());
+ if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
+ Assert.fail("Second Event not of type ContainerRequestEvent");
+ }
+ Map<String, Boolean> expected = new HashMap<String, Boolean>();
+ expected.put("host1", true);
+ expected.put("host2", true);
+ expected.put("host3", true);
+ ContainerRequestEvent cre =
+ (ContainerRequestEvent) arg.getAllValues().get(1);
+ String[] requestedHosts = cre.getHosts();
+ for (String h : requestedHosts) {
+ expected.remove(h);
+ }
+ assertEquals(0, expected.size());
+ }
+
+ @Test
+ public void testSlotMillisCounterUpdate() throws Exception {
+ verifySlotMillis(2048, 2048, 1024);
+ verifySlotMillis(2048, 1024, 1024);
+ verifySlotMillis(10240, 1024, 2048);
+ }
+
+ public void verifySlotMillis(int mapMemMb, int reduceMemMb,
+ int minContainerSize) throws Exception {
+ Clock actualClock = new SystemClock();
+ ControlledClock clock = new ControlledClock(actualClock);
+ clock.setTime(10);
+ MRApp app =
+ new MRApp(1, 1, false, "testSlotMillisCounterUpdate", true, clock);
+ Configuration conf = new Configuration();
+ conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb);
+ conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
+ app.setClusterInfo(new ClusterInfo(BuilderUtils
+ .newResource(minContainerSize), BuilderUtils.newResource(10240)));
+
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Map<TaskId, Task> tasks = job.getTasks();
+ Assert.assertEquals("Num tasks is not correct", 2, tasks.size());
+ Iterator<Task> taskIter = tasks.values().iterator();
+ Task mTask = taskIter.next();
+ app.waitForState(mTask, TaskState.RUNNING);
+ Task rTask = taskIter.next();
+ app.waitForState(rTask, TaskState.RUNNING);
+ Map<TaskAttemptId, TaskAttempt> mAttempts = mTask.getAttempts();
+ Assert.assertEquals("Num attempts is not correct", 1, mAttempts.size());
+ Map<TaskAttemptId, TaskAttempt> rAttempts = rTask.getAttempts();
+ Assert.assertEquals("Num attempts is not correct", 1, rAttempts.size());
+ TaskAttempt mta = mAttempts.values().iterator().next();
+ TaskAttempt rta = rAttempts.values().iterator().next();
+ app.waitForState(mta, TaskAttemptState.RUNNING);
+ app.waitForState(rta, TaskAttemptState.RUNNING);
+
+ clock.setTime(11);
+ app.getContext()
+ .getEventHandler()
+ .handle(new TaskAttemptEvent(mta.getID(), TaskAttemptEventType.TA_DONE));
+ app.getContext()
+ .getEventHandler()
+ .handle(new TaskAttemptEvent(rta.getID(), TaskAttemptEventType.TA_DONE));
+ app.waitForState(job, JobState.SUCCEEDED);
+ Assert.assertEquals(mta.getFinishTime(), 11);
+ Assert.assertEquals(mta.getLaunchTime(), 10);
+ Assert.assertEquals(rta.getFinishTime(), 11);
+ Assert.assertEquals(rta.getLaunchTime(), 10);
+ Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize),
+ job.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_MAPS)
+ .getValue());
+ Assert.assertEquals(
+ (int) Math.ceil((float) reduceMemMb / minContainerSize), job
+ .getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES)
+ .getValue());
+ }
+
+ private TaskAttemptImpl createMapTaskAttemptImplForTest(
+ EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
+ Clock clock = new SystemClock();
+ return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
+ }
+
+ private TaskAttemptImpl createMapTaskAttemptImplForTest(
+ EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ Path jobFile = mock(Path.class);
+ JobConf jobConf = new JobConf();
+ OutputCommitter outputCommitter = mock(OutputCommitter.class);
+ TaskAttemptImpl taImpl =
+ new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
+ null, clock, null);
+ return taImpl;
+ }
+
+ private void testMRAppHistory(MRApp app) throws Exception {
+ Configuration conf = new Configuration();
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.FAILED);
+ Map<TaskId, Task> tasks = job.getTasks();
+
+ Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+ Task task = tasks.values().iterator().next();
+ Assert.assertEquals("Task state not correct", TaskState.FAILED, task
+ .getReport().getTaskState());
+ Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next()
+ .getAttempts();
+ Assert.assertEquals("Num attempts is not correct", 4, attempts.size());
+
+ Iterator<TaskAttempt> it = attempts.values().iterator();
+ TaskAttemptReport report = it.next().getReport();
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+ report.getTaskAttemptState());
+ Assert.assertEquals("Diagnostic Information is not Correct",
+ "Test Diagnostic Event", report.getDiagnosticInfo());
+ report = it.next().getReport();
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+ report.getTaskAttemptState());
+ }
+
+ static class FailingAttemptsMRApp extends MRApp {
+ FailingAttemptsMRApp(int maps, int reduces) {
+ super(maps, reduces, true, "FailingAttemptsMRApp", true);
+ }
+
+ @Override
+ protected void attemptLaunched(TaskAttemptId attemptID) {
+ getContext().getEventHandler().handle(
+ new TaskAttemptDiagnosticsUpdateEvent(attemptID,
+ "Test Diagnostic Event"));
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+ }
+
+ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+ AppContext context) {
+ return new EventHandler<JobHistoryEvent>() {
+ @Override
+ public void handle(JobHistoryEvent event) {
+ if (event.getType() == org.apache.hadoop.mapreduce.jobhistory.EventType.MAP_ATTEMPT_FAILED) {
+ TaskAttemptUnsuccessfulCompletion datum = (TaskAttemptUnsuccessfulCompletion) event
+ .getHistoryEvent().getDatum();
+ Assert.assertEquals("Diagnostic Information is not Correct",
+ "Test Diagnostic Event", datum.get(8).toString());
+ }
+ }
+ };
+ }
+ }
+
+ @Test
+ public void testLaunchFailedWhileKilling() throws Exception {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+ TaskAttemptImpl taImpl =
+ new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ splits, jobConf, taListener,
+ mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+ new SystemClock(), null);
+
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+ ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+ container, mock(Map.class)));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_KILL));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+ assertFalse(eventHandler.internalError);
+ }
+
+ @Test
+ public void testContainerCleanedWhileRunning() throws Exception {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl =
+ new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ splits, jobConf, taListener,
+ mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+ new SystemClock(), appCtx);
+
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+ ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+ container, mock(Map.class)));
+ taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+ assertEquals("Task attempt is not in running state", taImpl.getState(),
+ TaskAttemptState.RUNNING);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+ eventHandler.internalError);
+ }
+
+ @Test
+ public void testContainerCleanedWhileCommitting() throws Exception {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl =
+ new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ splits, jobConf, taListener,
+ mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+ new SystemClock(), appCtx);
+
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+ ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+ container, mock(Map.class)));
+ taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_COMMIT_PENDING));
+
+ assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
+ TaskAttemptState.COMMIT_PENDING);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+ eventHandler.internalError);
+ }
+
+ @Test
+ public void testDoubleTooManyFetchFailure() throws Exception {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl =
+ new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ splits, jobConf, taListener,
+ mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+ new SystemClock(), appCtx);
+
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+ ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+ container, mock(Map.class)));
+ taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_DONE));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
+
+ assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+ TaskAttemptState.SUCCEEDED);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+ assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+ TaskAttemptState.FAILED);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+ assertEquals("Task attempt is not in FAILED state, still", taImpl.getState(),
+ TaskAttemptState.FAILED);
+ assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+ eventHandler.internalError);
+ }
+
+ public static class MockEventHandler implements EventHandler {
+ public boolean internalError;
+
+ @Override
+ public void handle(Event event) {
+ if (event instanceof JobEvent) {
+ JobEvent je = ((JobEvent) event);
+ if (JobEventType.INTERNAL_ERROR == je.getType()) {
+ internalError = true;
+ }
+ }
+ }
+
+ };
+}