Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/12/11 20:33:02 UTC
svn commit: r889778 [2/2] - in /hadoop/mapreduce/trunk: ./
src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/
src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/
src/tools/org/apache/hadoop/tools/rumen/
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Fri Dec 11 19:32:58 2009
@@ -26,9 +26,16 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.tools.rumen.ZombieJobProducer;
import org.apache.commons.logging.Log;
@@ -47,6 +54,7 @@
public static final Log LOG = LogFactory.getLog(JobFactory.class);
private final Path scratch;
+ private final float rateFactor;
private final Configuration conf;
private final ReaderThread rThread;
private final AtomicInteger sequence;
@@ -83,6 +91,7 @@
Path scratch, Configuration conf, CountDownLatch startFlag) {
sequence = new AtomicInteger(0);
this.scratch = scratch;
+ this.rateFactor = conf.getFloat(Gridmix.GRIDMIX_SUB_MUL, 1.0f);
this.jobProducer = jobProducer;
this.conf = new Configuration(conf);
this.submitter = submitter;
@@ -90,6 +99,61 @@
this.rThread = new ReaderThread();
}
+ static class MinTaskInfo extends TaskInfo {
+ public MinTaskInfo(TaskInfo info) {
+ super(info.getInputBytes(), info.getInputRecords(),
+ info.getOutputBytes(), info.getOutputRecords(),
+ info.getTaskMemory());
+ }
+ public long getInputBytes() {
+ return Math.max(0, super.getInputBytes());
+ }
+ public int getInputRecords() {
+ return Math.max(0, super.getInputRecords());
+ }
+ public long getOutputBytes() {
+ return Math.max(0, super.getOutputBytes());
+ }
+ public int getOutputRecords() {
+ return Math.max(0, super.getOutputRecords());
+ }
+ public long getTaskMemory() {
+ return Math.max(0, super.getTaskMemory());
+ }
+ }
+
+ static class FilterJobStory implements JobStory {
+
+ protected final JobStory job;
+
+ public FilterJobStory(JobStory job) {
+ this.job = job;
+ }
+ public JobConf getJobConf() { return job.getJobConf(); }
+ public String getName() { return job.getName(); }
+ public JobID getJobID() { return job.getJobID(); }
+ public String getUser() { return job.getUser(); }
+ public long getSubmissionTime() { return job.getSubmissionTime(); }
+ public InputSplit[] getInputSplits() { return job.getInputSplits(); }
+ public int getNumberMaps() { return job.getNumberMaps(); }
+ public int getNumberReduces() { return job.getNumberReduces(); }
+ public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+ return job.getTaskInfo(taskType, taskNumber);
+ }
+ public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
+ int taskAttemptNumber) {
+ return job.getTaskAttemptInfo(taskType, taskNumber, taskAttemptNumber);
+ }
+ public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(
+ int taskNumber, int taskAttemptNumber, int locality) {
+ return job.getMapTaskAttemptInfoAdjusted(
+ taskNumber, taskAttemptNumber, locality);
+ }
+ public Values getOutcome() {
+ return job.getOutcome();
+ }
+ }
+
/**
* Worker thread responsible for reading descriptions, assigning sequence
* numbers, and normalizing time.
@@ -107,7 +171,12 @@
} while (job != null
&& (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
job.getSubmissionTime() < 0));
- return job;
+ return null == job ? null : new FilterJobStory(job) {
+ @Override
+ public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+ return new MinTaskInfo(this.job.getTaskInfo(taskType, taskNumber));
+ }
+ };
}
@Override
@@ -133,11 +202,12 @@
}
final long current = job.getSubmissionTime();
if (current < last) {
- throw new IOException(
- "JobStories are not ordered by submission time.");
+ LOG.warn("Job " + job.getJobID() + " out of order");
+ continue;
}
last = current;
- submitter.add(new GridmixJob(conf, initTime + (current - first),
+ submitter.add(new GridmixJob(conf, initTime +
+ Math.round(rateFactor * (current - first)),
job, scratch, sequence.getAndIncrement()));
} catch (IOException e) {
JobFactory.this.error = e;
@@ -179,8 +249,8 @@
/**
* Wait for the reader thread to exhaust the job trace.
*/
- public void join() throws InterruptedException {
- rThread.join();
+ public void join(long millis) throws InterruptedException {
+ rThread.join(millis);
}
/**
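
Two behaviors land in JobFactory here: MinTaskInfo clamps negative byte and record counts from a damaged trace at zero, and rateFactor rescales the gaps between trace submission times before replay. A minimal, self-contained sketch of the rate-factor arithmetic (the constants are illustrative, not Gridmix defaults):

    public class RateFactorDemo {
      public static void main(String[] args) {
        final float rateFactor = 2.0f; // illustrative submission multiplier
        final long initTime = 0L;      // wall-clock start of the replay
        final long first = 1000L;      // trace time of the first job
        for (long current : new long[] { 1000L, 1500L, 4000L }) {
          // same expression the patched ReaderThread uses
          final long submit =
              initTime + Math.round(rateFactor * (current - first));
          System.out.println("trace t=" + current + " -> submit at " + submit);
        }
      }
    }

With a factor above 1.0 the replay is stretched out; below 1.0, jobs are submitted faster than the original cluster saw them.
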
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java Fri Dec 11 19:32:58 2009
@@ -47,7 +47,8 @@
private final MonitorThread mThread;
private final BlockingQueue<Job> runningJobs;
private final long pollDelayMillis;
- private volatile boolean graceful = false;
+ private boolean graceful = false;
+ private boolean shutdown = false;
/**
* Create a JobMonitor with a default polling interval of 5s.
@@ -59,7 +60,7 @@
/**
* Create a JobMonitor that sleeps for the specified duration after
* polling a still-running job.
- * @param pollDelaySec Delay after polling a running job
+ * @param pollDelay Delay after polling a running job
* @param unit Time unit for pollDelay (rounded to milliseconds)
*/
public JobMonitor(int pollDelay, TimeUnit unit) {
@@ -80,14 +81,14 @@
* Temporary hook for recording job success.
*/
protected void onSuccess(Job job) {
- LOG.info(job.getJobName() + " succeeded");
+ LOG.info(job.getJobName() + " (" + job.getID() + ")" + " success");
}
/**
* Temporary hook for recording job failure.
*/
protected void onFailure(Job job) {
- LOG.info(job.getJobName() + " failed");
+ LOG.info(job.getJobName() + " (" + job.getID() + ")" + " failure");
}
/**
@@ -128,20 +129,30 @@
@Override
public void run() {
- boolean interrupted = false;
+ boolean graceful;
+ boolean shutdown;
while (true) {
try {
synchronized (mJobs) {
+ graceful = JobMonitor.this.graceful;
+ shutdown = JobMonitor.this.shutdown;
runningJobs.drainTo(mJobs);
}
- final boolean graceful = JobMonitor.this.graceful;
// shutdown conditions; either shutdown requested and all jobs
// have completed or abort requested and there are recently
- // submitted jobs not yet accounted for
- if (interrupted && ((!graceful && runningJobs.isEmpty()) ||
- (graceful && mJobs.isEmpty()))) {
- break;
+ // submitted jobs not in the monitored set
+ if (shutdown) {
+ if (!graceful) {
+ while (!runningJobs.isEmpty()) {
+ synchronized (mJobs) {
+ runningJobs.drainTo(mJobs);
+ }
+ }
+ break;
+ } else if (mJobs.isEmpty()) {
+ break;
+ }
}
while (!mJobs.isEmpty()) {
Job job;
@@ -161,14 +172,16 @@
// reset it here
Thread.currentThread().interrupt();
} else {
- LOG.warn("Lost job " + job.getJobName(), e);
+ LOG.warn("Lost job " + (null == job.getJobName()
+ ? "<unknown>" : job.getJobName()), e);
continue;
}
}
synchronized (mJobs) {
if (!mJobs.offer(job)) {
- LOG.error("Lost job " + job.getJobName()); // should never
- // happen
+ LOG.error("Lost job " + (null == job.getJobName()
+ ? "<unknown>" : job.getJobName())); // should never
+ // happen
}
}
break;
@@ -176,7 +189,7 @@
try {
TimeUnit.MILLISECONDS.sleep(pollDelayMillis);
} catch (InterruptedException e) {
- interrupted = true;
+ shutdown = true;
continue;
}
} catch (Throwable e) {
@@ -198,8 +211,8 @@
* called. Note that, since submission may be sporadic, this will hang
* if no form of shutdown has been requested.
*/
- public void join() throws InterruptedException {
- mThread.join();
+ public void join(long millis) throws InterruptedException {
+ mThread.join(millis);
}
/**
@@ -207,7 +220,10 @@
* Upstream submitter is assumed dead.
*/
public void abort() {
- graceful = false;
+ synchronized (mJobs) {
+ graceful = false;
+ shutdown = true;
+ }
mThread.interrupt();
}
@@ -216,7 +232,10 @@
* Upstream submitter is assumed dead.
*/
public void shutdown() {
- graceful = true;
+ synchronized (mJobs) {
+ graceful = true;
+ shutdown = true;
+ }
mThread.interrupt();
}
}
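
Dropping volatile from graceful is safe because both flags are now read and written only inside synchronized (mJobs) blocks, so the monitor thread always observes the (graceful, shutdown) pair in a consistent state. A sketch of that pattern in isolation (FlagPair is hypothetical, not Gridmix code):

    public class FlagPair {
      private final Object lock = new Object();
      private boolean graceful; // guarded by lock
      private boolean shutdown; // guarded by lock

      void abort() { // analogous to JobMonitor.abort()
        synchronized (lock) {
          graceful = false;
          shutdown = true;
        }
      }

      void shutdownGracefully() { // analogous to JobMonitor.shutdown()
        synchronized (lock) {
          graceful = true;
          shutdown = true;
        }
      }

      boolean[] snapshot() { // reader: always a consistent pair
        synchronized (lock) {
          return new boolean[] { graceful, shutdown };
        }
      }
    }
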
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Fri Dec 11 19:32:58 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapred.gridmix;
import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
@@ -96,6 +97,10 @@
" (" + job.getJob().getID() + ")");
} catch (IOException e) {
LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+ if (e.getCause() instanceof ClosedByInterruptException) {
+ throw new InterruptedException("Failed to submit " +
+ job.getJob().getJobName());
+ }
} catch (ClassNotFoundException e) {
LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
}
@@ -144,11 +149,11 @@
* Continue running until all queued jobs have been submitted to the
* cluster.
*/
- public void join() throws InterruptedException {
+ public void join(long millis) throws InterruptedException {
if (!shutdown) {
throw new IllegalStateException("Cannot wait for active submit thread");
}
- sched.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ sched.awaitTermination(millis, TimeUnit.MILLISECONDS);
}
/**
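
The new ClosedByInterruptException check matters because an interrupt delivered during an NIO channel write closes the channel and surfaces as an IOException whose cause is ClosedByInterruptException; without unwrapping it, an interrupt meant to stop the submitter would be swallowed as an ordinary failed submission. A minimal sketch of the pattern, where doSubmit() is a hypothetical stand-in for the job submission call:

    import java.io.IOException;
    import java.nio.channels.ClosedByInterruptException;

    public class InterruptMapping {
      // Hypothetical stand-in for the job submission call.
      static void doSubmit() throws IOException {
        throw new IOException(new ClosedByInterruptException());
      }

      static void submitOnce() throws InterruptedException {
        try {
          doSubmit();
        } catch (IOException e) {
          // An interrupted NIO write closes the channel; surface it as
          // InterruptedException so the worker thread exits cleanly.
          if (e.getCause() instanceof ClosedByInterruptException) {
            throw new InterruptedException("submit interrupted");
          }
          // other I/O failures are logged and the job skipped, as above
        }
      }

      public static void main(String[] args) {
        try {
          submitOnce();
        } catch (InterruptedException ie) {
          System.out.println("worker stopping: " + ie.getMessage());
        }
      }
    }
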
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * For every record consumed, read key + val bytes from the stream provided.
+ */
+class ReadRecordFactory extends RecordFactory {
+
+ /**
+ * Size of the internal scratch buffer used to read from the stream.
+ */
+ public static final String GRIDMIX_READ_BUF_SIZE = "gridmix.read.buffer.size";
+
+ private final byte[] buf;
+ private final InputStream src;
+ private final RecordFactory factory;
+
+ /**
+ * @param targetBytes Expected byte count.
+ * @param targetRecords Expected record count.
+ * @param src Stream to read bytes.
+ * @param conf Used to establish read buffer size. @see #GRIDMIX_READ_BUF_SIZE
+ */
+ public ReadRecordFactory(long targetBytes, long targetRecords,
+ InputStream src, Configuration conf) {
+ this(new AvgRecordFactory(targetBytes, targetRecords, conf), src, conf);
+ }
+
+ /**
+ * @param factory Factory to draw record sizes.
+ * @param src Stream to read bytes.
+ * @param conf Used to establish read buffer size. @see #GRIDMIX_READ_BUF_SIZE
+ */
+ public ReadRecordFactory(RecordFactory factory, InputStream src,
+ Configuration conf) {
+ this.src = src;
+ this.factory = factory;
+ buf = new byte[conf.getInt(GRIDMIX_READ_BUF_SIZE, 64 * 1024)];
+ }
+
+ @Override
+ public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+ if (!factory.next(key, val)) {
+ return false;
+ }
+ for (int len = (null == key ? 0 : key.getSize()) + val.getSize();
+ len > 0; len -= buf.length) {
+ IOUtils.readFully(src, buf, 0, Math.min(buf.length, len));
+ }
+ return true;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return factory.getProgress();
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.cleanup(null, src);
+ factory.close();
+ }
+}
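
A hedged usage sketch of the class above: each record drawn from the wrapped AvgRecordFactory causes the matching number of key-plus-value bytes to be drained from the supplied stream in read-buffer-sized chunks. The stream and targets below are illustrative (the source is oversized so the demo cannot hit EOF):

    package org.apache.hadoop.mapred.gridmix;

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;

    public class ReadRecordFactoryDemo {
      public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        // oversized source so readFully never hits EOF in this sketch
        final InputStream src = new ByteArrayInputStream(new byte[2 << 20]);
        // target 1 MiB spread over 128 records
        final RecordFactory factory =
            new ReadRecordFactory(1 << 20, 128, src, conf);
        final GridmixKey key = new GridmixKey();
        final GridmixRecord val = new GridmixRecord();
        long records = 0L;
        while (factory.next(key, val)) {
          ++records; // a real task would emit key/val here
        }
        factory.close();
        System.out.println(records + " records drawn");
      }
    }
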
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Interface for producing records as inputs and outputs to tasks.
+ */
+abstract class RecordFactory implements Closeable {
+
+ /**
+ * Produce the next record, transforming the given key and value in place.
+ * @return true if the record should be emitted.
+ */
+ public abstract boolean next(GridmixKey key, GridmixRecord val)
+ throws IOException;
+
+ /**
+ * Estimate of the fraction of record capacity consumed so far.
+ */
+ public abstract float getProgress() throws IOException;
+
+}
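
A minimal subclass sketch of the contract above, assuming only what the abstract class declares: next() fills in the caller's key and value and returns false once capacity is exhausted, while getProgress() reports the fraction consumed. CountingFactory is hypothetical, not part of this patch:

    package org.apache.hadoop.mapred.gridmix;

    import java.io.IOException;

    class CountingFactory extends RecordFactory {
      private final long target; // records to emit before exhaustion
      private long emitted;

      CountingFactory(long target) {
        this.target = target;
      }

      @Override
      public boolean next(GridmixKey key, GridmixRecord val)
          throws IOException {
        if (emitted >= target) {
          return false; // capacity exhausted; caller stops emitting
        }
        ++emitted;
        return true;
      }

      @Override
      public float getProgress() throws IOException {
        return target == 0 ? 1.0f : Math.min(1.0f, (float) emitted / target);
      }

      @Override
      public void close() throws IOException {
        // nothing to release in this sketch
      }
    }
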
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Fri Dec 11 19:32:58 2009
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -27,7 +28,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
@@ -79,21 +79,58 @@
public void close() { }
}
+ static double[] getDistr(Random r, double mindist, int size) {
+ assert 0.0 <= mindist && mindist <= 1.0;
+ final double min = mindist / size;
+ final double rem = 1.0 - min * size;
+ final double[] tmp = new double[size];
+ for (int i = 0; i < tmp.length - 1; ++i) {
+ tmp[i] = r.nextDouble() * rem;
+ }
+ tmp[tmp.length - 1] = rem;
+ Arrays.sort(tmp);
+
+ final double[] ret = new double[size];
+ ret[0] = tmp[0] + min;
+ for (int i = 1; i < size; ++i) {
+ ret[i] = tmp[i] - tmp[i-1] + min;
+ }
+ return ret;
+ }
+
/**
* Generate random task data for a synthetic job.
*/
static class MockJob implements JobStory {
- public static final String MIN_BYTES_IN = "gridmix.test.min.bytes.in";
- public static final String VAR_BYTES_IN = "gridmix.test.var.bytes.in";
- public static final String MIN_BYTES_OUT = "gridmix.test.min.bytes.out";
- public static final String VAR_BYTES_OUT = "gridmix.test.var.bytes.out";
-
- public static final String MIN_REC_SIZE = "gridmix.test.min.rec.bytes";
- public static final String VAR_REC_SIZE = "gridmix.test.var.rec.bytes";
-
- public static final String MAX_MAPS = "gridmix.test.max.maps";
- public static final String MAX_REDS = "gridmix.test.max.reduces";
+ static final int MIN_REC = 1 << 14;
+ static final int MIN_BYTES = 1 << 20;
+ static final int VAR_REC = 1 << 14;
+ static final int VAR_BYTES = 4 << 20;
+ static final int MAX_MAP = 5;
+ static final int MAX_RED = 3;
+
+ static void initDist(Random r, double min, int[] recs, long[] bytes,
+ long tot_recs, long tot_bytes) {
+ final double[] recs_dist = getDistr(r, min, recs.length);
+ final double[] bytes_dist = getDistr(r, min, recs.length);
+ long totalbytes = 0L;
+ int totalrecs = 0;
+ for (int i = 0; i < recs.length; ++i) {
+ recs[i] = (int) Math.round(tot_recs * recs_dist[i]);
+ bytes[i] = Math.round(tot_bytes * bytes_dist[i]);
+ totalrecs += recs[i];
+ totalbytes += bytes[i];
+ }
+ // Add/remove excess
+ recs[0] += totalrecs - tot_recs;
+ bytes[0] += totalbytes - tot_bytes;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("DIST: " + Arrays.toString(recs) + " " +
+ tot_recs + "/" + totalrecs + " " +
+ Arrays.toString(bytes) + " " + tot_bytes + "/" + totalbytes);
+ }
+ }
private static final AtomicInteger seq = new AtomicInteger(0);
// set timestamps in the past
@@ -101,97 +138,65 @@
new AtomicLong(System.currentTimeMillis() -
TimeUnit.MILLISECONDS.convert(60, TimeUnit.DAYS));
+ private final int id;
private final String name;
private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
private final long submitTime;
- public MockJob() {
- this(new Configuration(false));
- }
-
public MockJob(Configuration conf) {
- this(conf.getInt(MIN_BYTES_IN, 1 << 20),
- conf.getInt(VAR_BYTES_IN, 5 << 20),
- conf.getInt(MIN_BYTES_OUT, 1 << 20),
- conf.getInt(VAR_BYTES_OUT, 5 << 20),
- conf.getInt(MIN_REC_SIZE , 100),
- conf.getInt(VAR_REC_SIZE , 1 << 15),
- conf.getInt(MAX_MAPS, 5),
- conf.getInt(MAX_REDS, 3));
- }
-
- public MockJob(int min_bytes_in, int var_bytes_in,
- int min_bytes_out, int var_bytes_out,
- int min_rec_size, int var_rec_size,
- int max_maps, int max_reds) {
final Random r = new Random();
- name = String.format("MOCKJOB%05d", seq.getAndIncrement());
+ final long seed = r.nextLong();
+ r.setSeed(seed);
+ id = seq.getAndIncrement();
+ name = String.format("MOCKJOB%05d", id);
+ LOG.info(name + " (" + seed + ")");
submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
r.nextInt(10), TimeUnit.SECONDS));
- int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
- int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
-
- final int iAvgMapRec = r.nextInt(var_rec_size) + min_rec_size;
- final int oAvgMapRec = r.nextInt(var_rec_size) + min_rec_size;
-
- // MAP
- m_bytesIn = new long[r.nextInt(max_maps) + 1];
- m_bytesOut = new long[m_bytesIn.length];
- m_recsIn = new int[m_bytesIn.length];
- m_recsOut = new int[m_bytesIn.length];
- for (int i = 0; i < m_bytesIn.length; ++i) {
- m_bytesIn[i] = r.nextInt(var_bytes_in) + min_bytes_in;
- iMapBTotal += m_bytesIn[i];
- m_recsIn[i] = (int)(m_bytesIn[i] / iAvgMapRec);
- iMapRTotal += m_recsIn[i];
-
- m_bytesOut[i] = r.nextInt(var_bytes_out) + min_bytes_out;
- oMapBTotal += m_bytesOut[i];
- m_recsOut[i] = (int)(m_bytesOut[i] / oAvgMapRec);
- oMapRTotal += m_recsOut[i];
- }
-
- // REDUCE
- r_bytesIn = new long[r.nextInt(max_reds) + 1];
- r_bytesOut = new long[r_bytesIn.length];
- r_recsIn = new int[r_bytesIn.length];
- r_recsOut = new int[r_bytesIn.length];
- iRedBTotal = oMapBTotal;
- iRedRTotal = oMapRTotal;
- for (int j = 0; iRedBTotal > 0; ++j) {
- int i = j % r_bytesIn.length;
- final int inc = r.nextInt(var_bytes_out) + min_bytes_out;
- r_bytesIn[i] += inc;
- iRedBTotal -= inc;
- if (iRedBTotal < 0) {
- r_bytesIn[i] += iRedBTotal;
- iRedBTotal = 0;
- }
- iRedRTotal += r_recsIn[i];
- r_recsIn[i] = (int)(r_bytesIn[i] / oAvgMapRec);
- iRedRTotal -= r_recsIn[i];
- if (iRedRTotal < 0) {
- r_recsIn[i] += iRedRTotal;
- iRedRTotal = 0;
- }
- r_bytesOut[i] = r.nextInt(var_bytes_in) + min_bytes_in;
- oRedBTotal += r_bytesOut[i];
- r_recsOut[i] = (int)(r_bytesOut[i] / iAvgMapRec);
- oRedRTotal += r_recsOut[i];
- }
- r_recsIn[0] += iRedRTotal;
+ m_recsIn = new int[r.nextInt(MAX_MAP) + 1];
+ m_bytesIn = new long[m_recsIn.length];
+ m_recsOut = new int[m_recsIn.length];
+ m_bytesOut = new long[m_recsIn.length];
+
+ r_recsIn = new int[r.nextInt(MAX_RED) + 1];
+ r_bytesIn = new long[r_recsIn.length];
+ r_recsOut = new int[r_recsIn.length];
+ r_bytesOut = new long[r_recsIn.length];
+
+ // map input
+ final long map_recs = r.nextInt(VAR_REC) + MIN_REC;
+ final long map_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+ initDist(r, 0.5, m_recsIn, m_bytesIn, map_recs, map_bytes);
+
+ // shuffle
+ final long shuffle_recs = r.nextInt(VAR_REC) + MIN_REC;
+ final long shuffle_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+ initDist(r, 0.4, m_recsOut, m_bytesOut, shuffle_recs, shuffle_bytes);
+ initDist(r, 0.8, r_recsIn, r_bytesIn, shuffle_recs, shuffle_bytes);
+
+ // reduce output
+ final long red_recs = r.nextInt(VAR_REC) + MIN_REC;
+ final long red_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+ initDist(r, 0.4, r_recsOut, r_bytesOut, red_recs, red_bytes);
if (LOG.isDebugEnabled()) {
- iRedRTotal = 0;
- iRedBTotal = 0;
- for (int i = 0; i < r_bytesIn.length; ++i) {
+ int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
+ int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
+ for (int i = 0; i < m_recsIn.length; ++i) {
+ iMapRTotal += m_recsIn[i];
+ iMapBTotal += m_bytesIn[i];
+ oMapRTotal += m_recsOut[i];
+ oMapBTotal += m_bytesOut[i];
+ }
+ for (int i = 0; i < r_recsIn.length; ++i) {
iRedRTotal += r_recsIn[i];
iRedBTotal += r_bytesIn[i];
+ oRedRTotal += r_recsOut[i];
+ oRedBTotal += r_bytesOut[i];
}
LOG.debug(String.format("%s: M (%03d) %6d/%10d -> %6d/%10d" +
- " R (%03d) %6d/%10d -> %6d/%10d @%d", name,
+ " R (%03d) %6d/%10d -> %6d/%10d @%d", name,
m_bytesIn.length, iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal,
r_bytesIn.length, iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal,
submitTime));
@@ -210,7 +215,7 @@
@Override
public JobID getJobID() {
- return null;
+ return new JobID("job_mock_" + name, id);
}
@Override
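
The getDistr helper added above deserves a worked check: it returns size random shares that sum to exactly 1.0, each at least mindist/size, because the sorted gaps between uniform draws partition the remainder rem. A self-contained demo reusing the patch's arithmetic:

    import java.util.Arrays;
    import java.util.Random;

    public class DistrDemo {
      // Same arithmetic as the patch's getDistr: random shares summing to
      // 1.0, each share at least mindist / size.
      static double[] getDistr(Random r, double mindist, int size) {
        final double min = mindist / size;
        final double rem = 1.0 - min * size;
        final double[] tmp = new double[size];
        for (int i = 0; i < tmp.length - 1; ++i) {
          tmp[i] = r.nextDouble() * rem;
        }
        tmp[tmp.length - 1] = rem;
        Arrays.sort(tmp);
        final double[] ret = new double[size];
        ret[0] = tmp[0] + min;
        for (int i = 1; i < size; ++i) {
          ret[i] = tmp[i] - tmp[i - 1] + min;
        }
        return ret;
      }

      public static void main(String[] args) {
        final double[] d = getDistr(new Random(42L), 0.5, 4);
        double sum = 0.0;
        for (double v : d) {
          sum += v;
        }
        System.out.println(Arrays.toString(d)); // each share >= 0.125
        System.out.println(sum);                // 1.0 (up to rounding)
      }
    }
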
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Random;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+public class TestFilePool {
+
+ static final Log LOG = LogFactory.getLog(TestFileQueue.class);
+ static final int NFILES = 26;
+ static final Path base = getBaseDir();
+
+ static Path getBaseDir() {
+ try {
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ return new Path(System.getProperty("test.build.data", "/tmp"),
+ "testFilePool").makeQualified(fs);
+ } catch (IOException e) {
+ fail();
+ }
+ return null;
+ }
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ fs.delete(base, true);
+ final Random r = new Random();
+ final long seed = r.nextLong();
+ r.setSeed(seed);
+ LOG.info("seed: " + seed);
+ fs.mkdirs(base);
+ for (int i = 0; i < NFILES; ++i) {
+ Path file = base;
+ for (double d = 0.6; d > 0.0; d *= 0.8) {
+ if (r.nextDouble() < d) {
+ file = new Path(base, Integer.toString(r.nextInt(3)));
+ continue;
+ }
+ break;
+ }
+ OutputStream out = null;
+ try {
+ out = fs.create(new Path(file, "" + (char)('A' + i)));
+ final byte[] b = new byte[1024];
+ Arrays.fill(b, (byte)('A' + i));
+ for (int len = ((i % 13) + 1) * 1024; len > 0; len -= 1024) {
+ out.write(b);
+ }
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+ }
+
+ @AfterClass
+ public static void cleanup() throws IOException {
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ fs.delete(base, true);
+ }
+
+ @Test
+ public void testUnsuitable() throws Exception {
+ try {
+ final Configuration conf = new Configuration();
+ // all files 13k or less
+ conf.setLong(FilePool.GRIDMIX_MIN_FILE, 14 * 1024);
+ final FilePool pool = new FilePool(conf, base);
+ pool.refresh();
+ } catch (IOException e) {
+ return;
+ }
+ fail();
+ }
+
+ @Test
+ public void testPool() throws Exception {
+ final Random r = new Random();
+ final Configuration conf = new Configuration();
+ conf.setLong(FilePool.GRIDMIX_MIN_FILE, 3 * 1024);
+ final FilePool pool = new FilePool(conf, base);
+ pool.refresh();
+ final ArrayList<FileStatus> files = new ArrayList<FileStatus>();
+
+ // ensure 1k, 2k files excluded
+ final int expectedPoolSize = (NFILES / 2 * (NFILES / 2 + 1) - 6) * 1024;
+ assertEquals(expectedPoolSize, pool.getInputFiles(Long.MAX_VALUE, files));
+ assertEquals(NFILES - 4, files.size());
+
+ // exact match
+ files.clear();
+ assertEquals(expectedPoolSize, pool.getInputFiles(expectedPoolSize, files));
+
+ // match random within 12k
+ files.clear();
+ final long rand = r.nextInt(expectedPoolSize);
+ assertTrue("Missed: " + rand,
+ (NFILES / 2) * 1024 > rand - pool.getInputFiles(rand, files));
+
+ // all files
+ conf.setLong(FilePool.GRIDMIX_MIN_FILE, 0);
+ pool.refresh();
+ files.clear();
+ assertEquals((NFILES / 2 * (NFILES / 2 + 1)) * 1024,
+ pool.getInputFiles(Long.MAX_VALUE, files));
+ }
+
+ void checkSplitEq(FileSystem fs, CombineFileSplit split, long bytes)
+ throws Exception {
+ long splitBytes = 0L;
+ HashSet<Path> uniq = new HashSet<Path>();
+ for (int i = 0; i < split.getNumPaths(); ++i) {
+ splitBytes += split.getLength(i);
+ assertTrue(
+ split.getLength(i) <= fs.getFileStatus(split.getPath(i)).getLen());
+ assertFalse(uniq.contains(split.getPath(i)));
+ uniq.add(split.getPath(i));
+ }
+ assertEquals(bytes, splitBytes);
+ }
+
+ @Test
+ public void testStriper() throws Exception {
+ final Random r = new Random();
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ conf.setLong(FilePool.GRIDMIX_MIN_FILE, 3 * 1024);
+ final FilePool pool = new FilePool(conf, base) {
+ @Override
+ public BlockLocation[] locationsFor(FileStatus stat, long start, long len)
+ throws IOException {
+ return new BlockLocation[] { new BlockLocation() };
+ }
+ };
+ pool.refresh();
+
+ final int expectedPoolSize = (NFILES / 2 * (NFILES / 2 + 1) - 6) * 1024;
+ final InputStriper striper = new InputStriper(pool, expectedPoolSize);
+ int last = 0;
+ for (int i = 0; i < expectedPoolSize;
+ last = Math.min(expectedPoolSize - i, r.nextInt(expectedPoolSize))) {
+ checkSplitEq(fs, striper.splitFor(pool, last, 0), last);
+ i += last;
+ }
+ final InputStriper striper2 = new InputStriper(pool, expectedPoolSize);
+ checkSplitEq(fs, striper2.splitFor(pool, expectedPoolSize, 0),
+ expectedPoolSize);
+ }
+
+}
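
The expectedPoolSize expression in these tests is easier to trust with the arithmetic spelled out. setup() writes 26 files of ((i % 13) + 1) KiB, so every size from 1 to 13 KiB appears exactly twice, and the 3 KiB minimum excludes the two 1 KiB and two 2 KiB files:

    public class PoolSizeCheck {
      public static void main(String[] args) {
        final int NFILES = 26; // file i holds ((i % 13) + 1) KiB
        // total: 2 * (1 + 2 + ... + 13) KiB = 13 * 14 KiB = 182 KiB
        // excluded by the 3 KiB minimum: 1 + 1 + 2 + 2 = 6 KiB
        final int expectedPoolSize =
            (NFILES / 2 * (NFILES / 2 + 1) - 6) * 1024;
        System.out.println(expectedPoolSize); // 180224 bytes = 176 KiB
      }
    }
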
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+public class TestFileQueue {
+
+ static final Log LOG = LogFactory.getLog(TestFileQueue.class);
+ static final int NFILES = 4;
+ static final int BLOCK = 256;
+ static final Path[] paths = new Path[NFILES];
+ static final String[] loc = new String[NFILES];
+ static final long[] start = new long[NFILES];
+ static final long[] len = new long[NFILES];
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ final Path p = new Path(System.getProperty("test.build.data", "/tmp"),
+ "testFileQueue").makeQualified(fs);
+ fs.delete(p, true);
+ final byte[] b = new byte[BLOCK];
+ for (int i = 0; i < NFILES; ++i) {
+ Arrays.fill(b, (byte)('A' + i));
+ paths[i] = new Path(p, "" + (char)('A' + i));
+ OutputStream f = null;
+ try {
+ f = fs.create(paths[i]);
+ f.write(b);
+ } finally {
+ if (f != null) {
+ f.close();
+ }
+ }
+ }
+ }
+
+ @AfterClass
+ public static void cleanup() throws IOException {
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ final Path p = new Path(System.getProperty("test.build.data", "/tmp"),
+ "testFileQueue").makeQualified(fs);
+ fs.delete(p, true);
+ }
+
+ static ByteArrayOutputStream fillVerif() throws IOException {
+ final byte[] b = new byte[BLOCK];
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ for (int i = 0; i < NFILES; ++i) {
+ Arrays.fill(b, (byte)('A' + i));
+ out.write(b, 0, (int)len[i]);
+ }
+ return out;
+ }
+
+ @Test
+ public void testRepeat() throws Exception {
+ final Configuration conf = new Configuration();
+ Arrays.fill(loc, "");
+ Arrays.fill(start, 0L);
+ Arrays.fill(len, BLOCK);
+
+ final ByteArrayOutputStream out = fillVerif();
+ final FileQueue q =
+ new FileQueue(new CombineFileSplit(paths, start, len, loc), conf);
+ final byte[] verif = out.toByteArray();
+ final byte[] check = new byte[2 * NFILES * BLOCK];
+ q.read(check, 0, NFILES * BLOCK);
+ assertArrayEquals(verif, Arrays.copyOf(check, NFILES * BLOCK));
+
+ final byte[] verif2 = new byte[2 * NFILES * BLOCK];
+ System.arraycopy(verif, 0, verif2, 0, verif.length);
+ System.arraycopy(verif, 0, verif2, verif.length, verif.length);
+ q.read(check, 0, 2 * NFILES * BLOCK);
+ assertArrayEquals(verif2, check);
+
+ }
+
+ @Test
+ public void testUneven() throws Exception {
+ final Configuration conf = new Configuration();
+ Arrays.fill(loc, "");
+ Arrays.fill(start, 0L);
+ Arrays.fill(len, BLOCK);
+
+ final int B2 = BLOCK / 2;
+ for (int i = 0; i < NFILES; i += 2) {
+ start[i] += B2;
+ len[i] -= B2;
+ }
+ final FileQueue q =
+ new FileQueue(new CombineFileSplit(paths, start, len, loc), conf);
+ final ByteArrayOutputStream out = fillVerif();
+ final byte[] verif = out.toByteArray();
+ final byte[] check = new byte[NFILES / 2 * BLOCK + NFILES / 2 * B2];
+ q.read(check, 0, verif.length);
+ assertArrayEquals(verif, Arrays.copyOf(check, verif.length));
+ q.read(check, 0, verif.length);
+ assertArrayEquals(verif, Arrays.copyOf(check, verif.length));
+ }
+
+ @Test
+ public void testEmpty() throws Exception {
+ final Configuration conf = new Configuration();
+ // verify OK if unused
+ final FileQueue q = new FileQueue(new CombineFileSplit(
+ new Path[0], new long[0], new long[0], new String[0]), conf);
+ }
+
+}
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+public class TestGridmixRecord {
+ private static final Log LOG = LogFactory.getLog(TestGridmixRecord.class);
+
+ static void lengthTest(GridmixRecord x, GridmixRecord y, int min,
+ int max) throws Exception {
+ final Random r = new Random();
+ final long seed = r.nextLong();
+ r.setSeed(seed);
+ LOG.info("length: " + seed);
+ final DataInputBuffer in = new DataInputBuffer();
+ final DataOutputBuffer out1 = new DataOutputBuffer();
+ final DataOutputBuffer out2 = new DataOutputBuffer();
+ for (int i = min; i < max; ++i) {
+ setSerialize(x, r.nextLong(), i, out1);
+ // check write
+ assertEquals(i, out1.getLength());
+ // write to stream
+ x.write(out2);
+ // check read
+ in.reset(out1.getData(), 0, out1.getLength());
+ y.readFields(in);
+ assertEquals(i, x.getSize());
+ assertEquals(i, y.getSize());
+ }
+ // check stream read
+ in.reset(out2.getData(), 0, out2.getLength());
+ for (int i = min; i < max; ++i) {
+ y.readFields(in);
+ assertEquals(i, y.getSize());
+ }
+ }
+
+ static void randomReplayTest(GridmixRecord x, GridmixRecord y, int min,
+ int max) throws Exception {
+ final Random r = new Random();
+ final long seed = r.nextLong();
+ r.setSeed(seed);
+ LOG.info("randReplay: " + seed);
+ final DataOutputBuffer out1 = new DataOutputBuffer();
+ for (int i = min; i < max; ++i) {
+ final int s = out1.getLength();
+ x.setSeed(r.nextLong());
+ x.setSize(i);
+ x.write(out1);
+ assertEquals(i, out1.getLength() - s);
+ }
+ final DataInputBuffer in = new DataInputBuffer();
+ in.reset(out1.getData(), 0, out1.getLength());
+ final DataOutputBuffer out2 = new DataOutputBuffer();
+ // deserialize written records, write to separate buffer
+ for (int i = min; i < max; ++i) {
+ final int s = in.getPosition();
+ y.readFields(in);
+ assertEquals(i, in.getPosition() - s);
+ y.write(out2);
+ }
+ // verify written contents match
+ assertEquals(out1.getLength(), out2.getLength());
+ // assumes that writes will grow buffer deterministically
+ assertEquals("Bad test", out1.getData().length, out2.getData().length);
+ assertArrayEquals(out1.getData(), out2.getData());
+ }
+
+ static void eqSeedTest(GridmixRecord x, GridmixRecord y, int max)
+ throws Exception {
+ final Random r = new Random();
+ final long s = r.nextLong();
+ r.setSeed(s);
+ LOG.info("eqSeed: " + s);
+ assertEquals(x.fixedBytes(), y.fixedBytes());
+ final int min = x.fixedBytes() + 1;
+ final DataOutputBuffer out1 = new DataOutputBuffer();
+ final DataOutputBuffer out2 = new DataOutputBuffer();
+ for (int i = min; i < max; ++i) {
+ final long seed = r.nextLong();
+ setSerialize(x, seed, i, out1);
+ setSerialize(y, seed, i, out2);
+ assertEquals(x, y);
+ assertEquals(x.hashCode(), y.hashCode());
+
+ // verify written contents match
+ assertEquals(out1.getLength(), out2.getLength());
+ // assumes that writes will grow buffer deterministically
+ assertEquals("Bad test", out1.getData().length, out2.getData().length);
+ assertArrayEquals(out1.getData(), out2.getData());
+ }
+ }
+
+ static void binSortTest(GridmixRecord x, GridmixRecord y, int min,
+ int max, WritableComparator cmp) throws Exception {
+ final Random r = new Random();
+ final long s = r.nextLong();
+ r.setSeed(s);
+ LOG.info("sort: " + s);
+ final DataOutputBuffer out1 = new DataOutputBuffer();
+ final DataOutputBuffer out2 = new DataOutputBuffer();
+ for (int i = min; i < max; ++i) {
+ final long seed1 = r.nextLong();
+ setSerialize(x, seed1, i, out1);
+ assertEquals(0, x.compareSeed(seed1, Math.max(0, i - x.fixedBytes())));
+
+ final long seed2 = r.nextLong();
+ setSerialize(y, seed2, i, out2);
+ assertEquals(0, y.compareSeed(seed2, Math.max(0, i - x.fixedBytes())));
+
+ // for eq sized records, ensure byte cmp where req
+ final int chk = WritableComparator.compareBytes(
+ out1.getData(), 0, out1.getLength(),
+ out2.getData(), 0, out2.getLength());
+ assertEquals(chk, x.compareTo(y));
+ assertEquals(chk, cmp.compare(
+ out1.getData(), 0, out1.getLength(),
+ out2.getData(), 0, out2.getLength()));
+ // write second copy, compare eq
+ final int s1 = out1.getLength();
+ x.write(out1);
+ assertEquals(0, cmp.compare(out1.getData(), 0, s1,
+ out1.getData(), s1, out1.getLength() - s1));
+ final int s2 = out2.getLength();
+ y.write(out2);
+ assertEquals(0, cmp.compare(out2.getData(), 0, s2,
+ out2.getData(), s2, out2.getLength() - s2));
+ assertEquals(chk, cmp.compare(out1.getData(), 0, s1,
+ out2.getData(), s2, out2.getLength() - s2));
+ }
+ }
+
+ static void checkSpec(GridmixKey a, GridmixKey b) throws Exception {
+ final Random r = new Random();
+ final long s = r.nextLong();
+ r.setSeed(s);
+ LOG.info("spec: " + s);
+ final DataInputBuffer in = new DataInputBuffer();
+ final DataOutputBuffer out = new DataOutputBuffer();
+ a.setType(GridmixKey.REDUCE_SPEC);
+ b.setType(GridmixKey.REDUCE_SPEC);
+ for (int i = 0; i < 100; ++i) {
+ final int in_rec = r.nextInt(Integer.MAX_VALUE);
+ a.setReduceInputRecords(in_rec);
+ final int out_rec = r.nextInt(Integer.MAX_VALUE);
+ a.setReduceOutputRecords(out_rec);
+ final int out_bytes = r.nextInt(Integer.MAX_VALUE);
+ a.setReduceOutputBytes(out_bytes);
+ final int min = WritableUtils.getVIntSize(in_rec)
+ + WritableUtils.getVIntSize(out_rec)
+ + WritableUtils.getVIntSize(out_bytes);
+ assertEquals(min + 2, a.fixedBytes()); // meta + vint min
+ final int size = r.nextInt(1024) + a.fixedBytes() + 1;
+ setSerialize(a, r.nextLong(), size, out);
+ assertEquals(size, out.getLength());
+ assertTrue(a.equals(a));
+ assertEquals(0, a.compareTo(a));
+
+ in.reset(out.getData(), 0, out.getLength());
+
+ b.readFields(in);
+ assertEquals(size, b.getSize());
+ assertEquals(in_rec, b.getReduceInputRecords());
+ assertEquals(out_rec, b.getReduceOutputRecords());
+ assertEquals(out_bytes, b.getReduceOutputBytes());
+ assertTrue(a.equals(b));
+ assertEquals(0, a.compareTo(b));
+ assertEquals(a.hashCode(), b.hashCode());
+ }
+ }
+
+ static void setSerialize(GridmixRecord x, long seed, int size,
+ DataOutputBuffer out) throws IOException {
+ x.setSeed(seed);
+ x.setSize(size);
+ out.reset();
+ x.write(out);
+ }
+
+ @Test
+ public void testKeySpec() throws Exception {
+ final int min = 5;
+ final int max = 300;
+ final GridmixKey a = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
+ final GridmixKey b = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
+ lengthTest(a, b, min, max);
+ randomReplayTest(a, b, min, max);
+ binSortTest(a, b, min, max, new GridmixKey.Comparator());
+ // 2 fixed GR bytes, 1 type, 3 spec
+ eqSeedTest(a, b, max);
+ checkSpec(a, b);
+ }
+
+ @Test
+ public void testKeyData() throws Exception {
+ final int min = 2;
+ final int max = 300;
+ final GridmixKey a = new GridmixKey(GridmixKey.DATA, 1, 0L);
+ final GridmixKey b = new GridmixKey(GridmixKey.DATA, 1, 0L);
+ lengthTest(a, b, min, max);
+ randomReplayTest(a, b, min, max);
+ binSortTest(a, b, min, max, new GridmixKey.Comparator());
+ // 2 fixed GR bytes, 1 type
+ eqSeedTest(a, b, 300);
+ }
+
+ @Test
+ public void testBaseRecord() throws Exception {
+ final int min = 1;
+ final int max = 300;
+ final GridmixRecord a = new GridmixRecord();
+ final GridmixRecord b = new GridmixRecord();
+ lengthTest(a, b, min, max);
+ randomReplayTest(a, b, min, max);
+ binSortTest(a, b, min, max, new GridmixRecord.Comparator());
+ // 2 fixed GR bytes
+ eqSeedTest(a, b, 300);
+ }
+
+ public static void main(String[] argv) throws Exception {
+ boolean fail = false;
+ final TestGridmixRecord test = new TestGridmixRecord();
+ try { test.testKeySpec(); } catch (Exception e) {
+ fail = true;
+ e.printStackTrace();
+ }
+ try {test.testKeyData(); } catch (Exception e) {
+ fail = true;
+ e.printStackTrace();
+ }
+ try {test.testBaseRecord(); } catch (Exception e) {
+ fail = true;
+ e.printStackTrace();
+ }
+ System.exit(fail ? -1 : 0);
+ }
+
+ static void printDebug(GridmixRecord a, GridmixRecord b) throws IOException {
+ DataOutputBuffer out = new DataOutputBuffer();
+ a.write(out);
+ System.out.println("A " +
+ Arrays.toString(Arrays.copyOf(out.getData(), out.getLength())));
+ out.reset();
+ b.write(out);
+ System.out.println("B " +
+ Arrays.toString(Arrays.copyOf(out.getData(), out.getLength())));
+ }
+
+}
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Fri Dec 11 19:32:58 2009
@@ -20,9 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -35,8 +33,6 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskReport;
@@ -44,7 +40,6 @@
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.TaskInfo;
-import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import static org.apache.hadoop.mapreduce.TaskCounter.*;
@@ -96,7 +91,7 @@
static class TestMonitor extends JobMonitor {
- static final long SLOPBYTES = 5 * 1024;
+ static final long SLOPBYTES = 1024;
private final int expected;
private final BlockingQueue<Job> retiredJobs;
@@ -138,12 +133,12 @@
final TaskReport[] mReports = job.getTaskReports(TaskType.MAP);
assertEquals("Mismatched map count", nMaps, mReports.length);
check(TaskType.MAP, job, spec, mReports,
- 0, 1, nReds * SLOPBYTES, nReds + 1);
+ 0, 0, SLOPBYTES, nReds);
final TaskReport[] rReports = job.getTaskReports(TaskType.REDUCE);
assertEquals("Mismatched reduce count", nReds, rReports.length);
check(TaskType.REDUCE, job, spec, rReports,
- nMaps * SLOPBYTES, nMaps + 1, 0, 1);
+ nMaps * SLOPBYTES, 2 * nMaps, 0, 0);
}
}
@@ -176,74 +171,97 @@
(int)counters.findCounter(MAP_OUTPUT_RECORDS).getValue();
specInfo = spec.getTaskInfo(TaskType.MAP, i);
+ specInputRecords[i] = specInfo.getInputRecords();
+ specInputBytes[i] = specInfo.getInputBytes();
+ specOutputRecords[i] = specInfo.getOutputRecords();
+ specOutputBytes[i] = specInfo.getOutputBytes();
+ System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
+ specInputBytes[i], specOutputBytes[i],
+ specInputRecords[i], specOutputRecords[i]);
+ System.out.printf(type + " RUN: %9d -> %9d :: %5d -> %5d\n",
+ runInputBytes[i], runOutputBytes[i],
+ runInputRecords[i], runOutputRecords[i]);
break;
case REDUCE:
- runInputBytes[i] =
- counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue();
- runInputRecords[i] =
- (int)counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
- runOutputBytes[i] =
- counters.findCounter("FileSystemCounters",
- "HDFS_BYTES_WRITTEN").getValue();
- runOutputRecords[i] =
- (int)counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
+ runInputBytes[i] = 0;
+ runInputRecords[i] =
+ (int)counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
+ runOutputBytes[i] =
+ counters.findCounter("FileSystemCounters",
+ "HDFS_BYTES_WRITTEN").getValue();
+ runOutputRecords[i] =
+ (int)counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
+
specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
+ // There is no reliable counter for reduce input bytes. The
+ // variable-length encoding of intermediate records and other noise
+ // make this quantity difficult to estimate. The shuffle and spec
+ // input bytes are included in debug output for reference, but are
+ // not checked
+ specInputBytes[i] = 0;
+ specInputRecords[i] = specInfo.getInputRecords();
+ specOutputRecords[i] = specInfo.getOutputRecords();
+ specOutputBytes[i] = specInfo.getOutputBytes();
+ System.out.printf(type + " SPEC: (%9d) -> %9d :: %5d -> %5d\n",
+ specInfo.getInputBytes(), specOutputBytes[i],
+ specInputRecords[i], specOutputRecords[i]);
+ System.out.printf(type + " RUN: (%9d) -> %9d :: %5d -> %5d\n",
+ counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue(),
+ runOutputBytes[i], runInputRecords[i], runOutputRecords[i]);
break;
default:
specInfo = null;
fail("Unexpected type: " + type);
}
- specInputBytes[i] = specInfo.getInputBytes();
- specInputRecords[i] = specInfo.getInputRecords();
- specOutputRecords[i] = specInfo.getOutputRecords();
- specOutputBytes[i] = specInfo.getOutputBytes();
- System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
- specInputBytes[i], specOutputBytes[i],
- specInputRecords[i], specOutputRecords[i]);
- System.out.printf(type + " RUN: %9d -> %9d :: %5d -> %5d\n",
- runInputBytes[i], runOutputBytes[i],
- runInputRecords[i], runOutputRecords[i]);
}
// Check input bytes
Arrays.sort(specInputBytes);
Arrays.sort(runInputBytes);
for (int i = 0; i < runTasks.length; ++i) {
- assertTrue("Mismatched input bytes " +
+ assertTrue("Mismatched " + type + " input bytes " +
specInputBytes[i] + "/" + runInputBytes[i],
- runInputBytes[i] - specInputBytes[i] <= extraInputBytes);
+ eqPlusMinus(runInputBytes[i], specInputBytes[i], extraInputBytes));
}
// Check input records
Arrays.sort(specInputRecords);
Arrays.sort(runInputRecords);
for (int i = 0; i < runTasks.length; ++i) {
- assertTrue("Mismatched input records " +
+ assertTrue("Mismatched " + type + " input records " +
specInputRecords[i] + "/" + runInputRecords[i],
- runInputRecords[i] - specInputRecords[i] <= extraInputRecords);
+ eqPlusMinus(runInputRecords[i], specInputRecords[i],
+ extraInputRecords));
}
// Check output bytes
Arrays.sort(specOutputBytes);
Arrays.sort(runOutputBytes);
for (int i = 0; i < runTasks.length; ++i) {
- assertTrue("Mismatched output bytes " +
+ assertTrue("Mismatched " + type + " output bytes " +
specOutputBytes[i] + "/" + runOutputBytes[i],
- runOutputBytes[i] - specOutputBytes[i] <= extraOutputBytes);
+ eqPlusMinus(runOutputBytes[i], specOutputBytes[i],
+ extraOutputBytes));
}
// Check output records
Arrays.sort(specOutputRecords);
Arrays.sort(runOutputRecords);
for (int i = 0; i < runTasks.length; ++i) {
- assertTrue("Mismatched output records " +
+ assertTrue("Mismatched " + type + " output records " +
specOutputRecords[i] + "/" + runOutputRecords[i],
- runOutputRecords[i] - specOutputRecords[i] <= extraOutputRecords);
+ eqPlusMinus(runOutputRecords[i], specOutputRecords[i],
+ extraOutputRecords));
}
}
+ private static boolean eqPlusMinus(long a, long b, long x) {
+ final long diff = Math.abs(a - b);
+ return diff <= x;
+ }
+
@Override
protected void onSuccess(Job job) {
retiredJobs.add(job);
@@ -292,6 +310,7 @@
};
DebugGridmix client = new DebugGridmix();
final Configuration conf = mrCluster.createJobConf();
+ //conf.setInt(Gridmix.GRIDMIX_KEY_LEN, 2);
int res = ToolRunner.run(conf, client, argv);
assertEquals("Client exited with nonzero status", 0, res);
client.checkMonitor();
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,79 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.Random;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+public class TestRecordFactory {
+ private static final Log LOG = LogFactory.getLog(TestRecordFactory.class);
+
+ public static void testFactory(long targetBytes, long targetRecs)
+ throws Exception {
+ final Configuration conf = new Configuration();
+ final GridmixKey key = new GridmixKey();
+ final GridmixRecord val = new GridmixRecord();
+ LOG.info("Target bytes/records: " + targetBytes + "/" + targetRecs);
+ final RecordFactory f = new AvgRecordFactory(targetBytes, targetRecs, conf);
+ targetRecs = targetRecs <= 0 && targetBytes >= 0
+ ? Math.max(1,
+ targetBytes / conf.getInt("gridmix.missing.rec.size", 64 * 1024))
+ : targetRecs;
+
+ long records = 0L;
+ final DataOutputBuffer out = new DataOutputBuffer();
+ while (f.next(key, val)) {
+ ++records;
+ key.write(out);
+ val.write(out);
+ }
+ assertEquals(targetRecs, records);
+ assertEquals(targetBytes, out.getLength());
+ }
+
+ @Test
+ public void testRandom() throws Exception {
+ final Random r = new Random();
+ final long targetBytes = r.nextInt(1 << 20) + 3 * (1 << 14);
+ final long targetRecs = r.nextInt(1 << 14);
+ testFactory(targetBytes, targetRecs);
+ }
+
+ @Test
+ public void testAvg() throws Exception {
+ final Random r = new Random();
+ final long avgsize = r.nextInt(1 << 10) + 1;
+ final long targetRecs = r.nextInt(1 << 14);
+ testFactory(targetRecs * avgsize, targetRecs);
+ }
+
+ @Test
+ public void testZero() throws Exception {
+ final Random r = new Random();
+ final long targetBytes = r.nextInt(1 << 20);
+ testFactory(targetBytes, 0);
+ }
+}
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java Fri Dec 11 19:32:58 2009
@@ -24,7 +24,8 @@
private final int recsOut;
private final long maxMemory;
- public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut, int maxMemory) {
+ public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+ long maxMemory) {
this.bytesIn = bytesIn;
this.recsIn = recsIn;
this.bytesOut = bytesOut;
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Fri Dec 11 19:32:58 2009
@@ -619,38 +619,42 @@
Values type = loggedTask.getTaskType();
if ((type != Values.MAP) && (type != Values.REDUCE)) {
throw new IllegalArgumentException(
- "getTaskInfo only supports MAP or REDUCE tasks: " + type.toString() +
- " for task = " + loggedTask.getTaskID());
+ "getTaskInfo only supports MAP or REDUCE tasks: " + type.toString()
+ + " for task = " + loggedTask.getTaskID());
}
for (LoggedTaskAttempt attempt : attempts) {
attempt = sanitizeLoggedTaskAttempt(attempt);
// ignore bad attempts or unsuccessful attempts.
- if ((attempt == null)
- || (attempt.getResult() != Values.SUCCESS)) {
+ if ((attempt == null) || (attempt.getResult() != Values.SUCCESS)) {
continue;
}
if (type == Values.MAP) {
inputBytes = attempt.getHdfsBytesRead();
inputRecords = attempt.getMapInputRecords();
- outputBytes = attempt.getMapOutputBytes();
+ outputBytes =
+ (job.getTotalReduces() > 0) ? attempt.getMapOutputBytes() : attempt
+ .getHdfsBytesWritten();
outputRecords = attempt.getMapOutputRecords();
- heapMegabytes = (job.getJobMapMB() > 0) ? job.getJobMapMB()
- : job.getHeapMegabytes();
+ heapMegabytes =
+ (job.getJobMapMB() > 0) ? job.getJobMapMB() : job
+ .getHeapMegabytes();
} else {
inputBytes = attempt.getReduceShuffleBytes();
inputRecords = attempt.getReduceInputRecords();
outputBytes = attempt.getHdfsBytesWritten();
outputRecords = attempt.getReduceOutputRecords();
- heapMegabytes = (job.getJobReduceMB() > 0) ? job.getJobReduceMB()
- : job.getHeapMegabytes();
+ heapMegabytes =
+ (job.getJobReduceMB() > 0) ? job.getJobReduceMB() : job
+ .getHeapMegabytes();
}
break;
}
- TaskInfo taskInfo = new TaskInfo(inputBytes, (int) inputRecords,
- outputBytes, (int) outputRecords, (int) heapMegabytes);
+ TaskInfo taskInfo =
+ new TaskInfo(inputBytes, (int) inputRecords, outputBytes,
+ (int) outputRecords, (int) heapMegabytes);
return taskInfo;
}
@@ -869,8 +873,9 @@
private LoggedTaskAttempt getLoggedTaskAttempt(TaskType taskType,
int taskNumber, int taskAttemptNumber) {
buildMaps();
- TaskAttemptID id = new TaskAttemptID(getMaskedTaskID(taskType, taskNumber),
- taskAttemptNumber);
+ TaskAttemptID id =
+ new TaskAttemptID(getMaskedTaskID(taskType, taskNumber),
+ taskAttemptNumber);
return loggedTaskAttemptMap.get(id);
}
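
The ZombieJob change above encodes a simple rule: a map-only job (zero reduces) writes its map output straight to HDFS, so hdfsBytesWritten, not the map-output counter, is the right measure of its output bytes; the companion TaskInfo change widens maxMemory from int to long so large heap figures are not truncated. A hedged restatement of the rule (hypothetical helper, not the Rumen API):

    public class MapOutputRule {
      // Hedged restatement: with no reduces, map output goes to HDFS.
      static long mapOutputBytes(int totalReduces, long mapOutputBytes,
                                 long hdfsBytesWritten) {
        return totalReduces > 0 ? mapOutputBytes : hdfsBytesWritten;
      }

      public static void main(String[] args) {
        System.out.println(mapOutputBytes(0, 0L, 4096L));   // map-only: 4096
        System.out.println(mapOutputBytes(3, 2048L, 512L)); // shuffled: 2048
      }
    }
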