You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/10/18 21:27:34 UTC
svn commit: r826504 - in /hadoop/mapreduce/branches/branch-0.21: CHANGES.txt
src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java
src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
Author: cdouglas
Date: Sun Oct 18 19:27:33 2009
New Revision: 826504
URL: http://svn.apache.org/viewvc?rev=826504&view=rev
Log:
MAPREDUCE-931. Use built-in interpolation classes for making up task
runtimes in Rumen. Contributed by Dick King
Modified:
hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java
hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=826504&r1=826503&r2=826504&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Sun Oct 18 19:27:33 2009
@@ -416,6 +416,9 @@
MAPREDUCE-1063. Document gridmix benchmark. (cdouglas)
+ MAPREDUCE-931. Use built-in interpolation classes for making up task
+ runtimes in Rumen. (Dick King via cdouglas)
+
BUG FIXES
MAPREDUCE-878. Rename fair scheduler design doc to
Modified: hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java?rev=826504&r1=826503&r2=826504&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java Sun Oct 18 19:27:33 2009
@@ -39,16 +39,17 @@
final Random random;
CDFRandomGenerator(LoggedDiscreteCDF cdf) {
- random = new Random();
- rankings = new double[(int) cdf.getNumberValues() + 2];
- values = new long[(int) cdf.getNumberValues() + 2];
- initializeTables(cdf);
+ this(cdf, new Random());
}
CDFRandomGenerator(LoggedDiscreteCDF cdf, long seed) {
- random = new Random(seed);
- rankings = new double[(int) cdf.getNumberValues() + 2];
- values = new long[(int) cdf.getNumberValues() + 2];
+ this(cdf, new Random(seed));
+ }
+
+ private CDFRandomGenerator(LoggedDiscreteCDF cdf, Random random) {
+ this.random = random;
+ rankings = new double[cdf.getRankings().size() + 2];
+ values = new long[cdf.getRankings().size() + 2];
initializeTables(cdf);
}
Modified: hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=826504&r1=826503&r2=826504&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Sun Oct 18 19:27:33 2009
@@ -56,6 +56,12 @@
private final ClusterStory cluster;
private JobConf jobConf;
+ private long seed;
+ private boolean hasRandomSeed = false;
+
+ private Map<LoggedDiscreteCDF, CDFRandomGenerator> interpolatorMap =
+ new HashMap<LoggedDiscreteCDF, CDFRandomGenerator>();
+
// TODO: Fix ZombieJob to initialize this correctly from observed data
double rackLocalOverNodeLocal = 1.5;
double rackRemoteOverNodeLocal = 3.0;
@@ -80,6 +86,8 @@
this.job = job;
this.cluster = cluster;
random = new Random(seed);
+ this.seed = seed;
+ hasRandomSeed = true;
}
/**
@@ -173,20 +181,23 @@
}
int avgHostPerSplit;
- if (splitsList.size() == 0) avgHostPerSplit = 3;
- else {
+ if (splitsList.size() == 0) {
+ avgHostPerSplit = 3;
+ } else {
avgHostPerSplit = totalHosts / splitsList.size();
- if (avgHostPerSplit == 0) avgHostPerSplit = 3;
+ if (avgHostPerSplit == 0) {
+ avgHostPerSplit = 3;
+ }
}
-
+
for (int i = splitsList.size(); i < totalMaps; i++) {
if (cluster == null) {
splitsList.add(new FileSplit(emptyPath, 0, 0, new String[0]));
} else {
MachineNode[] mNodes = cluster.getRandomMachines(avgHostPerSplit);
String[] hosts = new String[mNodes.length];
- for (int j=0; j<hosts.length; ++j) {
- hosts[j]=mNodes[j].getName();
+ for (int j = 0; j < hosts.length; ++j) {
+ hosts[j] = mNodes[j].getName();
}
// TODO set size of a split to 0 now.
splitsList.add(new FileSplit(emptyPath, 0, 0, hosts));
@@ -286,9 +297,10 @@
taskId.getId(), attemptId.getId());
}
-
private LoggedTask sanitizeLoggedTask(LoggedTask task) {
- if (task == null) return null;
+ if (task == null) {
+ return null;
+ }
if (task.getTaskType() == null) {
LOG.warn("Task " + task.getTaskID() + " has null TaskType");
return null;
@@ -299,17 +311,19 @@
}
return task;
}
-
+
private LoggedTaskAttempt sanitizeLoggedTaskAttempt(LoggedTaskAttempt attempt) {
- if (attempt == null) return null;
+ if (attempt == null) {
+ return null;
+ }
if (attempt.getResult() == null) {
LOG.warn("TaskAttempt " + attempt.getResult() + " has null Result");
return null;
}
-
+
return attempt;
}
-
+
/**
* Build task mapping and task attempt mapping, to be later used to find
* information of a particular {@link TaskID} or {@link TaskAttemptID}.
@@ -512,9 +526,13 @@
private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) {
int distance = cluster.getMaximumDistance();
String rackHostName = loggedAttempt.getHostName();
- if (rackHostName == null) return distance;
+ if (rackHostName == null) {
+ return distance;
+ }
MachineNode mn = getMachineNode(rackHostName);
- if (mn == null) return distance;
+ if (mn == null) {
+ return distance;
+ }
List<LoggedLocation> locations = loggedTask.getPreferredLocations();
if (locations != null) {
for (LoggedLocation location : locations) {
@@ -696,40 +714,33 @@
"state is neither SUCCEEDED nor FAILED: " + state);
}
return reduceTime;
- } catch (IllegalArgumentException e) {
- if (e.getMessage().startsWith("no value to use to make up runtime")) {
- return 0;
- }
- throw e;
+ } catch (NoValueToMakeUpRuntime e) {
+ return 0;
}
}
private long makeUpMapRuntime(State state, int locality) {
long runtime;
// make up runtime
- if (state == State.SUCCEEDED) {
- /**
- * MapCDFs is a ArrayList of 4 possible groups: distance=0, 1, 2, and
- * the last group is "distance cannot be determined". All pig jobs would
- * have only the 4th group, and pig tasks usually do not have any
- * locality, so this group should count as "distance=2". However,
- * setup/cleanup tasks are also counted in the 4th group. These tasks do
- * not make sense.
- */
- runtime = makeUpRuntime(job.getSuccessfulMapAttemptCDFs().get(locality));
- if (runtime < 0) {
- runtime = makeUpRuntime(job.getSuccessfulMapAttemptCDFs());
- }
- } else if (state == State.FAILED) {
- runtime = makeUpRuntime(job.getFailedMapAttemptCDFs().get(locality));
- if (runtime < 0) {
- runtime = makeUpRuntime(job.getFailedMapAttemptCDFs());
- }
+ if (state == State.SUCCEEDED || state == State.FAILED) {
+ List<LoggedDiscreteCDF> cdfList =
+ state == State.SUCCEEDED ? job.getSuccessfulMapAttemptCDFs() : job
+ .getFailedMapAttemptCDFs();
+ // XXX MapCDFs is a ArrayList of 4 possible groups: distance=0, 1, 2, and
+ // the last group is "distance cannot be determined". All pig jobs
+ // would have only the 4th group, and pig tasks usually do not have
+ // any locality, so this group should count as "distance=2".
+ // However, setup/cleanup tasks are also counted in the 4th group.
+ // These tasks do not make sense.
+ try {
+ runtime = makeUpRuntime(cdfList.get(locality));
+ } catch (NoValueToMakeUpRuntime e) {
+ runtime = makeUpRuntime(cdfList);
+ }
} else {
throw new IllegalArgumentException(
"state is neither SUCCEEDED nor FAILED: " + state);
}
-
return runtime;
}
@@ -741,7 +752,7 @@
* A list of CDFs for the distribution of runtime for the 1st, 2nd,
* ... map attempts for the job.
*/
- private long makeUpRuntime(ArrayList<LoggedDiscreteCDF> mapAttemptCDFs) {
+ private long makeUpRuntime(List<LoggedDiscreteCDF> mapAttemptCDFs) {
int total = 0;
for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
total += cdf.getNumberValues();
@@ -763,39 +774,72 @@
throw new IllegalStateException("not possible to get here");
}
- // return -1 if we fail to makeup runtime with available info.
private long makeUpRuntime(LoggedDiscreteCDF loggedDiscreteCDF) {
- ArrayList<LoggedSingleRelativeRanking> rankings = new ArrayList<LoggedSingleRelativeRanking>(
- loggedDiscreteCDF.getRankings());
- if (loggedDiscreteCDF.getNumberValues() == 0) {
- return -1;
+ /*
+ * We need this odd-looking code because if a seed exists we need to ensure
+ * that only one interpolator is generated per LoggedDiscreteCDF, but if no
+ * seed exists then the potentially lengthy process of making an
+ * interpolator can happen outside the lock. makeUpRuntimeCore only locks
+ * around the two hash map accesses.
+ */
+ if (hasRandomSeed) {
+ synchronized (interpolatorMap) {
+ return makeUpRuntimeCore(loggedDiscreteCDF);
+ }
}
- LoggedSingleRelativeRanking ranking = new LoggedSingleRelativeRanking();
- ranking.setDatum(loggedDiscreteCDF.getMaximum());
- ranking.setRelativeRanking(1.0);
- rankings.add(ranking);
-
- ranking = new LoggedSingleRelativeRanking();
- ranking.setDatum(loggedDiscreteCDF.getMinimum());
- ranking.setRelativeRanking(0.0);
- rankings.add(0, ranking);
-
- double r = random.nextDouble();
- LoggedSingleRelativeRanking prevRanking = rankings.get(0);
- for (LoggedSingleRelativeRanking ranking2 : rankings) {
- double r2 = ranking2.getRelativeRanking();
- if (r < r2) {
- double r1 = prevRanking.getRelativeRanking();
- double f1 = prevRanking.getDatum();
- double f2 = ranking2.getDatum();
- double runtime = (r - r1) / (r2 - r1) * (f2 - f1) + f1;
- return (long) runtime;
+ return makeUpRuntimeCore(loggedDiscreteCDF);
+ }
+
+ private long makeUpRuntimeCore(LoggedDiscreteCDF loggedDiscreteCDF) {
+ CDFRandomGenerator interpolator;
+
+ synchronized (interpolatorMap) {
+ interpolator = interpolatorMap.get(loggedDiscreteCDF);
+ }
+
+ if (interpolator == null) {
+ if (loggedDiscreteCDF.getNumberValues() == 0) {
+ throw new NoValueToMakeUpRuntime("no value to use to make up runtime");
+ }
+
+ interpolator =
+ hasRandomSeed ? new CDFPiecewiseLinearRandomGenerator(
+ loggedDiscreteCDF, ++seed)
+ : new CDFPiecewiseLinearRandomGenerator(loggedDiscreteCDF);
+
+ /*
+ * It doesn't matter if we compute and store an interpolator twice because
+ * the two instances will be semantically identical and stateless, unless
+ * we're seeded, in which case we're not stateless but this code will be
+ * called synchronizedly.
+ */
+ synchronized (interpolatorMap) {
+ interpolatorMap.put(loggedDiscreteCDF, interpolator);
}
- prevRanking = ranking2;
}
- return rankings.get(rankings.size() - 1).getDatum();
+ return interpolator.randomValue();
+ }
+
+ static private class NoValueToMakeUpRuntime extends IllegalArgumentException {
+ static final long serialVersionUID = 1L;
+
+ NoValueToMakeUpRuntime() {
+ super();
+ }
+
+ NoValueToMakeUpRuntime(String detailMessage) {
+ super(detailMessage);
+ }
+
+ NoValueToMakeUpRuntime(String detailMessage, Throwable cause) {
+ super(detailMessage, cause);
+ }
+
+ NoValueToMakeUpRuntime(Throwable cause) {
+ super(cause);
+ }
}
private State makeUpState(int taskAttemptNumber, double[] numAttempts) {