Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/08/28 02:12:21 UTC
svn commit: r808686 [9/9] - in /hadoop/mapreduce/trunk: ./ ivy/
src/java/org/apache/hadoop/mapred/
src/test/mapred/org/apache/hadoop/tools/rumen/ src/test/tools/
src/test/tools/data/ src/test/tools/data/rumen/
src/test/tools/data/rumen/histogram-tests/...
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=808686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Fri Aug 28 00:12:18 2009
@@ -0,0 +1,693 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.HashMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobHistory;
+import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * {@link ZombieJob} is a layer above the raw JSON {@link LoggedJob} objects.
+ *
+ * Each <code>ZombieJob</code> object represents a job in the job history. For
+ * everything that exists in the job history, contents are returned
+ * faithfully and unchanged. For a nonexistent task, a nonexistent task
+ * attempt, or an ill-formed task attempt, plausible objects are made up
+ * from the statistical sketch.
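+ *
+ * A hypothetical usage sketch; <code>loggedJob</code> and
+ * <code>topology</code> are assumed to have been parsed from a trace
+ * beforehand:
+ *
+ * <pre>
+ * ZombieJob zombie = new ZombieJob(loggedJob, topology, 42L); // fixed seed for reproducibility
+ * InputSplit[] splits = zombie.getInputSplits();
+ * TaskAttemptInfo attempt = zombie.getTaskAttemptInfo(TaskType.MAP, 0, 0);
+ * </pre>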
+ */
+public class ZombieJob implements JobStory {
+
+ private final LoggedJob job;
+
+ private final Map<TaskID, LoggedTask> loggedTaskMap = new HashMap<TaskID, LoggedTask>();
+
+ private final Map<TaskAttemptID, LoggedTaskAttempt> loggedTaskAttemptMap = new HashMap<TaskAttemptID, LoggedTaskAttempt>();
+
+ private final Random random;
+
+ private FileSplit[] splits;
+
+ private final LoggedNetworkTopology topology;
+
+ // TODO: Fix ZombieJob to initialize this correctly from observed data
+ double rackLocalOverNodeLocal = 1.5;
+ // TODO: Fix ZombieJob to initialize this correctly from observed data
+ double rackRemoteOverNodeLocal = 3.0;
+
+ /**
+ * This constructor creates a {@link ZombieJob} with the same semantics as
+ * the {@link LoggedJob} passed in as a parameter.
+ *
+ * @param job
+ * The dead job this ZombieJob instance is based on.
+ * @param topology
+ * The topology of the network on which the dead job ran.
+ * @param seed
+ * Seed for the random number generator used to make up information
+ * missing from the trace.
+ */
+ public ZombieJob(LoggedJob job, LoggedNetworkTopology topology, long seed) {
+ if (job == null || topology == null) {
+ throw new IllegalArgumentException("job or topology is null");
+ }
+ this.job = job;
+ this.topology = topology;
+ random = new Random(seed);
+ }
+
+ /**
+ * This constructor creates a {@link ZombieJob} with the same semantics as
+ * the {@link LoggedJob} passed in as a parameter, seeding the random number
+ * generator from {@link System#nanoTime()}.
+ *
+ * @param job
+ * The dead job this ZombieJob instance is based on.
+ * @param topology
+ * The topology of the network on which the dead job ran.
+ */
+ public ZombieJob(LoggedJob job, LoggedNetworkTopology topology) {
+ this(job, topology, System.nanoTime());
+ }
+
+ private State convertState(JobHistory.Values status) {
+ if (status == JobHistory.Values.SUCCESS) {
+ return State.SUCCEEDED;
+ } else if (status == JobHistory.Values.FAILED) {
+ return State.FAILED;
+ } else if (status == JobHistory.Values.KILLED) {
+ return State.KILLED;
+ } else {
+ throw new IllegalArgumentException("unknown status " + status);
+ }
+ }
+
+ public JobConf getJobConf() {
+ // TODO: add more to jobConf?
+ JobConf jobConf = new JobConf();
+ jobConf.setJobName(getName());
+ jobConf.setUser(getUser());
+ jobConf.setNumMapTasks(getNumberMaps());
+ jobConf.setNumReduceTasks(getNumberReduces());
+ return jobConf;
+ }
+
+ @Override
+ public InputSplit[] getInputSplits() {
+ if (splits == null) {
+ List<FileSplit> splitsList = new ArrayList<FileSplit>();
+ Path emptyPath = new Path("/");
+ for (LoggedTask mapTask : job.getMapTasks()) {
+ ArrayList<LoggedLocation> locations = mapTask.getPreferredLocations();
+ List<String> hostList = new ArrayList<String>();
+ for (LoggedLocation location : locations) {
+ List<String> layers = location.getLayers();
+ if (layers.isEmpty()) {
+ // an ill-formed location without layers cannot name a host; skip it
+ // (and avoid leaving null entries in the hosts array)
+ continue;
+ }
+ // the last layer is the host name; the preceding layers are the rack path
+ hostList.add(layers.get(layers.size() - 1));
+ }
+ String[] hosts = hostList.toArray(new String[hostList.size()]);
+ long mapInputBytes = mapTask.getInputBytes();
+ splitsList.add(new FileSplit(emptyPath, 0,
+ ((mapInputBytes > 0) ? mapInputBytes : 0), hosts));
+ }
+
+ // If the trace does not contain all of the map tasks, make up splits
+ // for the missing ones.
+ int totalMaps = job.getTotalMaps();
+ for (int i = splitsList.size(); i < totalMaps; i++) {
+ ArrayList<String> hosts = new ArrayList<String>();
+ // assume replication factor is 3.
+ while (hosts.size() < 3) {
+ List<LoggedNetworkTopology> racks = topology.getChildren();
+ LoggedNetworkTopology rack = racks.get(random.nextInt(racks.size()));
+ List<LoggedNetworkTopology> nodes = rack.getChildren();
+ String host = nodes.get(random.nextInt(nodes.size())).getName();
+ if (!hosts.contains(host)) {
+ hosts.add(host);
+ }
+ }
+ String[] hostsArray = hosts.toArray(new String[hosts.size()]);
+ // TODO made-up splits are given size 0 for now.
+ splitsList.add(new FileSplit(emptyPath, 0, 0, hostsArray));
+ }
+
+ if (splitsList.size() == 0) {
+ // should only happen if the trace contains no map tasks at all
+ System.err.println("no splits for job " + job.getJobID()
+ + "; number of logged map tasks: " + job.getMapTasks().size());
+ }
+
+ splits = splitsList.toArray(new FileSplit[splitsList.size()]);
+ }
+ return splits;
+ }
+
+ @Override
+ public String getName() {
+ String jobName = job.getJobName();
+ if (jobName == null) {
+ return "";
+ } else {
+ return jobName;
+ }
+ }
+
+ @Override
+ public JobID getJobID() {
+ return JobID.forName(getLoggedJob().getJobID());
+ }
+
+ @Override
+ public int getNumberMaps() {
+ return job.getTotalMaps();
+ }
+
+ @Override
+ public int getNumberReduces() {
+ return job.getTotalReduces();
+ }
+
+ @Override
+ public long getSubmissionTime() {
+ return job.getSubmitTime() - job.getRelativeTime();
+ }
+
+ /**
+ * Mask the job ID part in a {@link TaskID}. Masking makes lookups
+ * independent of the original job ID, so a task read from the trace and a
+ * simulated task with the same type and number map to the same key.
+ *
+ * @param taskId
+ * raw {@link TaskID} read from the trace
+ * @return masked {@link TaskID} with an empty {@link JobID}.
+ */
+ private TaskID maskTaskID(TaskID taskId) {
+ JobID jobId = new JobID();
+ TaskType taskType = taskId.getTaskType();
+ return new TaskID(jobId, taskType, taskId.getId());
+ }
+
+ /**
+ * Mask the job ID part in a {@link TaskAttemptID}.
+ *
+ * @param attemptId
+ * raw {@link TaskAttemptID} read from the trace
+ * @return masked {@link TaskAttemptID} with an empty {@link JobID}.
+ */
+ private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) {
+ JobID jobId = new JobID();
+ TaskType taskType = attemptId.getTaskType();
+ TaskID taskId = attemptId.getTaskID();
+ return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), taskType,
+ taskId.getId(), attemptId.getId());
+ }
+
+ /**
+ * Build task mapping and task attempt mapping, to be later used to find
+ * information of a particular {@link TaskID} or {@link TaskAttemptID}.
+ */
+ private void buildMaps() {
+ for (LoggedTask map : job.getMapTasks()) {
+ loggedTaskMap.put(maskTaskID(TaskID.forName(map.taskID)), map);
+
+ for (LoggedTaskAttempt mapAttempt : map.getAttempts()) {
+ TaskAttemptID id = TaskAttemptID.forName(mapAttempt.getAttemptID());
+ loggedTaskAttemptMap.put(maskAttemptID(id), mapAttempt);
+ }
+ }
+ for (LoggedTask reduce : job.getReduceTasks()) {
+ loggedTaskMap.put(maskTaskID(TaskID.forName(reduce.taskID)), reduce);
+
+ for (LoggedTaskAttempt reduceAttempt : reduce.getAttempts()) {
+ TaskAttemptID id = TaskAttemptID.forName(reduceAttempt.getAttemptID());
+ loggedTaskAttemptMap.put(maskAttemptID(id), reduceAttempt);
+ }
+ }
+
+ // "other" task types ("setup", "cleanup") are deliberately ignored
+ }
+
+ @Override
+ public String getUser() {
+ return job.getUser();
+ }
+
+ /**
+ * Get the underlying {@link LoggedJob} object read directly from the trace.
+ * This is mainly for debugging.
+ *
+ * @return the underlying {@link LoggedJob} object
+ */
+ public LoggedJob getLoggedJob() {
+ return job;
+ }
+
+ /**
+ * Get a {@link TaskAttemptInfo} with a {@link TaskAttemptID} associated with
+ * taskType, taskNumber, and taskAttemptNumber. This function does not
+ * consider locality, and uses the following decision logic: 1. Make up a
+ * {@link TaskAttemptInfo} if the task attempt is missing from the trace;
+ * 2. Make up a {@link TaskAttemptInfo} if the task attempt has a KILLED
+ * final status in the trace; 3. Otherwise (the final state is SUCCEEDED or
+ * FAILED), construct the {@link TaskAttemptInfo} from the trace.
+ */
+ public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
+ int taskAttemptNumber) {
+ // Does not consider locality; assume the default locality of NODE_LOCAL.
+ // But if both the task and the task attempt exist in the trace, the
+ // logged locality is used.
+ int locality = 0;
+ LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
+ if (loggedTask == null) {
+ // TODO insert parameters
+ TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
+ return makeUpInfo(taskType, taskInfo, taskAttemptNumber, locality);
+ }
+
+ LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType,
+ taskNumber, taskAttemptNumber);
+ if (loggedAttempt == null) {
+ // Task exists, but attempt is missing.
+ TaskInfo taskInfo = getTaskInfo(loggedTask);
+ return makeUpInfo(taskType, taskInfo, taskAttemptNumber, locality);
+ } else {
+ // Task and TaskAttempt both exist.
+ try {
+ return getInfo(loggedTask, loggedAttempt);
+ } catch (IllegalArgumentException e) {
+ if (e.getMessage().startsWith("status cannot be")) {
+ TaskInfo taskInfo = getTaskInfo(loggedTask);
+ return makeUpInfo(taskType, taskInfo, taskAttemptNumber, locality);
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+
+ @Override
+ public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+ return getTaskInfo(getLoggedTask(taskType, taskNumber));
+ }
+
+ /**
+ * Get a {@link TaskAttemptInfo} with a {@link TaskAttemptID} associated with
+ * taskType, taskNumber, and taskAttemptNumber. This function considers
+ * locality, and uses the following decision logic: 1. Make up a
+ * {@link TaskAttemptInfo} if the task attempt is missing from the trace;
+ * 2. Make up a {@link TaskAttemptInfo} if the task attempt has a KILLED
+ * final status in the trace; 3. If the final state is FAILED, construct a
+ * {@link TaskAttemptInfo} from the trace, without considering locality;
+ * 4. If the final state is SUCCEEDED, construct a {@link TaskAttemptInfo}
+ * from the trace, with the runtime scaled according to the locality in the
+ * simulation versus the locality in the trace.
+ */
+ @Override
+ public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
+ int taskAttemptNumber, int locality) {
+ TaskType taskType = TaskType.MAP;
+ LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
+ if (loggedTask == null) {
+ // TODO insert parameters
+ TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
+ return makeUpInfo(taskType, taskInfo, taskAttemptNumber, locality);
+ }
+
+ LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType,
+ taskNumber, taskAttemptNumber);
+ if (loggedAttempt == null) {
+ // Task exists, but attempt is missing.
+ TaskInfo taskInfo = getTaskInfo(loggedTask);
+ return makeUpInfo(taskType, taskInfo, taskAttemptNumber, locality);
+ } else {
+ // Task and TaskAttempt both exist.
+ if (loggedAttempt.getResult() == Values.KILLED) {
+ TaskInfo taskInfo = getTaskInfo(loggedTask);
+ return makeUpInfo(taskType, taskInfo, taskAttemptNumber, locality);
+ } else if (loggedAttempt.getResult() == Values.FAILED) {
+ // FAILED attempt is not affected by locality
+ // XXX however, made-up FAILED attempts ARE affected by locality, since
+ // statistics are present for attempts of different locality.
+ return getInfo(loggedTask, loggedAttempt);
+ } else if (loggedAttempt.getResult() == Values.SUCCESS) {
+ int loggedLocality = getLocality(loggedTask, loggedAttempt);
+ if (locality == loggedLocality) {
+ return getInfo(loggedTask, loggedAttempt);
+ } else {
+ // The attempt succeeded in the trace but is scheduled with a
+ // different locality in the simulation; scale its runtime.
+ return scaleInfo(loggedTask, loggedAttempt, locality, loggedLocality,
+ rackLocalOverNodeLocal, rackRemoteOverNodeLocal);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "attempt result is not SUCCEEDED, FAILED or KILLED: "
+ + loggedAttempt.getResult());
+ }
+ }
+ }
+
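+ // Worked example of the scaling below (all values hypothetical): with
+ // factors {1.0, 1.5, 3.0} for {node-local, rack-local, rack-remote}, an
+ // attempt logged as node-local (loggedLocality=0) that took 60s and is
+ // replayed rack-remote (locality=2) is scaled by 3.0/1.0, i.e. to 180s.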
+ private TaskAttemptInfo scaleInfo(LoggedTask loggedTask,
+ LoggedTaskAttempt loggedAttempt, int locality, int loggedLocality,
+ double rackLocalOverNodeLocal, double rackRemoteOverNodeLocal) {
+ TaskInfo taskInfo = getTaskInfo(loggedTask);
+ double[] factors = new double[] { 1.0, rackLocalOverNodeLocal,
+ rackRemoteOverNodeLocal };
+ double scaleFactor = factors[locality] / factors[loggedLocality];
+ State state = convertState(loggedAttempt.getResult());
+ if (loggedTask.getTaskType() == Values.MAP) {
+ long taskTime = 0;
+ if (loggedAttempt.getStartTime() == 0) {
+ taskTime = makeUpMapRuntime(state, locality);
+ } else {
+ taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
+ }
+ taskTime *= scaleFactor;
+ return new MapTaskAttemptInfo(state, taskInfo, taskTime);
+ } else if (loggedTask.getTaskType() == Values.REDUCE) {
+ /*
+ * long shuffleTime =
+ * loggedAttempt.getShuffleFinished() - loggedAttempt.getStartTime();
+ * long sortTime =
+ * loggedAttempt.getSortFinished() - loggedAttempt.getShuffleFinished();
+ * long reduceTime =
+ * loggedAttempt.getFinishTime() - loggedAttempt.getSortFinished();
+ * reduceTime *= scaleFactor;
+ * return new ReduceTaskAttemptInfo(convertState(loggedAttempt.getResult()),
+ * taskInfo, shuffleTime, sortTime, reduceTime);
+ */
+ throw new IllegalArgumentException("taskType cannot be REDUCE");
+ } else {
+ throw new IllegalArgumentException("taskType is neither MAP nor REDUCE: "
+ + loggedTask.getTaskType());
+ }
+ }
+
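+ // Locality is the minimum topological distance between the attempt's host
+ // and any preferred (data) location: 0 = node-local, 1 = rack-local,
+ // 2 = rack-remote, matching the indices of the scaling factors above.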
+ private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) {
+ ParsedHost host = getParsedHost(loggedAttempt.getHostName());
+ int distance = 2;
+ for (LoggedLocation location : loggedTask.getPreferredLocations()) {
+ ParsedHost dataNode = new ParsedHost(location);
+ distance = Math.min(distance, host.distance(dataNode));
+ }
+ return distance;
+ }
+
+ private ParsedHost getParsedHost(String hostName) {
+ try {
+ return new ParsedHost(hostName);
+ } catch (IllegalArgumentException e) {
+ // look up the host in topology, return a correct hostname
+ for (LoggedNetworkTopology rack : topology.getChildren()) {
+ for (LoggedNetworkTopology host : rack.getChildren()) {
+ if (host.getName().equals(hostName)) {
+ return new ParsedHost(
+ new String[] { rack.getName(), host.getName() });
+ }
+ }
+ }
+ return new ParsedHost(new String[] { "default-rack", hostName });
+ }
+ }
+
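+ // For reduces, the attempt is decomposed into shuffle, merge and reduce
+ // phases. E.g. (hypothetical times) startTime=100s, shuffleFinished=160s,
+ // sortFinished=175s, finishTime=200s yields shuffleTime=60s, mergeTime=15s
+ // and reduceTime=25s.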
+ private TaskAttemptInfo getInfo(LoggedTask loggedTask,
+ LoggedTaskAttempt loggedAttempt) {
+ TaskInfo taskInfo = getTaskInfo(loggedTask);
+ State state = convertState(loggedAttempt.getResult());
+ if (loggedTask.getTaskType() == Values.MAP) {
+ long taskTime;
+ if (loggedAttempt.getStartTime() == 0) {
+ int locality = getLocality(loggedTask, loggedAttempt);
+ taskTime = makeUpMapRuntime(state, locality);
+ } else {
+ taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
+ }
+ if (taskTime < 0) {
+ throw new IllegalStateException(loggedAttempt.getAttemptID()
+ + " taskTime<0: " + taskTime);
+ }
+ return new MapTaskAttemptInfo(state, taskInfo, taskTime);
+ } else if (loggedTask.getTaskType() == Values.REDUCE) {
+ long startTime = loggedAttempt.getStartTime();
+ long mergeDone = loggedAttempt.getSortFinished();
+ long shuffleDone = loggedAttempt.getShuffleFinished();
+ long finishTime = loggedAttempt.getFinishTime();
+ if (startTime <= 0 || startTime >= finishTime) {
+ // We have seen startTime > finishTime in traces. We have never seen a
+ // reduce task with startTime == 0; but if that happens, make up a
+ // reduceTime with no shuffle/merge.
+ long reduceTime = makeUpReduceRuntime(state);
+ return new ReduceTaskAttemptInfo(state, taskInfo, 0, 0, reduceTime);
+ } else {
+ if (shuffleDone <= 0) {
+ shuffleDone = startTime;
+ }
+ if (mergeDone <= 0) {
+ mergeDone = finishTime;
+ }
+ long shuffleTime = shuffleDone - startTime;
+ long mergeTime = mergeDone - shuffleDone;
+ long reduceTime = finishTime - mergeDone;
+ if (reduceTime < 0) {
+ throw new IllegalStateException(loggedAttempt.getAttemptID()
+ + " reduceTime<0: " + reduceTime);
+ }
+ return new ReduceTaskAttemptInfo(state, taskInfo, shuffleTime,
+ mergeTime, reduceTime);
+ }
+ } else {
+ throw new IllegalArgumentException("taskType is neither MAP nor REDUCE: "
+ + loggedTask.getTaskType());
+ }
+ }
+
+ private TaskInfo getTaskInfo(LoggedTask loggedTask) {
+ long inputBytes = loggedTask.getInputBytes();
+ long inputRecords = loggedTask.getInputRecords();
+ long outputBytes = loggedTask.getOutputBytes();
+ long outputRecords = loggedTask.getOutputRecords();
+ // TODO insert maxMemory
+ // guard against division by zero when a task has no logged records
+ TaskInfo taskInfo = new TaskInfo(inputBytes,
+ (int) (inputRecords > 0 ? inputBytes / inputRecords : 0), outputBytes,
+ (int) (outputRecords > 0 ? outputBytes / outputRecords : 0), 0);
+ return taskInfo;
+ }
+
+ private TaskAttemptInfo makeUpInfo(TaskType taskType, TaskInfo taskInfo,
+ int taskAttemptNumber, int locality) {
+ if (taskType == TaskType.MAP) {
+ State state = State.SUCCEEDED;
+ long runtime = 0;
+
+ // make up state
+ state = makeUpState(taskAttemptNumber, job.getMapperTriesToSucceed());
+ runtime = makeUpMapRuntime(state, locality);
+
+ TaskAttemptInfo tai = new MapTaskAttemptInfo(state, taskInfo, runtime);
+ return tai;
+ } else if (taskType == TaskType.REDUCE) {
+ State state = State.SUCCEEDED;
+ long shuffleTime = 0;
+ long sortTime = 0;
+ long reduceTime = 0;
+
+ // TODO make up state
+ // state = makeUpState(taskAttemptNumber, job.getReducerTriesToSucceed());
+ reduceTime = makeUpReduceRuntime(state);
+ TaskAttemptInfo tai = new ReduceTaskAttemptInfo(state, taskInfo,
+ shuffleTime, sortTime, reduceTime);
+ return tai;
+ }
+
+ throw new IllegalArgumentException("taskType is neither MAP nor REDUCE: "
+ + taskType);
+ }
+
+ private long makeUpReduceRuntime(State state) {
+ long reduceTime = 0;
+ for (int i = 0; i < 5; i++) {
+ reduceTime = doMakeUpReduceRuntime(state);
+ if (reduceTime >= 0) {
+ return reduceTime;
+ }
+ }
+ // repeated samples stayed negative; give up and return 0 rather than throw
+ return 0;
+ }
+
+ private long doMakeUpReduceRuntime(State state) {
+ long reduceTime;
+ try {
+ if (state == State.SUCCEEDED) {
+ reduceTime = makeUpRuntime(job.getSuccessfulReduceAttemptCDF());
+ } else if (state == State.FAILED) {
+ reduceTime = makeUpRuntime(job.getFailedReduceAttemptCDF());
+ } else {
+ throw new IllegalArgumentException(
+ "state is neither SUCCEEDED nor FAILED: " + state);
+ }
+ return reduceTime;
+ } catch (IllegalArgumentException e) {
+ if (e.getMessage().startsWith("no value to use to make up runtime")) {
+ return 0;
+ }
+ throw e;
+ }
+ }
+
+ private long makeUpMapRuntime(State state, int locality) {
+ long runtime;
+ // make up runtime
+ if (state == State.SUCCEEDED) {
+ // XXX MapCDFs is an ArrayList of 4 possible groups: distance=0, 1, 2, and
+ // a final group for "distance cannot be determined". All pig jobs
+ // would have only the 4th group, and pig tasks usually do not have
+ // any locality, so this group should count as "distance=2".
+ // However, setup/cleanup tasks are also counted in the 4th group,
+ // and those tasks are not meaningful here.
+ try {
+ runtime = makeUpRuntime(job.getSuccessfulMapAttemptCDFs().get(locality));
+ } catch (IllegalArgumentException e) {
+ if ("no value to use to make up runtime".equals(e.getMessage())) {
+ runtime = makeUpRuntime(job.getSuccessfulMapAttemptCDFs());
+ } else {
+ throw e;
+ }
+ }
+ } else if (state == State.FAILED) {
+ try {
+ runtime = makeUpRuntime(job.getFailedMapAttemptCDFs().get(locality));
+ } catch (IllegalArgumentException e) {
+ if ("no value to use to make up runtime".equals(e.getMessage())) {
+ runtime = makeUpRuntime(job.getFailedMapAttemptCDFs());
+ } else {
+ throw e;
+ }
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "state is neither SUCCEEDED nor FAILED: " + state);
+ }
+ return runtime;
+ }
+
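+ // Picks a CDF group with probability proportional to its sample count,
+ // then samples a runtime from that group. E.g. (hypothetical counts) with
+ // groups holding {2, 3, 5} values, total=10; a drawn index of 6 skips the
+ // first two groups (6-2=4, then 4-3=1) and lands at offset 1 of the third.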
+ private long makeUpRuntime(ArrayList<LoggedDiscreteCDF> mapAttemptCDFs) {
+ int total = 0;
+ for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
+ total += cdf.getNumberValues();
+ }
+ if (total == 0) {
+ // no statistics at all; treat as "no value to use to make up runtime"
+ return 0;
+ // throw new IllegalStateException("No task attempt statistics");
+ }
+ int index = random.nextInt(total);
+ for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
+ if (index >= cdf.getNumberValues()) {
+ index -= cdf.getNumberValues();
+ } else {
+ if (index < 0) {
+ throw new IllegalStateException("application error");
+ }
+ return makeUpRuntime(cdf);
+ }
+ }
+ throw new IllegalStateException("not possible to get here");
+ }
+
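+ // Inverse-CDF sampling with linear interpolation between adjacent
+ // rankings. E.g. (hypothetical CDF) with minimum=0s, maximum=100s and one
+ // ranking (relativeRanking=0.5, datum=40s), a draw of r=0.25 falls between
+ // (0.0, 0s) and (0.5, 40s) and interpolates to
+ // (0.25 - 0.0) / (0.5 - 0.0) * (40s - 0s) + 0s = 20s.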
+ private long makeUpRuntime(LoggedDiscreteCDF loggedDiscreteCDF) {
+ ArrayList<LoggedSingleRelativeRanking> rankings = new ArrayList<LoggedSingleRelativeRanking>(
+ loggedDiscreteCDF.getRankings());
+ if (loggedDiscreteCDF.getNumberValues() == 0) {
+ throw new IllegalArgumentException("no value to use to make up runtime");
+ }
+
+ LoggedSingleRelativeRanking ranking = new LoggedSingleRelativeRanking();
+ ranking.setDatum(loggedDiscreteCDF.getMaximum());
+ ranking.setRelativeRanking(1.0);
+ rankings.add(ranking);
+
+ ranking = new LoggedSingleRelativeRanking();
+ ranking.setDatum(loggedDiscreteCDF.getMinimum());
+ ranking.setRelativeRanking(0.0);
+ rankings.add(0, ranking);
+
+ double r = random.nextDouble();
+ LoggedSingleRelativeRanking prevRanking = rankings.get(0);
+ for (LoggedSingleRelativeRanking ranking2 : rankings) {
+ double r2 = ranking2.getRelativeRanking();
+ if (r < r2) {
+ double r1 = prevRanking.getRelativeRanking();
+ double f1 = prevRanking.getDatum();
+ double f2 = ranking2.getDatum();
+ double runtime = (r - r1) / (r2 - r1) * (f2 - f1) + f1;
+ return (long) runtime;
+ }
+ prevRanking = ranking2;
+ }
+
+ return rankings.get(rankings.size() - 1).getDatum();
+ }
+
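+ // Makes up the final state of attempt #taskAttemptNumber from the
+ // tries-to-succeed distribution. E.g. (hypothetical) with numAttempts =
+ // {0.5, 0.3, 0.2}: attempt 0 succeeds with probability 0.5/(0.5+0.3+0.2)
+ // = 0.5, attempt 1 with probability 0.3/(0.3+0.2) = 0.6, and the last
+ // attempt (index 2) always succeeds.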
+ private State makeUpState(int taskAttemptNumber, double[] numAttempts) {
+ if (taskAttemptNumber >= numAttempts.length - 1) {
+ // always succeed
+ return State.SUCCEEDED;
+ } else {
+ double pSucceed = numAttempts[taskAttemptNumber];
+ double pFail = 0;
+ for (int i = taskAttemptNumber + 1; i < numAttempts.length; i++) {
+ pFail += numAttempts[i];
+ }
+ return (random.nextDouble() < pSucceed / (pSucceed + pFail)) ? State.SUCCEEDED
+ : State.FAILED;
+ }
+ }
+
+ private TaskID getMaskedTaskID(TaskType taskType, int taskNumber) {
+ return new TaskID(new JobID(), taskType, taskNumber);
+ }
+
+ private LoggedTask getLoggedTask(TaskType taskType, int taskNumber) {
+ if (loggedTaskMap.isEmpty()) {
+ buildMaps();
+ }
+ return loggedTaskMap.get(getMaskedTaskID(taskType, taskNumber));
+ }
+
+ private LoggedTaskAttempt getLoggedTaskAttempt(TaskType taskType,
+ int taskNumber, int taskAttemptNumber) {
+ TaskAttemptID id = new TaskAttemptID(getMaskedTaskID(taskType, taskNumber),
+ taskAttemptNumber);
+ if (loggedTaskAttemptMap.isEmpty()) {
+ buildMaps();
+ }
+ return loggedTaskAttemptMap.get(id);
+ }
+}