You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:38:23 UTC
svn commit: r1077079 [2/11] - in
/hadoop/common/branches/branch-0.20-security-patches: ./ src/contrib/
src/contrib/gridmix/ src/contrib/gridmix/ivy/ src/contrib/gridmix/src/
src/contrib/gridmix/src/java/ src/contrib/gridmix/src/java/org/
src/contrib/gr...
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Formatter;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Synthetic job generated from a trace description.
+ */
+class GridmixJob implements Callable<Job>, Delayed {
+
+ public static final String JOBNAME = "GRIDMIX";
+ public static final String ORIGNAME = "gridmix.job.name.original";
+ public static final Log LOG = LogFactory.getLog(GridmixJob.class);
+
+ private static final ThreadLocal<Formatter> nameFormat = // one Formatter per thread, used to build job names
+ new ThreadLocal<Formatter>() {
+ @Override
+ protected Formatter initialValue() {
+ final StringBuilder sb = new StringBuilder(JOBNAME.length() + 5); // JOBNAME plus the five digits added by "%05d"
+ sb.append(JOBNAME); // the buffer always starts with the JOBNAME prefix
+ return new Formatter(sb); // format() calls append to sb after that prefix
+ }
+ };
+
+ private final int seq;
+ private final Path outdir;
+ protected final Job job;
+ private final JobStory jobdesc;
+ private final long submissionTimeNanos;
+
+ public GridmixJob(Configuration conf, long submissionMillis,
+ JobStory jobdesc, Path outRoot, int seq) throws IOException {
+ ((StringBuilder)nameFormat.get().out()).setLength(JOBNAME.length());
+ job = new Job(conf, nameFormat.get().format("%05d", seq).toString());
+ submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
+ submissionMillis, TimeUnit.MILLISECONDS);
+ this.jobdesc = jobdesc;
+ this.seq = seq;
+ outdir = new Path(outRoot, "" + seq);
+ }
+
+ protected GridmixJob(Configuration conf, long submissionMillis, String name)
+ throws IOException { // explicitly named job with no trace description or output directory
+ this.seq = -1; // no sequence number
+ this.jobdesc = null;
+ this.outdir = null;
+ this.job = new Job(conf, name);
+ this.submissionTimeNanos =
+ TimeUnit.MILLISECONDS.toNanos(submissionMillis);
+ }
+
+ public String toString() { // a GridmixJob prints as the name of the Job it wraps
+ return job.getJobName();
+ }
+
+ public long getDelay(TimeUnit unit) { // time remaining until the scheduled submission, expressed in unit
+ final long remainingNanos = submissionTimeNanos - System.nanoTime();
+ return unit.convert(remainingNanos, TimeUnit.NANOSECONDS); // negative once the submission time has passed
+ }
+
+ @Override
+ public int compareTo(Delayed other) { // orders by submission time, then by id; returns -1, 0 or 1
+ if (this == other) {
+ return 0;
+ }
+ if (other instanceof GridmixJob) {
+ final GridmixJob that = (GridmixJob) other;
+ if (that.submissionTimeNanos != submissionTimeNanos) {
+ return that.submissionTimeNanos < submissionTimeNanos ? 1 : -1;
+ }
+ // Compare the ids directly: id() - that.id() can overflow and flip sign.
+ final int mine = id();
+ final int theirs = that.id();
+ return mine < theirs ? -1 : (mine == theirs ? 0 : 1);
+ }
+ final long diff = // any other Delayed: compare by remaining delay
+ getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
+ return 0 == diff ? 0 : (diff > 0 ? 1 : -1);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof GridmixJob)) {
+ return false; // null and foreign types never match
+ }
+ // Distinct instances sharing an id only arise if a job is cloned.
+ return this == other || id() == ((GridmixJob) other).id();
+ }
+
+ @Override
+ public int hashCode() { // hashes on the id, the same value equals() compares
+ return id();
+ }
+
+ int id() { // sequence number given at construction; -1 for jobs built by the name-only constructor
+ return seq;
+ }
+
+ Job getJob() { // the wrapped Job created in the constructor
+ return job;
+ }
+
+ JobStory getJobDesc() { // trace description; null for jobs built by the name-only constructor
+ return jobdesc;
+ }
+
+ public Job call() throws IOException, InterruptedException, // configures the synthetic job, submits it, returns it
+ ClassNotFoundException {
+ job.setMapperClass(GridmixMapper.class);
+ job.setReducerClass(GridmixReducer.class);
+ job.setNumReduceTasks(jobdesc.getNumberReduces()); // same reduce count as the traced job
+ job.setMapOutputKeyClass(GridmixKey.class);
+ job.setMapOutputValueClass(GridmixRecord.class);
+ job.setSortComparatorClass(GridmixKey.Comparator.class);
+ job.setGroupingComparatorClass(SpecGroupingComparator.class);
+ job.setInputFormatClass(GridmixInputFormat.class);
+ job.setOutputFormatClass(RawBytesOutputFormat.class);
+ job.setPartitionerClass(DraftPartitioner.class);
+ job.setJarByClass(GridmixJob.class);
+ job.getConfiguration().setInt("gridmix.job.seq", seq); // GridmixInputFormat.getSplits() looks splits up by this value
+ job.getConfiguration().set(ORIGNAME, null == jobdesc.getJobID() // record the traced job's id, if it has one
+ ? "<unknown>" : jobdesc.getJobID().toString());
+ job.getConfiguration().setBoolean("mapred.used.genericoptionsparser", true);
+ FileInputFormat.addInputPath(job, new Path("ignored")); // placeholder path; GridmixInputFormat.getSplits() does not read it
+ FileOutputFormat.setOutputPath(job, outdir);
+ job.submit(); // call() returns right after this; it does not wait for completion itself
+ return job;
+ }
+
+ public static class DraftPartitioner<V> extends Partitioner<GridmixKey,V> { // sends each key to the partition stored on the key
+ public int getPartition(GridmixKey key, V value, int numReduceTasks) {
+ return key.getPartition(); // value and numReduceTasks are not consulted
+ }
+ }
+
+ public static class SpecGroupingComparator // grouping comparator: by type alone when either key is a REDUCE_SPEC
+ implements RawComparator<GridmixKey> {
+ private final DataInputBuffer di = new DataInputBuffer(); // reused to decode the leading vint of each serialized key
+ private final byte[] reset = di.getData(); // buffer di held at construction; restored after each raw compare
+ @Override
+ public int compare(GridmixKey g1, GridmixKey g2) {
+ final byte t1 = g1.getType();
+ final byte t2 = g2.getType();
+ if (t1 == GridmixKey.REDUCE_SPEC ||
+ t2 == GridmixKey.REDUCE_SPEC) {
+ return t1 - t2; // two REDUCE_SPEC keys compare equal, so they fall into one group
+ }
+ assert t1 == GridmixKey.DATA;
+ assert t2 == GridmixKey.DATA;
+ return g1.compareTo(g2); // DATA keys use the key's own ordering
+ }
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ try {
+ final int ret;
+ di.reset(b1, s1, l1);
+ final int x1 = WritableUtils.readVInt(di); // leading vint: the size GridmixRecord.write() emits first
+ di.reset(b2, s2, l2);
+ final int x2 = WritableUtils.readVInt(di);
+ final int t1 = b1[s1 + x1]; // GridmixKey.write() puts the type byte right after the record bytes
+ final int t2 = b2[s2 + x2];
+ if (t1 == GridmixKey.REDUCE_SPEC ||
+ t2 == GridmixKey.REDUCE_SPEC) {
+ ret = t1 - t2; // same rule as the object-level compare above
+ } else {
+ assert t1 == GridmixKey.DATA;
+ assert t2 == GridmixKey.DATA;
+ ret =
+ WritableComparator.compareBytes(b1, s1, x1, b2, s2, x2); // byte-wise over the first x1 / x2 bytes of each key
+ }
+ di.reset(reset, 0, 0); // stop referencing the caller's arrays
+ return ret;
+ } catch (IOException e) {
+ throw new RuntimeException(e); // raw compare() cannot declare checked exceptions
+ }
+ }
+ }
+
+ public static class GridmixMapper // emits synthetic key/value records at a fixed ratio per input record
+ extends Mapper<NullWritable,GridmixRecord,GridmixKey,GridmixRecord> {
+
+ private double acc; // output records owed so far, including the fractional part
+ private double ratio; // output records to emit per input record
+ private final ArrayList<RecordFactory> reduces = // factories that still have records to produce
+ new ArrayList<RecordFactory>();
+ private final Random r = new Random();
+
+ private final GridmixKey key = new GridmixKey(); // reused for every emitted record
+ private final GridmixRecord val = new GridmixRecord(); // reused for every emitted record
+
+ @Override
+ protected void setup(Context ctxt) // builds one record factory per reduce (or one in total) and derives ratio
+ throws IOException, InterruptedException {
+ final Configuration conf = ctxt.getConfiguration();
+ final GridmixSplit split = (GridmixSplit) ctxt.getInputSplit();
+ final int maps = split.getMapCount();
+ final long[] reduceBytes = split.getOutputBytes();
+ final long[] reduceRecords = split.getOutputRecords();
+
+ long totalRecords = 0L;
+ final int nReduces = ctxt.getNumReduceTasks();
+ if (nReduces > 0) {
+ int idx = 0; // index into the specs carried by this split
+ int id = split.getId(); // next reduce whose spec this map carries
+ for (int i = 0; i < nReduces; ++i) {
+ final GridmixKey.Spec spec = new GridmixKey.Spec(); // stays all-zero for reduces this map has no spec for
+ if (i == id) {
+ spec.bytes_out = split.getReduceBytes(idx);
+ spec.rec_out = split.getReduceRecords(idx);
+ ++idx;
+ id += maps; // this map's specs are for reduces id, id + maps, id + 2*maps, ...
+ }
+ reduces.add(new IntermediateRecordFactory(
+ new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
+ i, reduceRecords[i], spec, conf));
+ totalRecords += reduceRecords[i];
+ }
+ } else {
+ reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0], // no reduces: a single factory built from element 0
+ conf));
+ totalRecords = reduceRecords[0];
+ }
+ final long splitRecords = split.getInputRecords();
+ final long inputRecords = splitRecords <= 0 && split.getLength() >= 0 // no record count: estimate one from the byte length
+ ? Math.max(1,
+ split.getLength() / conf.getInt("gridmix.missing.rec.size", 64*1024))
+ : splitRecords;
+ ratio = totalRecords / (1.0 * inputRecords);
+ acc = 0.0;
+ }
+
+ @Override
+ public void map(NullWritable ignored, GridmixRecord rec, // the input record's content is not used, only its arrival
+ Context context) throws IOException, InterruptedException {
+ acc += ratio;
+ while (acc >= 1.0 && !reduces.isEmpty()) { // emit one record for every whole unit accumulated
+ key.setSeed(r.nextLong());
+ val.setSeed(r.nextLong());
+ final int idx = r.nextInt(reduces.size()); // choose a remaining factory at random
+ final RecordFactory f = reduces.get(idx);
+ if (!f.next(key, val)) {
+ reduces.remove(idx); // factory is exhausted: drop it and retry without spending acc
+ continue;
+ }
+ context.write(key, val);
+ acc -= 1.0;
+ }
+ }
+
+ @Override
+ public void cleanup(Context context) // drains whatever the remaining factories still hold
+ throws IOException, InterruptedException {
+ for (RecordFactory factory : reduces) {
+ key.setSeed(r.nextLong()); // only the key is reseeded here; val keeps its last seed
+ while (factory.next(key, val)) {
+ context.write(key, val);
+ key.setSeed(r.nextLong());
+ }
+ }
+ }
+ }
+
+ public static class GridmixReducer // sums the REDUCE_SPEC keys, then emits output at the ratio they describe
+ extends Reducer<GridmixKey,GridmixRecord,NullWritable,GridmixRecord> {
+
+ private final Random r = new Random();
+ private final GridmixRecord val = new GridmixRecord(); // reused for every emitted record
+
+ private double acc; // output records owed so far, including the fractional part
+ private double ratio; // output records to emit per input record
+ private RecordFactory factory;
+
+ @Override
+ protected void setup(Context context) // consumes the first key group, which must hold the reduce specs
+ throws IOException, InterruptedException {
+ if (!context.nextKey() ||
+ context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
+ throw new IOException("Missing reduce spec");
+ }
+ long outBytes = 0L;
+ long outRecords = 0L;
+ long inRecords = 0L;
+ for (GridmixRecord ignored : context.getValues()) {
+ final GridmixKey spec = context.getCurrentKey(); // the current key is fetched again for every value in the group
+ inRecords += spec.getReduceInputRecords();
+ outBytes += spec.getReduceOutputBytes();
+ outRecords += spec.getReduceOutputRecords();
+ }
+ if (0 == outRecords && inRecords > 0) {
+ LOG.info("Spec output bytes w/o records. Using input record count");
+ outRecords = inRecords;
+ }
+ factory =
+ new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
+ ratio = outRecords / (1.0 * inRecords); // NOTE(review): inRecords == 0 gives NaN or Infinity, not an exception
+ acc = 0.0;
+ }
+ @Override
+ protected void reduce(GridmixKey key, Iterable<GridmixRecord> values, // input values are counted, not inspected
+ Context context) throws IOException, InterruptedException {
+ for (GridmixRecord ignored : values) {
+ acc += ratio;
+ while (acc >= 1.0 && factory.next(null, val)) { // one output per whole unit, until the factory runs dry
+ context.write(NullWritable.get(), val);
+ acc -= 1.0;
+ }
+ }
+ }
+ @Override
+ protected void cleanup(Context context) // emits whatever the factory still has left
+ throws IOException, InterruptedException {
+ val.setSeed(r.nextLong());
+ while (factory.next(null, val)) {
+ context.write(NullWritable.get(), val);
+ val.setSeed(r.nextLong());
+ }
+ }
+ }
+
+ static class GridmixRecordReader // yields GridmixRecord values from a ReadRecordFactory over the split's files
+ extends RecordReader<NullWritable,GridmixRecord> {
+
+ private RecordFactory factory; // created in initialize()
+ private final Random r = new Random();
+ private final GridmixRecord val = new GridmixRecord(); // single instance returned by getCurrentValue()
+
+ public GridmixRecordReader() { }
+
+ @Override
+ public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt) // genericSplit must be a GridmixSplit
+ throws IOException, InterruptedException {
+ final GridmixSplit split = (GridmixSplit)genericSplit;
+ final Configuration conf = ctxt.getConfiguration();
+ factory = new ReadRecordFactory(split.getLength(),
+ split.getInputRecords(), new FileQueue(split, conf), conf);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException { // reseeds val, then lets the factory fill it
+ val.setSeed(r.nextLong());
+ return factory.next(null, val);
+ }
+ @Override
+ public float getProgress() throws IOException { // delegated to the factory
+ return factory.getProgress();
+ }
+ @Override
+ public NullWritable getCurrentKey() { // keys carry no information
+ return NullWritable.get();
+ }
+ @Override
+ public GridmixRecord getCurrentValue() { // same object on every call; copy it to retain a value
+ return val;
+ }
+ @Override
+ public void close() throws IOException {
+ factory.close();
+ }
+ }
+
+ static class GridmixInputFormat // serves the splits that buildSplits() stored via pushDescription()
+ extends InputFormat<NullWritable,GridmixRecord> {
+
+ @Override
+ public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException { // removes the cache entry; null if none exists
+ return pullDescription(jobCtxt.getConfiguration().getInt(
+ "gridmix.job.seq", -1)); // the key GridmixJob.call() writes; -1 if it is absent
+ }
+ @Override
+ public RecordReader<NullWritable,GridmixRecord> createRecordReader( // neither argument is used here
+ InputSplit split, final TaskAttemptContext taskContext)
+ throws IOException {
+ return new GridmixRecordReader();
+ }
+ }
+
+ static class RawBytesOutputFormat<K>
+ extends FileOutputFormat<K,GridmixRecord> {
+
+ @Override
+ public RecordWriter<K,GridmixRecord> getRecordWriter(
+ TaskAttemptContext job) throws IOException {
+ // Open the task's default work file; create() is called with overwrite=false.
+ final Path workFile = getDefaultWorkFile(job, "");
+ final FileSystem targetFs = workFile.getFileSystem(job.getConfiguration());
+ final FSDataOutputStream stream = targetFs.create(workFile, false);
+ return new RecordWriter<K,GridmixRecord>() {
+ @Override
+ public void write(K ignored, GridmixRecord value)
+ throws IOException {
+ value.writeRandom(stream, value.getSize()); // key is dropped; only the value's bytes are written
+ }
+ @Override
+ public void close(TaskAttemptContext ctxt) throws IOException {
+ stream.close();
+ }
+ };
+ }
+ }
+
+ // TODO replace with ThreadLocal submitter?
+ private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
+ new ConcurrentHashMap<Integer,List<InputSplit>>();
+
+ static void pushDescription(int seq, List<InputSplit> splits) { // stores splits under seq; each seq may be stored only once
+ if (null != descCache.putIfAbsent(seq, splits)) { // non-null means an entry already existed and was kept
+ throw new IllegalArgumentException("Description exists for id " + seq);
+ }
+ }
+
+ static List<InputSplit> pullDescription(int seq) { // removes and returns the splits stored under seq, or null if none
+ return descCache.remove(seq);
+ }
+
+ // Not necessary once the cache is replaced by a ThreadLocal (see the TODO on descCache).
+ static void clearAll() { // drops every stored split description
+ descCache.clear();
+ }
+
+ void buildSplits(FilePool inputDir) throws IOException { // builds one GridmixSplit per traced map and stores the list under id()
+ long mapInputBytesTotal = 0L;
+ long mapOutputBytesTotal = 0L;
+ long mapOutputRecordsTotal = 0L;
+ final JobStory jobdesc = getJobDesc();
+ if (null == jobdesc) {
+ return; // jobs without a trace description have nothing to split
+ }
+ final int maps = jobdesc.getNumberMaps();
+ final int reds = jobdesc.getNumberReduces();
+ for (int i = 0; i < maps; ++i) { // pass 1: totals over all map tasks
+ final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+ mapInputBytesTotal += info.getInputBytes();
+ mapOutputBytesTotal += info.getOutputBytes();
+ mapOutputRecordsTotal += info.getOutputRecords();
+ }
+ final double[] reduceRecordRatio = new double[reds];
+ final double[] reduceByteRatio = new double[reds];
+ for (int i = 0; i < reds; ++i) { // pass 2: each reduce's input as a fraction of total map output
+ final TaskInfo info = jobdesc.getTaskInfo(TaskType.REDUCE, i);
+ reduceByteRatio[i] = info.getInputBytes() / (1.0 * mapOutputBytesTotal); // NOTE(review): a zero total gives NaN or Infinity
+ reduceRecordRatio[i] =
+ info.getInputRecords() / (1.0 * mapOutputRecordsTotal);
+ }
+ final InputStriper striper = new InputStriper(inputDir, mapInputBytesTotal);
+ final List<InputSplit> splits = new ArrayList<InputSplit>();
+ for (int i = 0; i < maps; ++i) { // pass 3: one split per map
+ final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0); // reduce specs are dealt round-robin; the first reds % maps maps get one extra
+ final long[] specBytes = new long[nSpec];
+ final long[] specRecords = new long[nSpec];
+ for (int j = 0; j < nSpec; ++j) {
+ final TaskInfo info =
+ jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps); // map i carries specs for reduces i, i + maps, i + 2*maps, ...
+ specBytes[j] = info.getOutputBytes();
+ specRecords[j] = info.getOutputRecords();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
+ i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
+ }
+ }
+ final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+ splits.add(new GridmixSplit(striper.splitFor(inputDir,
+ info.getInputBytes(), 3), maps, i, // NOTE(review): meaning of the literal 3 is defined by InputStriper.splitFor; confirm there
+ info.getInputBytes(), info.getInputRecords(),
+ info.getOutputBytes(), info.getOutputRecords(),
+ reduceByteRatio, reduceRecordRatio, specBytes, specRecords));
+ }
+ pushDescription(id(), splits); // GridmixInputFormat.getSplits() pulls the list back out by id
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.WritableComparator;
+
+class GridmixKey extends GridmixRecord {
+ static final byte REDUCE_SPEC = 0;
+ static final byte DATA = 1;
+
+ static final int META_BYTES = 1;
+
+ private byte type;
+ private int partition; // NOT serialized
+ private Spec spec = new Spec();
+
+ GridmixKey() { // defaults: DATA type, size 1, seed 0
+ this(DATA, 1, 0L);
+ }
+ GridmixKey(byte type, int size, long seed) { // size is the total size this key should report
+ super(size, seed);
+ this.type = type;
+ // Re-apply size now that type is set: this class's setSize() deducts a type-dependent overhead.
+ setSize(size);
+ }
+
+ @Override
+ public int getSize() { // record size plus the type byte, plus the spec for REDUCE_SPEC keys
+ if (DATA == type) {
+ return super.getSize() + META_BYTES;
+ }
+ if (REDUCE_SPEC != type) {
+ throw new IllegalStateException("Invalid type: " + type);
+ }
+ final int recordBytes = super.getSize();
+ return recordBytes + spec.getSize() + META_BYTES;
+ }
+
+ @Override
+ public void setSize(int size) { // size is the total; the parent gets what is left after this class's overhead
+ // Bytes this class adds on top of the GridmixRecord portion.
+ final int overhead;
+ if (REDUCE_SPEC == type) {
+ overhead = META_BYTES + spec.getSize();
+ } else if (DATA == type) {
+ overhead = META_BYTES;
+ } else {
+ throw new IllegalStateException("Invalid type: " + type);
+ }
+ super.setSize(size - overhead);
+ }
+
+ /**
+ * Returns the partition assigned with setPartition(). It is not serialized: readFields() and write() never touch it.
+ */
+ public int getPartition() {
+ return partition;
+ }
+ public void setPartition(int partition) { // value later returned by getPartition(), which DraftPartitioner reads
+ this.partition = partition;
+ }
+
+ public long getReduceInputRecords() { // spec.rec_in; asserts the key is a REDUCE_SPEC
+ assert REDUCE_SPEC == getType();
+ return spec.rec_in;
+ }
+ public void setReduceInputRecords(long rec_in) { // updates spec.rec_in without changing the key's total size
+ assert REDUCE_SPEC == getType();
+ final int origSize = getSize(); // total size before the spec's encoded length can change
+ spec.rec_in = rec_in;
+ setSize(origSize); // re-apply the total against the new spec size
+ }
+
+ public long getReduceOutputRecords() { // spec.rec_out; asserts the key is a REDUCE_SPEC
+ assert REDUCE_SPEC == getType();
+ return spec.rec_out;
+ }
+ public void setReduceOutputRecords(long rec_out) { // updates spec.rec_out without changing the key's total size
+ assert REDUCE_SPEC == getType();
+ final int origSize = getSize(); // total size before the spec's encoded length can change
+ spec.rec_out = rec_out;
+ setSize(origSize); // re-apply the total against the new spec size
+ }
+
+ public long getReduceOutputBytes() { // spec.bytes_out; asserts the key is a REDUCE_SPEC
+ assert REDUCE_SPEC == getType();
+ return spec.bytes_out;
+ }
+ public void setReduceOutputBytes(long b_out) { // updates spec.bytes_out without changing the key's total size
+ assert REDUCE_SPEC == getType();
+ final int origSize = getSize(); // total size before the spec's encoded length can change
+ spec.bytes_out = b_out;
+ setSize(origSize); // re-apply the total against the new spec size
+ }
+
+ public byte getType() { // REDUCE_SPEC or DATA
+ return type;
+ }
+ public void setType(byte type) throws IOException {
+ // Capture the total size under the old type so it survives the change.
+ final int totalBefore = getSize();
+ final boolean known = REDUCE_SPEC == type || DATA == type;
+ if (!known) {
+ throw new IOException("Invalid type: " + type);
+ }
+ this.type = type;
+ // Re-apply the total: setSize() now deducts the new type's overhead,
+ // so the size passed to the parent changes while the total does not.
+ setSize(totalBefore);
+ }
+
+ public void setSpec(Spec spec) { // copies spec's three fields into this key's own Spec instance
+ assert REDUCE_SPEC == getType();
+ final int origSize = getSize(); // total size before the spec's encoded length can change
+ this.spec.set(spec);
+ setSize(origSize); // re-apply the total against the new spec size
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException { // layout: GridmixRecord bytes, type byte, then spec if REDUCE_SPEC
+ super.readFields(in);
+ setType(in.readByte()); // throws IOException on an unknown type
+ if (REDUCE_SPEC == getType()) {
+ spec.readFields(in);
+ }
+ }
+ @Override
+ public void write(DataOutput out) throws IOException { // mirrors readFields(): record bytes, type byte, optional spec
+ super.write(out);
+ final byte t = getType();
+ out.writeByte(t);
+ if (REDUCE_SPEC == t) {
+ spec.write(out);
+ }
+ }
+ int fixedBytes() { // the parent's fixed bytes plus the type byte, plus the spec for REDUCE_SPEC keys
+ return super.fixedBytes() +
+ (REDUCE_SPEC == getType() ? spec.getSize() : 0) + META_BYTES;
+ }
+ @Override
+ public int compareTo(GridmixRecord other) { // by type first, then by the GridmixRecord ordering
+ // Cast up front: an argument that is not a GridmixKey fails with ClassCastException.
+ final GridmixKey that = (GridmixKey) other;
+ final int typeDelta = getType() - that.getType();
+ if (typeDelta == 0) {
+ return super.compareTo(other);
+ }
+ return typeDelta; // REDUCE_SPEC (0) sorts ahead of DATA (1)
+ }
+
+ /**
+ * Keys are equal when class, type and the GridmixRecord state match. The spec
+ * is not compared directly, but it contributes to the size, which is.
+ */
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null || other.getClass() != getClass()) {
+ return false;
+ }
+ final GridmixKey that = (GridmixKey) other;
+ return getType() == that.getType() && super.equals(that);
+ }
+
+ @Override
+ public int hashCode() { // the parent's hash XORed with the type, matching the fields equals() checks
+ return super.hashCode() ^ getType();
+ }
+
+ public static class Spec implements Writable { // three counters carried by REDUCE_SPEC keys
+ long rec_in; // returned by GridmixKey.getReduceInputRecords()
+ long rec_out; // returned by GridmixKey.getReduceOutputRecords()
+ long bytes_out; // returned by GridmixKey.getReduceOutputBytes()
+ public Spec() { }
+
+ public void set(Spec other) { // field-by-field copy
+ rec_in = other.rec_in;
+ bytes_out = other.bytes_out;
+ rec_out = other.rec_out;
+ }
+
+ public int getSize() { // sum of the variable-length encoded sizes of the three fields
+ return WritableUtils.getVIntSize(rec_in) +
+ WritableUtils.getVIntSize(rec_out) +
+ WritableUtils.getVIntSize(bytes_out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException { // same field order as write()
+ rec_in = WritableUtils.readVLong(in);
+ rec_out = WritableUtils.readVLong(in);
+ bytes_out = WritableUtils.readVLong(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException { // order: rec_in, rec_out, bytes_out
+ WritableUtils.writeVLong(out, rec_in);
+ WritableUtils.writeVLong(out, rec_out);
+ WritableUtils.writeVLong(out, bytes_out);
+ }
+ }
+
+ public static class Comparator extends GridmixRecord.Comparator { // raw comparator: type byte first, then the parent's comparison
+
+ private final DataInputBuffer di = new DataInputBuffer(); // reused to decode the leading vint of each key
+ private final byte[] reset = di.getData(); // buffer di held at construction; restored after each compare
+
+ public Comparator() {
+ super(GridmixKey.class);
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ try {
+ di.reset(b1, s1, l1);
+ final int x1 = WritableUtils.readVInt(di); // leading vint: the size GridmixRecord.write() emits first
+ di.reset(b2, s2, l2);
+ final int x2 = WritableUtils.readVInt(di);
+ final int ret = (b1[s1 + x1] != b2[s2 + x2]) // GridmixKey.write() puts the type byte right after the record bytes
+ ? b1[s1 + x1] - b2[s2 + x2]
+ : super.compare(b1, s1, x1, b2, s2, x2); // same type: compare only the first x1 / x2 bytes
+ di.reset(reset, 0, 0); // stop referencing the caller's arrays
+ return ret;
+ } catch (IOException e) {
+ throw new RuntimeException(e); // compare() cannot declare checked exceptions
+ }
+ }
+
+ static {
+ WritableComparator.define(GridmixKey.class, new Comparator()); // runs when this nested class is initialized
+ }
+ }
+}
+
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+class GridmixRecord implements WritableComparable<GridmixRecord> {
+
+ private static final int FIXED_BYTES = 1;
+ private int size = -1;
+ private long seed;
+ private final DataInputBuffer dib =
+ new DataInputBuffer();
+ private final DataOutputBuffer dob =
+ new DataOutputBuffer(Long.SIZE / Byte.SIZE);
+ private byte[] literal = dob.getData();
+
+ GridmixRecord() { // defaults: size 1, seed 0
+ this(1, 0L);
+ }
+
+ GridmixRecord(int size, long seed) { // the seed is assigned first so setSizeInternal() can mask it
+ this.seed = seed;
+ setSizeInternal(size);
+ }
+
+ public int getSize() { // the value write() emits as the leading vint
+ return size;
+ }
+
+ public void setSize(int size) { // overridable entry point; the work is done in setSizeInternal()
+ setSizeInternal(size);
+ }
+
+ private void setSizeInternal(int size) { // stores the size, masks the seed to fit, refreshes dob's bytes
+ this.size = Math.max(1, size); // sizes below 1 are raised to 1
+ try {
+ seed = maskSeed(seed, this.size);
+ dob.reset();
+ dob.writeLong(seed); // literal was taken from dob.getData(); presumably still the same array, since only 8 bytes are ever written
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public final void setSeed(long seed) { // NOTE(review): stores the raw seed only; masking and the dob refresh happen in setSizeInternal()
+ this.seed = seed;
+ }
+
+ /** Marsaglia, 2003. */
+ long nextRand(long x) {
+ x ^= (x << 13);
+ x ^= (x >>> 7);
+ return (x ^= (x << 17));
+ }
+
+ public void writeRandom(DataOutput out, final int size) throws IOException { // writes the seed, then values derived from it via nextRand()
+ long tmp = seed;
+ out.writeLong(tmp); // NOTE(review): eight bytes are written here whatever size is, so size < 8 still emits 8
+ int i = size - (Long.SIZE / Byte.SIZE); // bytes still to write after the seed
+ while (i > Long.SIZE / Byte.SIZE - 1) { // whole longs while at least 8 bytes remain
+ tmp = nextRand(tmp);
+ out.writeLong(tmp);
+ i -= Long.SIZE / Byte.SIZE;
+ }
+ for (tmp = nextRand(tmp); i > 0; --i) { // remaining 1..7 bytes come from one more value, low-order byte first
+ out.writeByte((int)(tmp & 0xFF));
+ tmp >>>= Byte.SIZE;
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException { // reads the size and seed, then skips the rest of the payload
+ size = WritableUtils.readVInt(in);
+ int payload = size - WritableUtils.getVIntSize(size); // bytes that follow the vint
+ if (payload > Long.SIZE / Byte.SIZE) {
+ seed = in.readLong(); // the first eight payload bytes are the seed
+ payload -= Long.SIZE / Byte.SIZE;
+ } else {
+ Arrays.fill(literal, (byte)0); // short payload: read into literal, zero-padded on the right
+ in.readFully(literal, 0, payload);
+ dib.reset(literal, 0, literal.length);
+ seed = dib.readLong();
+ payload = 0;
+ }
+ final int vBytes = in.skipBytes(payload); // the remaining bytes are not needed
+ if (vBytes != payload) {
+ throw new EOFException("Expected " + payload + ", read " + vBytes);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // size counts every byte written here, including the vint that encodes it
+ WritableUtils.writeVInt(out, size);
+ final int payload = size - WritableUtils.getVIntSize(size);
+ if (payload > Long.SIZE / Byte.SIZE) {
+ writeRandom(out, payload); // more than eight bytes: seed followed by generated bytes
+ } else if (payload > 0) {
+ out.write(literal, 0, payload); // short payload: leading bytes of literal, which setSizeInternal() fills
+ }
+ }
+
+ @Override
+ public int compareTo(GridmixRecord other) { // delegates to compareSeed() with other's seed and its non-fixed length
+ return compareSeed(other.seed,
+ Math.max(0, other.getSize() - other.fixedBytes()));
+ }
+
+ int fixedBytes() { // bytes of getSize() that compareTo() excludes from the compared length; GridmixKey overrides this
+ // min vint size
+ return FIXED_BYTES;
+ }
+
+ private static long maskSeed(long sd, int sz) { // clears the seed bytes that a record of size sz has no room for
+ // Don't use fixedBytes here; subclasses will set intended random len
+ if (sz <= FIXED_BYTES) {
+ sd = 0L; // no room beyond the fixed bytes
+ } else if (sz < Long.SIZE / Byte.SIZE + FIXED_BYTES) {
+ final int tmp = sz - FIXED_BYTES; // number of seed bytes that fit (1..7)
+ final long mask = (1L << (Byte.SIZE * tmp)) - 1; // tmp low-order bytes set
+ sd &= mask << (Byte.SIZE * (Long.SIZE / Byte.SIZE - tmp)); // shifted up: keeps the tmp high-order bytes
+ }
+ return sd;
+ }
+
+ int compareSeed(long jSeed, int jSize) { // compares the two seed-derived byte sequences; shorter one first on a tie
+ final int iSize = Math.max(0, getSize() - fixedBytes());
+ final int seedLen = Math.min(iSize, jSize) + FIXED_BYTES;
+ jSeed = maskSeed(jSeed, seedLen); // mask both seeds to the common length
+ long iSeed = maskSeed(seed, seedLen);
+ final int cmplen = Math.min(iSize, jSize); // number of bytes to compare
+ for (int i = 0; i < cmplen; i += Byte.SIZE) { // steps by 8, used here as the number of bytes in a long
+ final int k = cmplen - i; // bytes still to compare
+ for (long j = Long.SIZE - Byte.SIZE; // from the most significant byte down
+ j >= Math.max(0, Long.SIZE / Byte.SIZE - k) * Byte.SIZE; // stops early when fewer than 8 bytes remain
+ j -= Byte.SIZE) {
+ final int xi = (int)((iSeed >>> j) & 0xFFL); // bytes are compared as unsigned values
+ final int xj = (int)((jSeed >>> j) & 0xFFL);
+ if (xi != xj) {
+ return xi - xj;
+ }
+ }
+ iSeed = nextRand(iSeed); // move both sequences on to their next 8 bytes
+ jSeed = nextRand(jSeed);
+ }
+ return iSize - jSize;
+ }
+
+ @Override
+ public boolean equals(Object other) { // same runtime class, same size, same seed
+ if (this == other) {
+ return true;
+ }
+ if (other == null || other.getClass() != getClass()) {
+ return false;
+ }
+ final GridmixRecord that = (GridmixRecord) other;
+ return getSize() == that.getSize() && seed == that.seed;
+ }
+
+ @Override
+ public int hashCode() { // built from seed and size, the two values equals() compares
+ return (int)(seed * getSize());
+ }
+
+ public static class Comparator extends WritableComparator { // raw comparator registered for GridmixRecord
+
+ public Comparator() {
+ super(GridmixRecord.class);
+ }
+
+ public Comparator(Class<? extends WritableComparable<?>> sub) { // lets subclasses bind the comparator to their own key type
+ super(sub);
+ }
+
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { // byte-wise comparison past a computed prefix
+ int n1 = WritableUtils.decodeVIntSize(b1[s1]); // encoded length of the leading vint
+ int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+ n1 -= WritableUtils.getVIntSize(n1); // NOTE(review): subtracts the vint size of that length itself; confirm the intended offset
+ n2 -= WritableUtils.getVIntSize(n2);
+ return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
+ }
+
+ static {
+ WritableComparator.define(GridmixRecord.class, new Comparator()); // runs when this nested class is initialized
+ }
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A {@link CombineFileSplit} extended with per-map counters and per-reduce
+ * specifications. All of the additional fields are serialized after the
+ * parent's data by {@link #write} and restored by {@link #readFields}.
+ */
+class GridmixSplit extends CombineFileSplit {
+ private int id;
+ // number of valid entries in reduceOutputBytes/reduceOutputRecords
+ private int nSpec;
+ private int maps;
+ // number of valid entries in reduceBytes/reduceRecords
+ private int reduces;
+ private long inputRecords;
+ private long outputBytes;
+ private long outputRecords;
+ // not assigned by the constructor; only written and read back as part of
+ // the serialized form
+ private long maxMemory;
+ // per-reduce multipliers applied to outputBytes/outputRecords
+ private double[] reduceBytes = new double[0];
+ private double[] reduceRecords = new double[0];
+
+ // Spec for reduces id mod this
+ private long[] reduceOutputBytes = new long[0];
+ private long[] reduceOutputRecords = new long[0];
+
+ /** No-argument constructor; fields are populated by {@link #readFields}. */
+ GridmixSplit() {
+ super();
+ }
+
+ /**
+ * @param cfsplit Split whose contents are passed to the parent constructor
+ * @param maps Value returned by {@link #getMapCount}
+ * @param id Value returned by {@link #getId}
+ * @param inputBytes Not stored by this class
+ * @param inputRecords Value returned by {@link #getInputRecords}
+ * @param outputBytes Total scaled per reduce in {@link #getOutputBytes}
+ * @param outputRecords Total scaled per reduce in {@link #getOutputRecords}
+ * @param reduceBytes Per-reduce multipliers for outputBytes; its length
+ * becomes the reduce count
+ * @param reduceRecords Per-reduce multipliers for outputRecords
+ * @param reduceOutputBytes Values returned by {@link #getReduceBytes}; its
+ * length becomes the spec count
+ * @param reduceOutputRecords Values returned by {@link #getReduceRecords}
+ */
+ public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
+ long inputBytes, long inputRecords, long outputBytes,
+ long outputRecords, double[] reduceBytes, double[] reduceRecords,
+ long[] reduceOutputBytes, long[] reduceOutputRecords)
+ throws IOException {
+ super(cfsplit);
+ this.id = id;
+ this.maps = maps;
+ reduces = reduceBytes.length;
+ this.inputRecords = inputRecords;
+ this.outputBytes = outputBytes;
+ this.outputRecords = outputRecords;
+ this.reduceBytes = reduceBytes;
+ this.reduceRecords = reduceRecords;
+ nSpec = reduceOutputBytes.length;
+ this.reduceOutputBytes = reduceOutputBytes;
+ this.reduceOutputRecords = reduceOutputRecords;
+ }
+ public int getId() {
+ return id;
+ }
+ public int getMapCount() {
+ return maps;
+ }
+ public long getInputRecords() {
+ return inputRecords;
+ }
+ /**
+ * @return A single-element array holding outputBytes when there are no
+ * reduces; otherwise one entry per reduce, each being outputBytes
+ * multiplied by that reduce's factor and rounded.
+ */
+ public long[] getOutputBytes() {
+ if (0 == reduces) {
+ return new long[] { outputBytes };
+ }
+ final long[] ret = new long[reduces];
+ for (int i = 0; i < reduces; ++i) {
+ ret[i] = Math.round(outputBytes * reduceBytes[i]);
+ }
+ return ret;
+ }
+ /**
+ * @return A single-element array holding outputRecords when there are no
+ * reduces; otherwise one entry per reduce, each being outputRecords
+ * multiplied by that reduce's factor and rounded.
+ */
+ public long[] getOutputRecords() {
+ if (0 == reduces) {
+ return new long[] { outputRecords };
+ }
+ final long[] ret = new long[reduces];
+ for (int i = 0; i < reduces; ++i) {
+ ret[i] = Math.round(outputRecords * reduceRecords[i]);
+ }
+ return ret;
+ }
+ /** @return Entry i of the reduce output byte specification. */
+ public long getReduceBytes(int i) {
+ return reduceOutputBytes[i];
+ }
+ /** @return Entry i of the reduce output record specification. */
+ public long getReduceRecords(int i) {
+ return reduceOutputRecords[i];
+ }
+ /**
+ * Serialize the parent split followed by this class's fields. The field
+ * order here must match the order consumed in {@link #readFields}.
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ WritableUtils.writeVInt(out, id);
+ WritableUtils.writeVInt(out, maps);
+ WritableUtils.writeVLong(out, inputRecords);
+ WritableUtils.writeVLong(out, outputBytes);
+ WritableUtils.writeVLong(out, outputRecords);
+ WritableUtils.writeVLong(out, maxMemory);
+ // count, then interleaved (bytes, records) multipliers per reduce
+ WritableUtils.writeVInt(out, reduces);
+ for (int i = 0; i < reduces; ++i) {
+ out.writeDouble(reduceBytes[i]);
+ out.writeDouble(reduceRecords[i]);
+ }
+ // count, then interleaved (bytes, records) values per spec
+ WritableUtils.writeVInt(out, nSpec);
+ for (int i = 0; i < nSpec; ++i) {
+ WritableUtils.writeVLong(out, reduceOutputBytes[i]);
+ WritableUtils.writeVLong(out, reduceOutputRecords[i]);
+ }
+ }
+ /**
+ * Restore the parent split followed by this class's fields, in the order
+ * produced by {@link #write}.
+ */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ id = WritableUtils.readVInt(in);
+ maps = WritableUtils.readVInt(in);
+ inputRecords = WritableUtils.readVLong(in);
+ outputBytes = WritableUtils.readVLong(in);
+ outputRecords = WritableUtils.readVLong(in);
+ maxMemory = WritableUtils.readVLong(in);
+ reduces = WritableUtils.readVInt(in);
+ // existing arrays are reused when large enough, so they may be longer
+ // than 'reduces'; only the first 'reduces' entries are meaningful
+ if (reduceBytes.length < reduces) {
+ reduceBytes = new double[reduces];
+ reduceRecords = new double[reduces];
+ }
+ for (int i = 0; i < reduces; ++i) {
+ reduceBytes[i] = in.readDouble();
+ reduceRecords[i] = in.readDouble();
+ }
+ nSpec = WritableUtils.readVInt(in);
+ // same reuse policy as above; only the first 'nSpec' entries are valid
+ if (reduceOutputBytes.length < nSpec) {
+ reduceOutputBytes = new long[nSpec];
+ reduceOutputRecords = new long[nSpec];
+ }
+ for (int i = 0; i < nSpec; ++i) {
+ reduceOutputBytes[i] = WritableUtils.readVLong(in);
+ reduceOutputRecords[i] = WritableUtils.readVLong(in);
+ }
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Given a {@link FilePool}, obtain a set of files capable of satisfying
+ * a full set of splits, then iterate over each source to fill the request.
+ */
+class InputStriper {
+ public static final Log LOG = LogFactory.getLog(InputStriper.class);
+ // incremented each time a file is exhausted; taken modulo files.size()
+ // to select the next file, so files are reused cyclically
+ int idx;
+ // offset into 'current' at which the next split begins
+ long currentStart;
+ // file currently being consumed; null when the pool supplied no files
+ FileStatus current;
+ final List<FileStatus> files = new ArrayList<FileStatus>();
+
+ /**
+ * @param inputDir Pool from which files are requested.
+ * @param mapBytes Sum of all expected split requests.
+ * @throws IOException If bytes were requested but the pool supplied no
+ * files.
+ */
+ InputStriper(FilePool inputDir, long mapBytes)
+ throws IOException {
+ final long inputBytes = inputDir.getInputFiles(mapBytes, files);
+ if (mapBytes > inputBytes) {
+ LOG.warn("Using " + inputBytes + "/" + mapBytes + " bytes");
+ }
+ if (files.isEmpty() && mapBytes > 0) {
+ throw new IOException("Failed to satisfy request for " + mapBytes);
+ }
+ current = files.isEmpty() ? null : files.get(0);
+ }
+
+ /**
+ * @param inputDir Pool used to resolve block locations.
+ * @param bytes Target byte count
+ * @param nLocs Number of block locations per split.
+ * @return A set of files satisfying the byte count, with locations weighted
+ * to the dominating proportion of input bytes.
+ * @throws IOException If no input file is available, or if resolving
+ * block locations fails.
+ */
+ CombineFileSplit splitFor(FilePool inputDir, long bytes, int nLocs)
+ throws IOException {
+ if (null == current) {
+ // the constructor leaves 'current' null when the pool was empty; fail
+ // with a descriptive error instead of a NullPointerException
+ throw new IOException("No input files available to satisfy request for "
+ + bytes);
+ }
+ final ArrayList<Path> paths = new ArrayList<Path>();
+ final ArrayList<Long> start = new ArrayList<Long>();
+ final ArrayList<Long> length = new ArrayList<Long>();
+ final HashMap<String,Double> sb = new HashMap<String,Double>();
+ // 'bytes' is decremented below; keep the size of the whole request so
+ // every block is weighted against the same denominator
+ final double requested = 1.0 * bytes;
+ do {
+ paths.add(current.getPath());
+ start.add(currentStart);
+ final long fromFile = Math.min(bytes, current.getLen() - currentStart);
+ length.add(fromFile);
+ for (BlockLocation loc :
+ inputDir.locationsFor(current, currentStart, fromFile)) {
+ // share of the request attributed to each host of this block
+ final double tedium = loc.getLength() / requested;
+ for (String l : loc.getHosts()) {
+ Double j = sb.get(l);
+ if (null == j) {
+ sb.put(l, tedium);
+ } else {
+ sb.put(l, j.doubleValue() + tedium);
+ }
+ }
+ }
+ currentStart += fromFile;
+ bytes -= fromFile;
+ // current file fully consumed; advance (cyclically) to the next one
+ if (current.getLen() - currentStart == 0) {
+ current = files.get(++idx % files.size());
+ currentStart = 0;
+ }
+ } while (bytes > 0);
+ // rank hosts by accumulated weight, heaviest first, and keep at most nLocs
+ final ArrayList<Entry<String,Double>> sort =
+ new ArrayList<Entry<String,Double>>(sb.entrySet());
+ Collections.sort(sort, hostRank);
+ final String[] hosts = new String[Math.min(nLocs, sort.size())];
+ for (int i = 0; i < nLocs && i < sort.size(); ++i) {
+ hosts[i] = sort.get(i).getKey();
+ }
+ return new CombineFileSplit(paths.toArray(new Path[0]),
+ toLongArray(start), toLongArray(length), hosts);
+ }
+
+ /** Unbox a list of Long into a primitive array of the same length. */
+ private long[] toLongArray(final ArrayList<Long> sigh) {
+ final long[] ret = new long[sigh.size()];
+ for (int i = 0; i < ret.length; ++i) {
+ ret[i] = sigh.get(i);
+ }
+ return ret;
+ }
+
+ /** Orders host entries by descending weight. */
+ static final Comparator<Entry<String,Double>> hostRank =
+ new Comparator<Entry<String,Double>>() {
+ public int compare(Entry<String,Double> a, Entry<String,Double> b) {
+ final double va = a.getValue();
+ final double vb = b.getValue();
+ return va > vb ? -1 : va < vb ? 1 : 0;
+ }
+ };
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Record factory wrapping another factory; the reduce specification is
+ * handed downstream as the last record it produces.
+ */
+class IntermediateRecordFactory extends RecordFactory {
+
+ private final GridmixKey.Spec spec;
+ private final RecordFactory factory;
+ private final int partition;
+ private final long targetRecords;
+ private boolean done = false;
+ private long accRecords = 0L;
+
+ /**
+ * Wrap a new AvgRecordFactory built from the byte and record targets.
+ * @param targetBytes Expected byte count.
+ * @param targetRecords Expected record count; the spec record is emitted
+ * once this many records have been produced.
+ * @param partition Reduce to which records are emitted.
+ * @param spec Specification to emit.
+ * @param conf Passed through to the AvgRecordFactory constructor.
+ */
+ public IntermediateRecordFactory(long targetBytes, long targetRecords,
+ int partition, GridmixKey.Spec spec, Configuration conf) {
+ this(new AvgRecordFactory(targetBytes, targetRecords, conf), partition,
+ targetRecords, spec, conf);
+ }
+
+ /**
+ * Wrap an existing factory.
+ * @param factory Factory from which byte/record counts are obtained.
+ * @param partition Reduce to which records are emitted.
+ * @param targetRecords Expected record count; the spec record is emitted
+ * once this many records have been produced.
+ * @param spec Specification to emit.
+ * @param conf Unused.
+ */
+ public IntermediateRecordFactory(RecordFactory factory, int partition,
+ long targetRecords, GridmixKey.Spec spec, Configuration conf) {
+ this.factory = factory;
+ this.spec = spec;
+ this.targetRecords = targetRecords;
+ this.partition = partition;
+ }
+
+ /**
+ * Produce the next record from the wrapped factory, typing it as data
+ * until the record target is reached and as a reduce spec afterwards. If
+ * the wrapped factory runs dry before any spec was emitted, one empty
+ * spec record is produced.
+ */
+ @Override
+ public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+ assert key != null;
+ final boolean produced = factory.next(key, val);
+ ++accRecords;
+
+ if (!produced) {
+ if (done) {
+ // nothing left and the spec has already gone out
+ key.setPartition(partition);
+ return false;
+ }
+ // ensure spec emitted
+ key.setType(GridmixKey.REDUCE_SPEC);
+ key.setPartition(partition);
+ key.setSize(0);
+ val.setSize(0);
+ spec.rec_in = 0L;
+ key.setSpec(spec);
+ done = true;
+ return true;
+ }
+
+ if (accRecords >= targetRecords) {
+ final int sizeBefore = key.getSize();
+ key.setType(GridmixKey.REDUCE_SPEC);
+ spec.rec_in = accRecords;
+ key.setSpec(spec);
+ // shrink the value by however much the key grew
+ final int growth = key.getSize() - sizeBefore;
+ val.setSize(val.getSize() - growth);
+ // reset counters
+ accRecords = 0L;
+ spec.bytes_out = 0L;
+ spec.rec_out = 0L;
+ done = true;
+ } else {
+ key.setType(GridmixKey.DATA);
+ }
+ key.setPartition(partition);
+ return true;
+ }
+
+ /** Progress is that of the wrapped factory. */
+ @Override
+ public float getProgress() throws IOException {
+ return factory.getProgress();
+ }
+
+ /** Close the wrapped factory. */
+ @Override
+ public void close() throws IOException {
+ factory.close();
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.tools.rumen.ZombieJobProducer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * Component reading job traces generated by Rumen. Each job in the trace is
+ * assigned a sequence number and given a submission time relative to the
+ * job that preceded it. Jobs are enqueued in the JobSubmitter provided at
+ * construction.
+ * @see org.apache.hadoop.tools.rumen.HadoopLogsAnalyzer
+ */
+class JobFactory implements Gridmix.Component<Void> {
+
+ public static final Log LOG = LogFactory.getLog(JobFactory.class);
+
+ private final Path scratch;
+ private final float rateFactor;
+ private final Configuration conf;
+ private final ReaderThread rThread;
+ private final AtomicInteger sequence;
+ private final JobSubmitter submitter;
+ private final CountDownLatch startFlag;
+ private volatile IOException error = null;
+ protected final JobStoryProducer jobProducer;
+
+ /**
+ * Creating a new instance does not start the thread.
+ * @param submitter Component to which deserialized jobs are passed
+ * @param jobTrace Stream of job traces with which to construct a
+ * {@link org.apache.hadoop.tools.rumen.ZombieJobProducer}
+ * @param scratch Directory into which to write output from simulated jobs
+ * @param conf Config passed to all jobs to be submitted
+ * @param startFlag Latch released from main to start pipeline
+ */
+ public JobFactory(JobSubmitter submitter, InputStream jobTrace,
+ Path scratch, Configuration conf, CountDownLatch startFlag)
+ throws IOException {
+ this(submitter, new ZombieJobProducer(jobTrace, null), scratch, conf,
+ startFlag);
+ }
+
+ /**
+ * Constructor permitting JobStoryProducer to be mocked.
+ * @param submitter Component to which deserialized jobs are passed
+ * @param jobProducer Producer generating JobStory objects.
+ * @param scratch Directory into which to write output from simulated jobs
+ * @param conf Config passed to all jobs to be submitted
+ * @param startFlag Latch released from main to start pipeline
+ */
+ protected JobFactory(JobSubmitter submitter, JobStoryProducer jobProducer,
+ Path scratch, Configuration conf, CountDownLatch startFlag) {
+ sequence = new AtomicInteger(0);
+ this.scratch = scratch;
+ this.rateFactor = conf.getFloat(Gridmix.GRIDMIX_SUB_MUL, 1.0f);
+ this.jobProducer = jobProducer;
+ this.conf = new Configuration(conf);
+ this.submitter = submitter;
+ this.startFlag = startFlag;
+ this.rThread = new ReaderThread();
+ }
+
+ static class MinTaskInfo extends TaskInfo {
+ public MinTaskInfo(TaskInfo info) {
+ super(info.getInputBytes(), info.getInputRecords(),
+ info.getOutputBytes(), info.getOutputRecords(),
+ info.getTaskMemory());
+ }
+ public long getInputBytes() {
+ return Math.max(0, super.getInputBytes());
+ }
+ public int getInputRecords() {
+ return Math.max(0, super.getInputRecords());
+ }
+ public long getOutputBytes() {
+ return Math.max(0, super.getOutputBytes());
+ }
+ public int getOutputRecords() {
+ return Math.max(0, super.getOutputRecords());
+ }
+ public long getTaskMemory() {
+ return Math.max(0, super.getTaskMemory());
+ }
+ }
+
+ static class FilterJobStory implements JobStory {
+
+ protected final JobStory job;
+
+ public FilterJobStory(JobStory job) {
+ this.job = job;
+ }
+ public JobConf getJobConf() { return job.getJobConf(); }
+ public String getName() { return job.getName(); }
+ public JobID getJobID() { return job.getJobID(); }
+ public String getUser() { return job.getUser(); }
+ public long getSubmissionTime() { return job.getSubmissionTime(); }
+ public InputSplit[] getInputSplits() { return job.getInputSplits(); }
+ public int getNumberMaps() { return job.getNumberMaps(); }
+ public int getNumberReduces() { return job.getNumberReduces(); }
+ public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+ return job.getTaskInfo(taskType, taskNumber);
+ }
+ public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
+ int taskAttemptNumber) {
+ return job.getTaskAttemptInfo(taskType, taskNumber, taskAttemptNumber);
+ }
+ public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(
+ int taskNumber, int taskAttemptNumber, int locality) {
+ return job.getMapTaskAttemptInfoAdjusted(
+ taskNumber, taskAttemptNumber, locality);
+ }
+ public Values getOutcome() {
+ return job.getOutcome();
+ }
+ }
+
+ /**
+ * Worker thread responsible for reading descriptions, assigning sequence
+ * numbers, and normalizing time.
+ */
+ private class ReaderThread extends Thread {
+
+ public ReaderThread() {
+ super("GridmixJobFactory");
+ }
+
+ private JobStory getNextJobFiltered() throws IOException {
+ JobStory job;
+ do {
+ job = jobProducer.getNextJob();
+ } while (job != null
+ && (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
+ job.getSubmissionTime() < 0));
+ return null == job ? null : new FilterJobStory(job) {
+ @Override
+ public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+ return new MinTaskInfo(this.job.getTaskInfo(taskType, taskNumber));
+ }
+ };
+ }
+
+ @Override
+ public void run() {
+ try {
+ startFlag.await();
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ final long initTime = TimeUnit.MILLISECONDS.convert(
+ System.nanoTime(), TimeUnit.NANOSECONDS);
+ LOG.debug("START @ " + initTime);
+ long first = -1;
+ long last = -1;
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ final JobStory job = getNextJobFiltered();
+ if (null == job) {
+ return;
+ }
+ if (first < 0) {
+ first = job.getSubmissionTime();
+ }
+ final long current = job.getSubmissionTime();
+ if (current < last) {
+ LOG.warn("Job " + job.getJobID() + " out of order");
+ continue;
+ }
+ last = current;
+ submitter.add(new GridmixJob(conf, initTime +
+ Math.round(rateFactor * (current - first)),
+ job, scratch, sequence.getAndIncrement()));
+ } catch (IOException e) {
+ JobFactory.this.error = e;
+ return;
+ }
+ }
+ } catch (InterruptedException e) {
+ // exit thread; ignore any jobs remaining in the trace
+ return;
+ } finally {
+ IOUtils.cleanup(null, jobProducer);
+ }
+ }
+ }
+
+ /**
+ * Obtain the error that caused the thread to exit unexpectedly.
+ */
+ public IOException error() {
+ return error;
+ }
+
+ /**
+ * Add is disabled.
+ * @throws UnsupportedOperationException
+ */
+ public void add(Void ignored) {
+ throw new UnsupportedOperationException(getClass().getName() +
+ " is at the start of the pipeline and accepts no events");
+ }
+
+ /**
+ * Start the reader thread, wait for latch if necessary.
+ */
+ public void start() {
+ rThread.start();
+ }
+
+ /**
+ * Wait for the reader thread to exhaust the job trace.
+ */
+ public void join(long millis) throws InterruptedException {
+ rThread.join(millis);
+ }
+
+ /**
+ * Interrupt the reader thread.
+ */
+ public void shutdown() {
+ rThread.interrupt();
+ }
+
+ /**
+ * Interrupt the reader thread. This requires no special consideration, as
+ * the thread has no pending work queue.
+ */
+ public void abort() {
+ // Currently no special work
+ rThread.interrupt();
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Component accepting submitted, running jobs and responsible for
+ * monitoring jobs for success and failure. Once a job is submitted, it is
+ * polled for status until complete. If a job is complete, then the monitor
+ * thread returns immediately to the queue. If not, the monitor will sleep
+ * for some duration.
+ */
+class JobMonitor implements Gridmix.Component<Job> {
+
+ public static final Log LOG = LogFactory.getLog(JobMonitor.class);
+
+ // jobs being polled by the monitor thread; also used as the lock guarding
+ // the graceful/shutdown flags
+ private final Queue<Job> mJobs;
+ private final MonitorThread mThread;
+ // jobs handed in through add() and not yet drained into mJobs
+ private final BlockingQueue<Job> runningJobs;
+ private final long pollDelayMillis;
+ private boolean graceful = false;
+ private boolean shutdown = false;
+
+ /**
+ * Create a JobMonitor with a default polling interval of 5s.
+ */
+ public JobMonitor() {
+ this(5, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Create a JobMonitor that sleeps for the specified duration after
+ * polling a still-running job.
+ * @param pollDelay Delay after polling a running job
+ * @param unit Time unit for pollDelay (converted to milliseconds)
+ */
+ public JobMonitor(int pollDelay, TimeUnit unit) {
+ mThread = new MonitorThread();
+ runningJobs = new LinkedBlockingQueue<Job>();
+ mJobs = new LinkedList<Job>();
+ this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit);
+ }
+
+ /**
+ * Add a job to the polling queue.
+ */
+ public void add(Job job) throws InterruptedException {
+ runningJobs.put(job);
+ }
+
+ /**
+ * Temporary hook for recording job success.
+ */
+ protected void onSuccess(Job job) {
+ LOG.info(job.getJobName() + " (" + job.getJobID() + ")" + " success");
+ }
+
+ /**
+ * Temporary hook for recording job failure.
+ */
+ protected void onFailure(Job job) {
+ LOG.info(job.getJobName() + " (" + job.getJobID() + ")" + " failure");
+ }
+
+ /**
+ * If shutdown before all jobs have completed, any still-running jobs
+ * may be extracted from the component. Calling this while the monitoring
+ * thread is still alive is treated as an internal error: a warning is
+ * logged and a snapshot is returned anyway.
+ * @return A copy of the jobs currently held in the polled queue.
+ */
+ List<Job> getRemainingJobs() {
+ if (mThread.isAlive()) {
+ LOG.warn("Internal error: Polling running monitor for jobs");
+ }
+ synchronized (mJobs) {
+ return new ArrayList<Job>(mJobs);
+ }
+ }
+
+ /**
+ * Monitoring thread pulling running jobs from the component and into
+ * a queue to be polled for status.
+ */
+ private class MonitorThread extends Thread {
+
+ public MonitorThread() {
+ super("GridmixJobMonitor");
+ }
+
+ /**
+ * Check a job for success or failure.
+ */
+ public void process(Job job) throws IOException, InterruptedException {
+ if (job.isSuccessful()) {
+ onSuccess(job);
+ } else {
+ onFailure(job);
+ }
+ }
+
+ /**
+ * Main loop: drain newly added jobs, check for a shutdown condition,
+ * poll queued jobs until one is found still running, then sleep.
+ */
+ @Override
+ public void run() {
+ boolean graceful;
+ boolean shutdown;
+ while (true) {
+ try {
+ // snapshot the flags and move newly added jobs into the polled
+ // queue under the same lock used by abort()/shutdown()
+ synchronized (mJobs) {
+ graceful = JobMonitor.this.graceful;
+ shutdown = JobMonitor.this.shutdown;
+ runningJobs.drainTo(mJobs);
+ }
+
+ // shutdown conditions; either shutdown requested and all jobs
+ // have completed or abort requested and there are recently
+ // submitted jobs not in the monitored set
+ if (shutdown) {
+ if (!graceful) {
+ // abort: collect whatever is still arriving, then exit
+ while (!runningJobs.isEmpty()) {
+ synchronized (mJobs) {
+ runningJobs.drainTo(mJobs);
+ }
+ }
+ break;
+ } else if (mJobs.isEmpty()) {
+ break;
+ }
+ }
+ while (!mJobs.isEmpty()) {
+ Job job;
+ synchronized (mJobs) {
+ job = mJobs.poll();
+ }
+ try {
+ if (job.isComplete()) {
+ // finished job is reported and dropped; try the next one
+ process(job);
+ continue;
+ }
+ } catch (IOException e) {
+ if (e.getCause() instanceof ClosedByInterruptException) {
+ // Job doesn't throw InterruptedException, but RPC socket layer
+ // is blocking and may throw a wrapped Exception if this thread
+ // is interrupted. Since the lower level cleared the flag,
+ // reset it here
+ Thread.currentThread().interrupt();
+ } else {
+ // any other failure: the job is logged and no longer tracked
+ LOG.warn("Lost job " + (null == job.getJobName()
+ ? "<unknown>" : job.getJobName()), e);
+ continue;
+ }
+ }
+ // job still running (or the poll was interrupted): put it back at
+ // the tail and stop polling until after the sleep below
+ synchronized (mJobs) {
+ if (!mJobs.offer(job)) {
+ LOG.error("Lost job " + (null == job.getJobName()
+ ? "<unknown>" : job.getJobName())); // should never
+ // happen
+ }
+ }
+ break;
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(pollDelayMillis);
+ } catch (InterruptedException e) {
+ // NOTE(review): this local is reassigned from the shared field at
+ // the top of the next iteration, so the value set here is not
+ // what the shutdown check sees
+ shutdown = true;
+ continue;
+ }
+ } catch (Throwable e) {
+ // log and keep monitoring; the loop only exits through the
+ // shutdown checks above
+ LOG.warn("Unexpected exception: ", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Start the internal, monitoring thread.
+ */
+ public void start() {
+ mThread.start();
+ }
+
+ /**
+ * Wait for the monitor to halt, assuming shutdown or abort have been
+ * called. Note that, since submission may be sporadic, this will hang
+ * if no form of shutdown has been requested.
+ */
+ public void join(long millis) throws InterruptedException {
+ mThread.join(millis);
+ }
+
+ /**
+ * Drain all submitted jobs to a queue and stop the monitoring thread.
+ * Upstream submitter is assumed dead.
+ */
+ public void abort() {
+ synchronized (mJobs) {
+ graceful = false;
+ shutdown = true;
+ }
+ mThread.interrupt();
+ }
+
+ /**
+ * When all monitored jobs have completed, stop the monitoring thread.
+ * Upstream submitter is assumed dead.
+ */
+ public void shutdown() {
+ synchronized (mJobs) {
+ graceful = true;
+ shutdown = true;
+ }
+ mThread.interrupt();
+ }
+}
+
+
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Component accepting deserialized job traces, computing split data, and
+ * submitting to the cluster on deadline. Each job added from an upstream
+ * factory must be submitted to the cluster by the deadline recorded on it.
+ * Once submitted, jobs must be added to a downstream component for
+ * monitoring.
+ */
+ class JobSubmitter implements Gridmix.Component<GridmixJob> {
+
+   public static final Log LOG = LogFactory.getLog(JobSubmitter.class);
+
+   /**
+    * Bounds the number of jobs that are queued or being submitted. A permit
+    * is taken in {@link #add} and returned when the corresponding task
+    * finishes, is rejected, or is discarded by {@link #abort}.
+    */
+   final Semaphore sem;
+   private final FilePool inputDir;
+   private final JobMonitor monitor;
+   private final ExecutorService sched;
+   private volatile boolean shutdown = false;
+
+   /**
+    * Initialize the submission component with downstream monitor and pool of
+    * files from which split data may be read.
+    * @param monitor Monitor component to which jobs should be passed
+    * @param threads Number of submission threads
+    *   See {@link Gridmix#GRIDMIX_SUB_THR}.
+    * @param queueDepth Max depth of pending work queue
+    *   See {@link Gridmix#GRIDMIX_QUE_DEP}.
+    * @param inputDir Set of files from which split data may be mined for
+    *   synthetic jobs.
+    */
+   public JobSubmitter(JobMonitor monitor, int threads, int queueDepth,
+       FilePool inputDir) {
+     sem = new Semaphore(queueDepth);
+     // Fixed-size pool; the work queue itself is unbounded, so the semaphore
+     // above is what actually limits the amount of pending work.
+     sched = new ThreadPoolExecutor(threads, threads, 0L,
+         TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+     this.inputDir = inputDir;
+     this.monitor = monitor;
+   }
+
+   /**
+    * Runnable wrapping a job to be submitted to the cluster. Running the
+    * task builds the job's splits, sleeps until the job's deadline, submits
+    * the job and hands it to the monitor. The permit taken in
+    * {@link JobSubmitter#add} is always returned when the task ends.
+    */
+   private class SubmitTask implements Runnable {
+
+     final GridmixJob job;
+
+     public SubmitTask(GridmixJob job) {
+       this.job = job;
+     }
+
+     public void run() {
+       try {
+         // pre-compute split information
+         try {
+           job.buildSplits(inputDir);
+         } catch (IOException e) {
+           LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+           return;
+         }
+         // Sleep until deadline; the remaining delay is re-read after every
+         // sleep in case the thread woke up early.
+         long nsDelay = job.getDelay(TimeUnit.NANOSECONDS);
+         while (nsDelay > 0) {
+           TimeUnit.NANOSECONDS.sleep(nsDelay);
+           nsDelay = job.getDelay(TimeUnit.NANOSECONDS);
+         }
+         try {
+           // submit job
+           monitor.add(job.call());
+           // Only build the message when it will actually be written.
+           if (LOG.isDebugEnabled()) {
+             LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis() +
+                 " (" + job.getJob().getJobID() + ")");
+           }
+         } catch (IOException e) {
+           LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+           // An I/O failure caused by an interrupted channel is handled as
+           // an interruption of this task.
+           if (e.getCause() instanceof ClosedByInterruptException) {
+             throw new InterruptedException("Failed to submit " +
+                 job.getJob().getJobName());
+           }
+         } catch (ClassNotFoundException e) {
+           LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+         }
+       } catch (InterruptedException e) {
+         // abort execution, remove splits if necessary
+         // TODO release ThdLoc
+         GridmixJob.pullDescription(job.id());
+         // Restore the interrupt status for the executor's worker thread.
+         Thread.currentThread().interrupt();
+       } finally {
+         sem.release();
+       }
+     }
+   }
+
+   /**
+    * Enqueue the job to be submitted per the deadline associated with it.
+    * Blocks while the configured queue depth is exhausted. The job is
+    * ignored if shutdown or abort has already been requested.
+    * @param job Job to submit.
+    * @throws InterruptedException if interrupted while waiting for space in
+    *   the pending work queue.
+    */
+   public void add(final GridmixJob job) throws InterruptedException {
+     if (shutdown) {
+       return;
+     }
+     final SubmitTask task = new SubmitTask(job);
+     sem.acquire();
+     try {
+       sched.execute(task);
+     } catch (RejectedExecutionException e) {
+       // The task will never run, so its finally block cannot return the
+       // permit; return it here and leave a trace of the dropped job.
+       sem.release();
+       LOG.warn("Rejected submission of " + job, e);
+     }
+   }
+
+   /**
+    * (Re)scan the set of input files from which splits are derived.
+    * @throws IOException if refreshing the file pool fails.
+    */
+   public void refreshFilePool() throws IOException {
+     inputDir.refresh();
+   }
+
+   /**
+    * Does nothing, as the threadpool is already initialized and waiting for
+    * work from the upstream factory.
+    */
+   public void start() { }
+
+   /**
+    * Continue running until all queued jobs have been submitted to the
+    * cluster, or until the timeout elapses.
+    * @param millis Maximum time to wait, in milliseconds.
+    * @throws IllegalStateException if neither {@link #shutdown} nor
+    *   {@link #abort} has been called.
+    * @throws InterruptedException if interrupted while waiting.
+    */
+   public void join(long millis) throws InterruptedException {
+     if (!shutdown) {
+       throw new IllegalStateException("Cannot wait for active submit thread");
+     }
+     sched.awaitTermination(millis, TimeUnit.MILLISECONDS);
+   }
+
+   /**
+    * Finish all jobs pending submission, but do not accept new work.
+    */
+   public void shutdown() {
+     // complete pending tasks, but accept no new tasks
+     shutdown = true;
+     sched.shutdown();
+   }
+
+   /**
+    * Discard pending work, including precomputed work waiting to be
+    * submitted.
+    */
+   public void abort() {
+     shutdown = true;
+     // Tasks that never started cannot release their permit themselves, so
+     // give those permits back to keep the semaphore balanced and to unblock
+     // any caller still waiting in add().
+     sem.release(sched.shutdownNow().size());
+   }
+ }
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * For every record consumed, read key + val bytes from the stream provided.
+ */
+class ReadRecordFactory extends RecordFactory {
+
+ /**
+ * Size of internal, scratch buffer to read from internal stream.
+ */
+ public static final String GRIDMIX_READ_BUF_SIZE = "gridmix.read.buffer.size";
+
+ private final byte[] buf;
+ private final InputStream src;
+ private final RecordFactory factory;
+
+ /**
+ * @param targetBytes Expected byte count.
+ * @param targetRecords Expected record count.
+ * @param src Stream to read bytes.
+ * @param conf Used to establish read buffer size. @see #GRIDMIX_READ_BUF_SIZE
+ */
+ public ReadRecordFactory(long targetBytes, long targetRecords,
+ InputStream src, Configuration conf) {
+ this(new AvgRecordFactory(targetBytes, targetRecords, conf), src, conf);
+ }
+
+ /**
+ * @param factory Factory to draw record sizes.
+ * @param src Stream to read bytes.
+ * @param conf Used to establish read buffer size. @see #GRIDMIX_READ_BUF_SIZE
+ */
+ public ReadRecordFactory(RecordFactory factory, InputStream src,
+ Configuration conf) {
+ this.src = src;
+ this.factory = factory;
+ buf = new byte[conf.getInt(GRIDMIX_READ_BUF_SIZE, 64 * 1024)];
+ }
+
+ @Override
+ public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+ if (!factory.next(key, val)) {
+ return false;
+ }
+ for (int len = (null == key ? 0 : key.getSize()) + val.getSize();
+ len > 0; len -= buf.length) {
+ IOUtils.readFully(src, buf, 0, Math.min(buf.length, len));
+ }
+ return true;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return factory.getProgress();
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.cleanup(null, src);
+ factory.close();
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Interface for producing records as inputs and outputs to tasks.
+ */
+abstract class RecordFactory implements Closeable {
+
+ /**
+ * Transform the given record or perform some operation.
+ * @return true if the record should be emitted.
+ */
+ public abstract boolean next(GridmixKey key, GridmixRecord val)
+ throws IOException;
+
+ /**
+ * Estimate of exhausted record capacity.
+ */
+ public abstract float getProgress() throws IOException;
+
+}