You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/12/11 20:33:31 UTC
svn commit: r889779 [1/2] - in /hadoop/mapreduce/branches/branch-0.21: ./
src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/
src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/
src/tools/org/apache/hadoop/tools/rumen/
Author: cdouglas
Date: Fri Dec 11 19:33:27 2009
New Revision: 889779
URL: http://svn.apache.org/viewvc?rev=889779&view=rev
Log:
MAPREDUCE-1124. Fix imprecise byte counts in Gridmix.
Added:
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java
Modified:
hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=889779&r1=889778&r2=889779&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Fri Dec 11 19:33:27 2009
@@ -876,3 +876,4 @@
MAPREDUCE-879. Fix broken unit test TestTaskTrackerLocalization on MacOS.
(Sreekanth Ramakrishnan via yhemanth)
+ MAPREDUCE-1124. Fix imprecise byte counts in Gridmix. (cdouglas)
Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java?rev=889779&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java Fri Dec 11 19:33:27 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Given byte and record targets, emit roughly equal-sized records satisfying
+ * the contract.
+ */
+class AvgRecordFactory extends RecordFactory {
+
+  /**
+   * Fraction (not percentage) of each record's bytes devoted to key data.
+   * Values above 1.0 are clamped to 1.0; the key is always at least one byte.
+   */
+  public static final String GRIDMIX_KEY_FRC = "gridmix.key.fraction";
+
+  /**
+   * Assumed record size, in bytes, used to derive a record count when the
+   * caller supplies a non-positive record target.
+   */
+  public static final String GRIDMIX_MISSING_REC_SIZE =
+      "gridmix.missing.rec.size";
+
+  private final long targetBytes;
+  private final long targetRecords;
+  // Number of leading records that carry one extra byte (avgrec instead of
+  // avgrec - 1) so that the emitted sizes sum exactly to targetBytes.
+  private final long step;
+  private final int avgrec;
+  private final int keyLen;
+  private long accBytes = 0L;
+  private long accRecords = 0L;
+
+  /**
+   * @param targetBytes Expected byte count.
+   * @param targetRecords Expected record count. If non-positive, a count is
+   *   derived from targetBytes and {@link #GRIDMIX_MISSING_REC_SIZE}; the
+   *   resolved count is never less than one.
+   * @param conf Used to resolve edge cases; see {@link #GRIDMIX_KEY_FRC} and
+   *   {@link #GRIDMIX_MISSING_REC_SIZE}.
+   */
+  public AvgRecordFactory(long targetBytes, long targetRecords,
+      Configuration conf) {
+    this.targetBytes = targetBytes;
+    final long resolvedRecords = targetRecords <= 0 && targetBytes >= 0
+        ? targetBytes
+            / Math.max(1, conf.getInt(GRIDMIX_MISSING_REC_SIZE, 64 * 1024))
+        : targetRecords;
+    // Clamp to at least one record so the division below can never be by
+    // zero (e.g. a negative byte target paired with a non-positive record
+    // target, or a zero missing-record size).
+    this.targetRecords = Math.max(1L, resolvedRecords);
+    final long tmp = this.targetBytes / this.targetRecords;
+    step = this.targetBytes - this.targetRecords * tmp;
+    avgrec = (int) Math.min(Integer.MAX_VALUE, tmp + 1);
+    keyLen = Math.max(1,
+        (int) (tmp * Math.min(1.0f, conf.getFloat(GRIDMIX_KEY_FRC, 0.1f))));
+  }
+
+  /**
+   * Size the next key/record pair.
+   *
+   * @param key Key to size, or null when only a value is emitted.
+   * @param val Record to size.
+   * @return false once targetBytes have been emitted, true otherwise.
+   */
+  @Override
+  public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+    if (accBytes >= targetBytes) {
+      return false;
+    }
+    final int reclen = accRecords++ >= step ? avgrec - 1 : avgrec;
+    // len equals reclen unless avgrec was clamped to Integer.MAX_VALUE; then
+    // extra records are emitted and the final one is truncated to what is left.
+    final int len = (int) Math.min(targetBytes - accBytes, reclen);
+    if (key != null) {
+      key.setSize(keyLen);
+      // NOTE(review): assumes key.getSize() <= len; confirm how
+      // GridmixRecord.setSize handles very small records.
+      val.setSize(len - key.getSize());
+    } else {
+      val.setSize(len);
+    }
+    accBytes += len;
+    return true;
+  }
+
+  /**
+   * @return fraction of targetBytes emitted so far, in [0, 1].
+   */
+  @Override
+  public float getProgress() throws IOException {
+    // A non-positive byte target emits nothing: report completion instead of
+    // NaN (0/0) or a negative fraction.
+    if (targetBytes <= 0) {
+      return 1.0f;
+    }
+    return Math.min(1.0f, accBytes / ((float) targetBytes));
+  }
+
+  @Override
+  public void close() throws IOException {
+    // noop: this factory holds no resources.
+  }
+
+}
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java?rev=889779&r1=889778&r2=889779&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java Fri Dec 11 19:33:27 2009
@@ -223,7 +223,6 @@
return getSize();
}
- // TODO sort, pick rand pairs of kth large/small in dir
IndexMapper mapping;
if ((curdir.size() < 200) || ((double) targetSize / getSize() > 0.5)) {
mapping = new DenseIndexMapper(curdir.size());
@@ -234,13 +233,13 @@
ArrayList<Integer> selected = new ArrayList<Integer>();
long ret = 0L;
int poolSize = curdir.size();
- while (ret < targetSize) {
+ do {
int pos = rand.nextInt(poolSize);
int index = mapping.get(pos);
selected.add(index);
ret += curdir.get(index).getLen();
mapping.swap(pos, --poolSize);
- }
+ } while (ret < targetSize);
for (Integer i : selected) {
files.add(curdir.get(i));
Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java?rev=889779&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java Fri Dec 11 19:33:27 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+/**
+ * Given a {@link org.apache.hadoop.mapreduce.lib.input.CombineFileSplit},
+ * circularly read through each input source.
+ */
+class FileQueue extends InputStream {
+
+  private int idx = -1;
+  private long curlen = -1L;
+  private FSDataInputStream input;
+  // Scratch buffer for the single-byte read().
+  private final byte[] z = new byte[1];
+  private final Path[] paths;
+  private final long[] lengths;
+  private final long[] startoffset;
+  private final Configuration conf;
+  // Sum of all (non-negative) source lengths; zero means nothing can be read.
+  private final long totalLength;
+
+  /**
+   * @param split Description of input sources.
+   * @param conf Used to resolve FileSystem instances.
+   */
+  public FileQueue(CombineFileSplit split, Configuration conf)
+      throws IOException {
+    this.conf = conf;
+    paths = split.getPaths();
+    startoffset = split.getStartOffsets();
+    lengths = split.getLengths();
+    long total = 0L;
+    for (long srclen : lengths) {
+      total += Math.max(0L, srclen);
+    }
+    totalLength = total;
+    nextSource();
+  }
+
+  /**
+   * Close the current source, if any, and open the next one in round-robin
+   * order, positioned at its start offset. Does nothing if there are no paths.
+   */
+  protected void nextSource() throws IOException {
+    if (0 == paths.length) {
+      return;
+    }
+    if (input != null) {
+      input.close();
+    }
+    idx = (idx + 1) % paths.length;
+    curlen = lengths[idx];
+    final Path file = paths[idx];
+    final FileSystem fs = file.getFileSystem(conf);
+    input = fs.open(file);
+    input.seek(startoffset[idx]);
+  }
+
+  @Override
+  public int read() throws IOException {
+    final int tmp = read(z);
+    return tmp == -1 ? -1 : (0xFF & z[0]);
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  /**
+   * Fill b[off, off + len) from the sources, wrapping around to the first
+   * source as often as needed. Never returns a short read.
+   *
+   * @return len, or -1 if the sources contain no bytes at all.
+   * @throws IndexOutOfBoundsException if off/len fall outside b.
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException("off=" + off + ", len=" + len
+          + ", b.length=" + b.length);
+    }
+    if (len == 0) {
+      return 0;
+    }
+    if (totalLength <= 0) {
+      // No source has any data; cycling would spin forever.
+      return -1;
+    }
+    int kvread = 0;
+    while (kvread < len) {
+      if (curlen <= 0) {
+        nextSource();
+        continue;
+      }
+      final int srcRead = (int) Math.min(len - kvread, curlen);
+      // Honor the caller's offset into b.
+      IOUtils.readFully(input, b, off + kvread, srcRead);
+      curlen -= srcRead;
+      kvread += srcRead;
+    }
+    return kvread;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // input is never opened when the split has no paths.
+    if (input != null) {
+      input.close();
+    }
+  }
+
+}
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=889779&r1=889778&r2=889779&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Fri Dec 11 19:33:27 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.OutputStream;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
@@ -28,7 +29,6 @@
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -52,10 +52,30 @@
// TODO can replace with form of GridmixJob
class GenerateData extends GridmixJob {
+ /**
+ * Total bytes to write.
+ */
+ public static final String GRIDMIX_GEN_BYTES = "gridmix.gen.bytes";
+
+ /**
+ * Maximum size per file written.
+ */
+ public static final String GRIDMIX_GEN_CHUNK = "gridmix.gen.bytes.per.file";
+
+ /**
+ * Size of writes to output file.
+ */
+ public static final String GRIDMIX_VAL_BYTES = "gendata.val.bytes";
+
+ /**
+ * Status reporting interval, in megabytes.
+ */
+ public static final String GRIDMIX_GEN_INTERVAL = "gendata.interval.mb";
+
public GenerateData(Configuration conf, Path outdir, long genbytes)
throws IOException {
super(conf, 0L, "GRIDMIX_GENDATA");
- job.getConfiguration().setLong("gridmix.gendata.bytes", genbytes);
+ job.getConfiguration().setLong(GRIDMIX_GEN_BYTES, genbytes);
FileOutputFormat.setOutputPath(job, outdir);
}
@@ -84,7 +104,7 @@
protected void setup(Context context)
throws IOException, InterruptedException {
val = new BytesWritable(new byte[
- context.getConfiguration().getInt("gendata.val.bytes", 1024 * 1024)]);
+ context.getConfiguration().getInt(GRIDMIX_VAL_BYTES, 1024 * 1024)]);
}
@Override
@@ -106,7 +126,7 @@
final JobClient client = new JobClient(jobCtxt.getConfiguration());
ClusterStatus stat = client.getClusterStatus(true);
final long toGen =
- jobCtxt.getConfiguration().getLong("gridmix.gendata.bytes", -1);
+ jobCtxt.getConfiguration().getLong(GRIDMIX_GEN_BYTES, -1);
if (toGen < 0) {
throw new IOException("Invalid/missing generation bytes: " + toGen);
}
@@ -144,7 +164,7 @@
throws IOException, InterruptedException {
toWrite = split.getLength();
RINTERVAL = ctxt.getConfiguration().getInt(
- "gendata.report.interval.mb", 10) << 20;
+ GRIDMIX_GEN_INTERVAL, 10) << 20;
}
@Override
public boolean nextKeyValue() throws IOException {
@@ -219,20 +239,52 @@
public RecordWriter<NullWritable,BytesWritable> getRecordWriter(
TaskAttemptContext job) throws IOException {
- Path file = getDefaultWorkFile(job, "");
- FileSystem fs = file.getFileSystem(job.getConfiguration());
- final FSDataOutputStream fileOut = fs.create(file, false);
- return new RecordWriter<NullWritable,BytesWritable>() {
- @Override
- public void write(NullWritable key, BytesWritable value)
- throws IOException {
- fileOut.write(value.getBytes(), 0, value.getLength());
- }
- @Override
- public void close(TaskAttemptContext ctxt) throws IOException {
+ return new ChunkWriter(getDefaultWorkFile(job, ""),
+ job.getConfiguration());
+ }
+
+ static class ChunkWriter extends RecordWriter<NullWritable,BytesWritable> {
+ private final Path outDir;
+ private final FileSystem fs;
+ private final long maxFileBytes;
+
+ private long accFileBytes = 0L;
+ private long fileIdx = -1L;
+ private OutputStream fileOut = null;
+
+ public ChunkWriter(Path outDir, Configuration conf) throws IOException {
+ this.outDir = outDir;
+ fs = outDir.getFileSystem(conf);
+ maxFileBytes = conf.getLong(GRIDMIX_GEN_CHUNK, 1L << 30);
+ nextDestination();
+ }
+ private void nextDestination() throws IOException {
+ if (fileOut != null) {
fileOut.close();
}
- };
+ fileOut = fs.create(new Path(outDir, "segment-" + (++fileIdx)), false);
+ accFileBytes = 0L;
+ }
+ @Override
+ public void write(NullWritable key, BytesWritable value)
+ throws IOException {
+ int written = 0;
+ final int total = value.getLength();
+ while (written < total) {
+ final int write = (int)
+ Math.min(total - written, maxFileBytes - accFileBytes);
+ fileOut.write(value.getBytes(), written, write);
+ written += write;
+ accFileBytes += write;
+ if (accFileBytes >= maxFileBytes) {
+ nextDestination();
+ }
+ }
+ }
+ @Override
+ public void close(TaskAttemptContext ctxt) throws IOException {
+ fileOut.close();
+ }
}
}
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=889779&r1=889778&r2=889779&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Fri Dec 11 19:33:27 2009
@@ -26,7 +26,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Job;
@@ -73,12 +72,11 @@
"gridmix.client.pending.queue.depth";
/**
- * Size of key data in synthetic jobs. At present, key length is not
- * available in job traces. Since all solutions are equally bad, globally
- * specifying the amount of each record that is key data is the simplest
- * to implement and the method chosen.
+ * Multiplier to accelerate or decelerate job submission. As a crude means of
+ * sizing a job trace to a cluster, the time separating two jobs is
+ * multiplied by this factor.
*/
- public static final String GRIDMIX_KEY_LEN = "gridmix.min.key.length";
+ public static final String GRIDMIX_SUB_MUL = "gridmix.submit.multiplier";
// Submit data structures
private JobFactory factory;
@@ -135,7 +133,7 @@
submitter = createJobSubmitter(monitor,
conf.getInt(GRIDMIX_SUB_THR,
Runtime.getRuntime().availableProcessors() + 1),
- conf.getInt(GRIDMIX_QUE_DEP, 100),
+ conf.getInt(GRIDMIX_QUE_DEP, 5),
new FilePool(conf, ioPath));
factory = createJobFactory(submitter, traceIn, scratchDir, conf, startFlag);
monitor.start();
@@ -182,12 +180,10 @@
printUsage(System.err);
return 1;
}
- FileSystem fs = null;
InputStream trace = null;
try {
final Configuration conf = getConf();
Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix"));
- fs = scratchDir.getFileSystem(conf);
// add shutdown hook for SIGINT, etc.
Runtime.getRuntime().addShutdownHook(sdh);
CountDownLatch startFlag = new CountDownLatch(1);
@@ -210,7 +206,7 @@
if (factory != null) {
// wait for input exhaustion
- factory.join();
+ factory.join(Long.MAX_VALUE);
final Throwable badTraceException = factory.error();
if (null != badTraceException) {
LOG.error("Error in trace", badTraceException);
@@ -218,10 +214,10 @@
}
// wait for pending tasks to be submitted
submitter.shutdown();
- submitter.join();
+ submitter.join(Long.MAX_VALUE);
// wait for running tasks to complete
monitor.shutdown();
- monitor.join();
+ monitor.join(Long.MAX_VALUE);
}
} finally {
IOUtils.cleanup(LOG, trace);
@@ -236,13 +232,17 @@
*/
class Shutdown extends Thread {
- private void killComponent(Component<?> component) {
+ static final long FAC_SLEEP = 1000;
+ static final long SUB_SLEEP = 4000;
+ static final long MON_SLEEP = 15000;
+
+ private void killComponent(Component<?> component, long maxwait) {
if (component == null) {
return;
}
- component.abort(); // read no more tasks
+ component.abort();
try {
- component.join();
+ component.join(maxwait);
} catch (InterruptedException e) {
LOG.warn("Interrupted waiting for " + component);
}
@@ -253,9 +253,9 @@
public void run() {
LOG.info("Exiting...");
try {
- killComponent(factory); // read no more tasks
- killComponent(submitter); // submit no more tasks
- killComponent(monitor); // process remaining jobs in this thread
+ killComponent(factory, FAC_SLEEP); // read no more tasks
+ killComponent(submitter, SUB_SLEEP); // submit no more tasks
+ killComponent(monitor, MON_SLEEP); // process remaining jobs here
} finally {
if (monitor == null) {
return;
@@ -306,7 +306,8 @@
out.printf(" %-40s : Output directory\n", GRIDMIX_OUT_DIR);
out.printf(" %-40s : Submitting threads\n", GRIDMIX_SUB_THR);
out.printf(" %-40s : Queued job desc\n", GRIDMIX_QUE_DEP);
- out.printf(" %-40s : Key size\n", GRIDMIX_KEY_LEN);
+ out.printf(" %-40s : Key fraction of rec\n",
+ AvgRecordFactory.GRIDMIX_KEY_FRC);
}
/**
@@ -331,7 +332,7 @@
* Wait until the service completes. It is assumed that either a
* {@link #shutdown} or {@link #abort} has been requested.
*/
- void join() throws InterruptedException;
+ void join(long millis) throws InterruptedException;
/**
* Shut down gracefully, finishing all pending work. Reject new requests.
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=889779&r1=889778&r2=889779&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Fri Dec 11 19:33:27 2009
@@ -17,22 +17,10 @@
*/
package org.apache.hadoop.mapred.gridmix;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
-import java.nio.LongBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumSet;
import java.util.Formatter;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -40,15 +28,10 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
@@ -65,7 +48,6 @@
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.tools.rumen.JobStory;
@@ -80,6 +62,7 @@
class GridmixJob implements Callable<Job>, Delayed {
public static final String JOBNAME = "GRIDMIX";
+ public static final String ORIGNAME = "gridmix.job.name.original";
public static final Log LOG = LogFactory.getLog(GridmixJob.class);
private static final ThreadLocal<Formatter> nameFormat =
@@ -180,14 +163,17 @@
job.setReducerClass(GridmixReducer.class);
job.setNumReduceTasks(jobdesc.getNumberReduces());
job.setMapOutputKeyClass(GridmixKey.class);
- job.setMapOutputValueClass(BytesWritable.class);
- job.setSortComparatorClass(BytesWritable.Comparator.class);
+ job.setMapOutputValueClass(GridmixRecord.class);
+ job.setSortComparatorClass(GridmixKey.Comparator.class);
job.setGroupingComparatorClass(SpecGroupingComparator.class);
job.setInputFormatClass(GridmixInputFormat.class);
job.setOutputFormatClass(RawBytesOutputFormat.class);
job.setPartitionerClass(DraftPartitioner.class);
job.setJarByClass(GridmixJob.class);
job.getConfiguration().setInt("gridmix.job.seq", seq);
+ job.getConfiguration().set(ORIGNAME, null == jobdesc.getJobID()
+ ? "<unknown>" : jobdesc.getJobID().toString());
+ job.getConfiguration().setBoolean(Job.USED_GENERIC_PARSER, true);
FileInputFormat.addInputPath(job, new Path("ignored"));
FileOutputFormat.setOutputPath(job, outdir);
job.submit();
@@ -200,11 +186,10 @@
}
}
- /**
- * Group REDUCE_SPEC records together
- */
public static class SpecGroupingComparator
- implements RawComparator<GridmixKey>, Serializable {
+ implements RawComparator<GridmixKey> {
+ private final DataInputBuffer di = new DataInputBuffer();
+ private final byte[] reset = di.getData();
@Override
public int compare(GridmixKey g1, GridmixKey g2) {
final byte t1 = g1.getType();
@@ -215,284 +200,128 @@
}
assert t1 == GridmixKey.DATA;
assert t2 == GridmixKey.DATA;
- return WritableComparator.compareBytes(
- g1.getBytes(), 0, g1.getLength(),
- g2.getBytes(), 0, g2.getLength());
+ return g1.compareTo(g2);
}
@Override
- public int compare(byte[] b1, int s1, int l1,
- byte[] b2, int s2, int l2) {
- final byte t1 = b1[s1 + 4];
- final byte t2 = b2[s2 + 4];
- if (t1 == GridmixKey.REDUCE_SPEC ||
- t2 == GridmixKey.REDUCE_SPEC) {
- return t1 - t2;
- }
- assert t1 == GridmixKey.DATA;
- assert t2 == GridmixKey.DATA;
- return WritableComparator.compareBytes(
- b1, s1 + 4, l1 - 4,
- b2, s2 + 4, l2 - 4);
- }
- }
-
- /**
- * Keytype for synthetic jobs, some embedding instructions for the reduce.
- */
- public static class GridmixKey extends BytesWritable {
- // long fields specifying reduce contract
- private enum RSpec { REC_IN, REC_OUT, BYTES_OUT };
- private static final int SPEC_START = 5; // type + partition len
- private static final int NUMFIELDS = RSpec.values().length;
- private static final int SPEC_SIZE = NUMFIELDS * 8;
-
- // Key types
- static final byte REDUCE_SPEC = 0;
- static final byte DATA = 1;
-
- private IntBuffer partition;
- private LongBuffer spec;
-
- public GridmixKey() {
- super(new byte[SPEC_START]);
- }
-
- public GridmixKey(byte type, byte[] b) {
- super(b);
- setType(type);
- }
-
- public byte getType() {
- return getBytes()[0];
- }
- public void setPartition(int partition) {
- this.partition.put(0, partition);
- }
- public int getPartition() {
- return partition.get(0);
- }
- public long getReduceInputRecords() {
- checkState(REDUCE_SPEC);
- return spec.get(RSpec.REC_IN.ordinal());
- }
- public long getReduceOutputBytes() {
- checkState(REDUCE_SPEC);
- return spec.get(RSpec.BYTES_OUT.ordinal());
- }
- public long getReduceOutputRecords() {
- checkState(REDUCE_SPEC);
- return spec.get(RSpec.REC_OUT.ordinal());
- }
- public void setType(byte b) {
- switch (b) {
- case REDUCE_SPEC:
- if (getCapacity() < SPEC_START + SPEC_SIZE) {
- setSize(SPEC_START + SPEC_SIZE);
- }
- spec =
- ByteBuffer.wrap(getBytes(), SPEC_START, SPEC_SIZE).asLongBuffer();
- break;
- case DATA:
- if (getCapacity() < SPEC_START) {
- setSize(SPEC_START);
- }
- spec = null;
- break;
- default:
- throw new IllegalArgumentException("Illegal type " + b);
- }
- getBytes()[0] = b;
- partition =
- ByteBuffer.wrap(getBytes(), 1, SPEC_START - 1).asIntBuffer();
- }
- public void setReduceInputRecords(long records) {
- checkState(REDUCE_SPEC);
- spec.put(RSpec.REC_IN.ordinal(), records);
- }
- public void setReduceOutputBytes(long bytes) {
- checkState(REDUCE_SPEC);
- spec.put(RSpec.BYTES_OUT.ordinal(), bytes);
- }
- public void setReduceOutputRecords(long records) {
- checkState(REDUCE_SPEC);
- spec.put(RSpec.REC_OUT.ordinal(), records);
- }
- private void checkState(byte b) {
- if (getLength() < SPEC_START || getType() != b) {
- throw new IllegalStateException("Expected " + b + ", was " + getType());
- }
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- if (getLength() < SPEC_START) {
- throw new IOException("Invalid GridmixKey, len " + getLength());
- }
- partition =
- ByteBuffer.wrap(getBytes(), 1, SPEC_START - 1).asIntBuffer();
- spec = getType() == REDUCE_SPEC
- ? ByteBuffer.wrap(getBytes(), SPEC_START, SPEC_SIZE).asLongBuffer()
- : null;
- }
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- if (getType() == REDUCE_SPEC) {
- LOG.debug("SPEC(" + getPartition() + ") " + getReduceInputRecords() +
- " -> " + getReduceOutputRecords() + "/" + getReduceOutputBytes());
- }
- }
- @Override
- public boolean equals(Object other) {
- if (other instanceof GridmixKey) {
- return super.equals(other);
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ try {
+ final int ret;
+ di.reset(b1, s1, l1);
+ final int x1 = WritableUtils.readVInt(di);
+ di.reset(b2, s2, l2);
+ final int x2 = WritableUtils.readVInt(di);
+ final int t1 = b1[s1 + x1];
+ final int t2 = b2[s2 + x2];
+ if (t1 == GridmixKey.REDUCE_SPEC ||
+ t2 == GridmixKey.REDUCE_SPEC) {
+ ret = t1 - t2;
+ } else {
+ assert t1 == GridmixKey.DATA;
+ assert t2 == GridmixKey.DATA;
+ ret =
+ WritableComparator.compareBytes(b1, s1, x1, b2, s2, x2);
+ }
+ di.reset(reset, 0, 0);
+ return ret;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- return false;
- }
-
- @Override
- public int hashCode() {
- return super.hashCode();
}
}
public static class GridmixMapper
- extends Mapper<IntWritable,BytesWritable,GridmixKey,BytesWritable> {
-
- private final Random r = new Random();
- private GridmixKey key;
- private final BytesWritable val = new BytesWritable();
+ extends Mapper<NullWritable,GridmixRecord,GridmixKey,GridmixRecord> {
- private int keyLen;
private double acc;
private double ratio;
- private int[] reduceRecordSize;
- private long[] reduceRecordCount;
- private long[] reduceRecordRemaining;
+ private final ArrayList<RecordFactory> reduces =
+ new ArrayList<RecordFactory>();
+ private final Random r = new Random();
+
+ private final GridmixKey key = new GridmixKey();
+ private final GridmixRecord val = new GridmixRecord();
@Override
- protected void setup(Context context)
+ protected void setup(Context ctxt)
throws IOException, InterruptedException {
- // TODO clearly job-specific, but no data at present
- keyLen = context.getConfiguration().getInt(Gridmix.GRIDMIX_KEY_LEN, 20);
- key = new GridmixKey(GridmixKey.DATA, new byte[keyLen]);
- final GridmixSplit split = (GridmixSplit) context.getInputSplit();
- LOG.info("ID: " + split.getId());
- reduceRecordCount = split.getOutputRecords();
- reduceRecordRemaining =
- Arrays.copyOf(reduceRecordCount, reduceRecordCount.length);
- reduceRecordSize = new int[reduceRecordCount.length];
- int valsize = -1;
+ final Configuration conf = ctxt.getConfiguration();
+ final GridmixSplit split = (GridmixSplit) ctxt.getInputSplit();
+ final int maps = split.getMapCount();
final long[] reduceBytes = split.getOutputBytes();
+ final long[] reduceRecords = split.getOutputRecords();
+
long totalRecords = 0L;
- for (int i = 0; i < reduceBytes.length; ++i) {
- reduceRecordSize[i] = Math.max(0,
- Math.round(reduceBytes[i] / (1.0f * reduceRecordCount[i])) - keyLen);
- valsize = Math.max(reduceRecordSize[i], valsize);
- totalRecords += reduceRecordCount[i];
- }
- valsize = Math.max(0, valsize - 4); // BW len encoding
- val.setCapacity(valsize);
- val.setSize(valsize);
- ratio = totalRecords / (1.0 * split.getInputRecords());
+ final int nReduces = ctxt.getNumReduceTasks();
+ if (nReduces > 0) {
+ int idx = 0;
+ int id = split.getId();
+ for (int i = 0; i < nReduces; ++i) {
+ final GridmixKey.Spec spec = new GridmixKey.Spec();
+ if (i == id) {
+ spec.bytes_out = split.getReduceBytes(idx);
+ spec.rec_out = split.getReduceRecords(idx);
+ ++idx;
+ id += maps;
+ }
+ reduces.add(new IntermediateRecordFactory(
+ new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
+ i, reduceRecords[i], spec, conf));
+ totalRecords += reduceRecords[i];
+ }
+ } else {
+ reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
+ conf));
+ totalRecords = reduceRecords[0];
+ }
+ final long splitRecords = split.getInputRecords();
+ final long inputRecords = splitRecords <= 0 && split.getLength() >= 0
+ ? Math.max(1,
+ split.getLength() / conf.getInt("gridmix.missing.rec.size", 64*1024))
+ : splitRecords;
+ ratio = totalRecords / (1.0 * inputRecords);
acc = 0.0;
}
- protected void fillBytes(BytesWritable val, int len) {
- r.nextBytes(val.getBytes());
- val.setSize(len);
- }
-
- /** Find next non-empty partition after start. */
- private int getNextPart(final int start) {
- int p = start;
- do {
- p = (p + 1) % reduceRecordSize.length;
- } while (0 == reduceRecordRemaining[p] && p != start);
- return 0 == reduceRecordRemaining[p] ? -1 : p;
- }
-
@Override
- public void map(IntWritable ignored, BytesWritable bytes,
+ public void map(NullWritable ignored, GridmixRecord rec,
Context context) throws IOException, InterruptedException {
- int p = getNextPart(r.nextInt(reduceRecordSize.length));
- if (-1 == p) {
- return;
- }
acc += ratio;
- while (acc >= 1.0) {
- fillBytes(key, key.getLength());
- key.setType(GridmixKey.DATA);
- key.setPartition(p);
- --reduceRecordRemaining[p];
- fillBytes(val, reduceRecordSize[p]);
+ while (acc >= 1.0 && !reduces.isEmpty()) {
+ key.setSeed(r.nextLong());
+ val.setSeed(r.nextLong());
+ final int idx = r.nextInt(reduces.size());
+ final RecordFactory f = reduces.get(idx);
+ if (!f.next(key, val)) {
+ reduces.remove(idx);
+ continue;
+ }
context.write(key, val);
acc -= 1.0;
- if (0 == reduceRecordRemaining[p] && -1 == (p = getNextPart(p))) {
- return;
- }
}
}
@Override
public void cleanup(Context context)
throws IOException, InterruptedException {
- // output any remaining records
- // TODO include reduce spec in remaining records if avail
- // (i.e. move this to map)
- for (int i = 0; i < reduceRecordSize.length; ++i) {
- for (long j = reduceRecordRemaining[i]; j > 0; --j) {
- fillBytes(key, key.getLength());
- key.setType(GridmixKey.DATA);
- key.setPartition(i);
- fillBytes(val, reduceRecordSize[i]);
+ for (RecordFactory factory : reduces) {
+ key.setSeed(r.nextLong());
+ while (factory.next(key, val)) {
context.write(key, val);
+ key.setSeed(r.nextLong());
}
}
- val.setSize(0);
- key.setType(GridmixKey.REDUCE_SPEC);
- final int reduces = context.getNumReduceTasks();
- final GridmixSplit split = (GridmixSplit) context.getInputSplit();
- final int maps = split.getMapCount();
- int idx = 0;
- int id = split.getId();
- for (int i = 0; i < reduces; ++i) {
- key.setPartition(i);
- key.setReduceInputRecords(reduceRecordCount[i]);
- // Write spec for all red st r_id % id == 0
- if (i == id) {
- key.setReduceOutputBytes(split.getReduceBytes(idx));
- key.setReduceOutputRecords(split.getReduceRecords(idx));
- LOG.debug(String.format("SPEC'D %d / %d to %d",
- split.getReduceRecords(idx), split.getReduceBytes(idx), i));
- ++idx;
- id += maps;
- } else {
- key.setReduceOutputBytes(0);
- key.setReduceOutputRecords(0);
- }
- context.write(key, val);
- }
}
}
public static class GridmixReducer
- extends Reducer<GridmixKey,BytesWritable,NullWritable,BytesWritable> {
+ extends Reducer<GridmixKey,GridmixRecord,NullWritable,GridmixRecord> {
private final Random r = new Random();
- private final BytesWritable val = new BytesWritable();
+ private final GridmixRecord val = new GridmixRecord();
private double acc;
private double ratio;
- private long written;
- private long inRecords = 0L;
- private long outBytes = 0L;
- private long outRecords = 0L;
-
- protected void fillBytes(BytesWritable val, int len) {
- r.nextBytes(val.getBytes());
- val.setSize(len);
- }
+ private RecordFactory factory;
@Override
protected void setup(Context context)
@@ -501,62 +330,52 @@
context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
throw new IOException("Missing reduce spec");
}
- for (BytesWritable ignored : context.getValues()) {
+ long outBytes = 0L;
+ long outRecords = 0L;
+ long inRecords = 0L;
+ for (GridmixRecord ignored : context.getValues()) {
final GridmixKey spec = context.getCurrentKey();
inRecords += spec.getReduceInputRecords();
- LOG.debug("GOT COUNT " + spec.getReduceInputRecords());
outBytes += spec.getReduceOutputBytes();
outRecords += spec.getReduceOutputRecords();
}
- LOG.debug("GOT SPEC " + outRecords + "/" + outBytes);
- val.setCapacity(Math.round(outBytes / (1.0f * outRecords)));
+ if (0 == outRecords && inRecords > 0) {
+ LOG.info("Spec output bytes w/o records. Using input record count");
+ outRecords = inRecords;
+ }
+ factory =
+ new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
ratio = outRecords / (1.0 * inRecords);
acc = 0.0;
- LOG.debug(String.format("RECV %d -> %10d/%10d %d %f", inRecords,
- outRecords, outBytes, val.getCapacity(), ratio));
}
@Override
- protected void reduce(GridmixKey key, Iterable<BytesWritable> values,
+ protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
Context context) throws IOException, InterruptedException {
- for (BytesWritable ignored : values) {
+ for (GridmixRecord ignored : values) {
acc += ratio;
- while (acc >= 1.0 && written < outBytes) {
- final int len = (int) Math.min(outBytes - written, val.getCapacity());
- fillBytes(val, len);
+ while (acc >= 1.0 && factory.next(null, val)) {
context.write(NullWritable.get(), val);
acc -= 1.0;
- written += len;
- LOG.debug(String.format("%f %d/%d", acc, written, outBytes));
}
}
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
- while (written < outBytes) {
- final int len = (int) Math.min(outBytes - written, val.getCapacity());
- fillBytes(val, len);
+ val.setSeed(r.nextLong());
+ while (factory.next(null, val)) {
context.write(NullWritable.get(), val);
- written += len;
+ val.setSeed(r.nextLong());
}
}
}
static class GridmixRecordReader
- extends RecordReader<IntWritable,BytesWritable> {
+ extends RecordReader<NullWritable,GridmixRecord> {
- private long bytesRead = 0;
- private long bytesTotal;
- private Configuration conf;
- private final IntWritable key = new IntWritable();
- private final BytesWritable inBytes = new BytesWritable();
-
- private FSDataInputStream input;
- private int idx = -1;
- private int capacity;
- private Path[] paths;
- private long[] startoffset;
- private long[] lengths;
+ private RecordFactory factory;
+ private final Random r = new Random();
+ private final GridmixRecord val = new GridmixRecord();
public GridmixRecordReader() { }
@@ -564,178 +383,36 @@
public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt)
throws IOException, InterruptedException {
final GridmixSplit split = (GridmixSplit)genericSplit;
- this.conf = ctxt.getConfiguration();
- paths = split.getPaths();
- startoffset = split.getStartOffsets();
- lengths = split.getLengths();
- bytesTotal = split.getLength();
- capacity = (int) Math.round(bytesTotal / (1.0 * split.getInputRecords()));
- inBytes.setCapacity(capacity);
- nextSource();
- }
- private void nextSource() throws IOException {
- idx = (idx + 1) % paths.length;
- final Path file = paths[idx];
- final FileSystem fs = file.getFileSystem(conf);
- input = fs.open(file, capacity);
- input.seek(startoffset[idx]);
+ final Configuration conf = ctxt.getConfiguration();
+ factory = new ReadRecordFactory(split.getLength(),
+ split.getInputRecords(), new FileQueue(split, conf), conf);
}
+
@Override
public boolean nextKeyValue() throws IOException {
- if (bytesRead >= bytesTotal) {
- return false;
- }
- final int len = (int)
- Math.min(bytesTotal - bytesRead, inBytes.getCapacity());
- int kvread = 0;
- while (kvread < len) {
- assert lengths[idx] >= 0;
- if (lengths[idx] <= 0) {
- nextSource();
- continue;
- }
- final int srcRead = (int) Math.min(len - kvread, lengths[idx]);
- IOUtils.readFully(input, inBytes.getBytes(), kvread, srcRead);
- //LOG.trace("Read " + srcRead + " bytes from " + paths[idx]);
- lengths[idx] -= srcRead;
- kvread += srcRead;
- }
- bytesRead += kvread;
- return true;
+ val.setSeed(r.nextLong());
+ return factory.next(null, val);
}
@Override
public float getProgress() throws IOException {
- return bytesRead / ((float)bytesTotal);
+ return factory.getProgress();
}
@Override
- public IntWritable getCurrentKey() { return key; }
+ public NullWritable getCurrentKey() {
+ return NullWritable.get();
+ }
@Override
- public BytesWritable getCurrentValue() { return inBytes; }
+ public GridmixRecord getCurrentValue() {
+ return val;
+ }
@Override
public void close() throws IOException {
- IOUtils.cleanup(null, input);
- }
- }
-
- static class GridmixSplit extends CombineFileSplit {
- private int id;
- private int nSpec;
- private int maps;
- private int reduces;
- private long inputRecords;
- private long outputBytes;
- private long outputRecords;
- private long maxMemory;
- private double[] reduceBytes = new double[0];
- private double[] reduceRecords = new double[0];
-
- // Spec for reduces id mod this
- private long[] reduceOutputBytes = new long[0];
- private long[] reduceOutputRecords = new long[0];
-
- GridmixSplit() {
- super();
- }
-
- public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
- long inputBytes, long inputRecords, long outputBytes,
- long outputRecords, double[] reduceBytes, double[] reduceRecords,
- long[] reduceOutputBytes, long[] reduceOutputRecords)
- throws IOException {
- super(cfsplit);
- this.id = id;
- this.maps = maps;
- reduces = reduceBytes.length;
- this.inputRecords = inputRecords;
- this.outputBytes = outputBytes;
- this.outputRecords = outputRecords;
- this.reduceBytes = Arrays.copyOf(reduceBytes, reduces);
- this.reduceRecords = Arrays.copyOf(reduceRecords, reduces);
- nSpec = reduceOutputBytes.length;
- this.reduceOutputBytes = reduceOutputBytes;
- this.reduceOutputRecords = reduceOutputRecords;
- }
- public int getId() {
- return id;
- }
- public int getMapCount() {
- return maps;
- }
- public long getInputRecords() {
- return inputRecords;
- }
- public long[] getOutputBytes() {
- final long[] ret = new long[reduces];
- for (int i = 0; i < reduces; ++i) {
- ret[i] = Math.round(outputBytes * reduceBytes[i]);
- }
- return ret;
- }
- public long[] getOutputRecords() {
- final long[] ret = new long[reduces];
- for (int i = 0; i < reduces; ++i) {
- ret[i] = Math.round(outputRecords * reduceRecords[i]);
- }
- return ret;
- }
- public long getReduceBytes(int i) {
- return reduceOutputBytes[i];
- }
- public long getReduceRecords(int i) {
- return reduceOutputRecords[i];
- }
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- WritableUtils.writeVInt(out, id);
- WritableUtils.writeVInt(out, maps);
- WritableUtils.writeVLong(out, inputRecords);
- WritableUtils.writeVLong(out, outputBytes);
- WritableUtils.writeVLong(out, outputRecords);
- WritableUtils.writeVLong(out, maxMemory);
- WritableUtils.writeVInt(out, reduces);
- for (int i = 0; i < reduces; ++i) {
- out.writeDouble(reduceBytes[i]);
- out.writeDouble(reduceRecords[i]);
- }
- WritableUtils.writeVInt(out, nSpec);
- for (int i = 0; i < nSpec; ++i) {
- out.writeLong(reduceOutputBytes[i]);
- out.writeLong(reduceOutputRecords[i]);
- }
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- id = WritableUtils.readVInt(in);
- maps = WritableUtils.readVInt(in);
- inputRecords = WritableUtils.readVLong(in);
- outputBytes = WritableUtils.readVLong(in);
- outputRecords = WritableUtils.readVLong(in);
- maxMemory = WritableUtils.readVLong(in);
- reduces = WritableUtils.readVInt(in);
- if (reduceBytes.length < reduces) {
- reduceBytes = new double[reduces];
- reduceRecords = new double[reduces];
- }
- for (int i = 0; i < reduces; ++i) {
- reduceBytes[i] = in.readDouble();
- reduceRecords[i] = in.readDouble();
- }
- nSpec = WritableUtils.readVInt(in);
- if (reduceOutputBytes.length < nSpec) {
- reduceOutputBytes = new long[nSpec];
- reduceOutputRecords = new long[nSpec];
- }
- for (int i = 0; i < nSpec; ++i) {
- reduceOutputBytes[i] = in.readLong();
- reduceOutputRecords[i] = in.readLong();
- }
+ factory.close();
}
}
static class GridmixInputFormat
- extends InputFormat<IntWritable,BytesWritable> {
+ extends InputFormat<NullWritable,GridmixRecord> {
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
@@ -743,29 +420,28 @@
"gridmix.job.seq", -1));
}
@Override
- public RecordReader<IntWritable,BytesWritable> createRecordReader(
+ public RecordReader<NullWritable,GridmixRecord> createRecordReader(
InputSplit split, final TaskAttemptContext taskContext)
throws IOException {
return new GridmixRecordReader();
}
}
- static class RawBytesOutputFormat
- extends FileOutputFormat<NullWritable,BytesWritable> {
+ static class RawBytesOutputFormat<K>
+ extends FileOutputFormat<K,GridmixRecord> {
@Override
- public RecordWriter<NullWritable,BytesWritable> getRecordWriter(
+ public RecordWriter<K,GridmixRecord> getRecordWriter(
TaskAttemptContext job) throws IOException {
Path file = getDefaultWorkFile(job, "");
FileSystem fs = file.getFileSystem(job.getConfiguration());
final FSDataOutputStream fileOut = fs.create(file, false);
- return new RecordWriter<NullWritable,BytesWritable>() {
+ return new RecordWriter<K,GridmixRecord>() {
@Override
- public void write(NullWritable key, BytesWritable value)
+ public void write(K ignored, GridmixRecord value)
throws IOException {
- //LOG.trace("WROTE " + value.getLength() + " bytes");
- fileOut.write(value.getBytes(), 0, value.getLength());
+ value.writeRandom(fileOut, value.getSize());
}
@Override
public void close(TaskAttemptContext ctxt) throws IOException {
@@ -829,8 +505,10 @@
jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
specBytes[j] = info.getOutputBytes();
specRecords[j] = info.getOutputRecords();
- LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
- i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
+ i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
+ }
}
final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
splits.add(new GridmixSplit(striper.splitFor(inputDir,
@@ -842,77 +520,4 @@
pushDescription(id(), splits);
}
- static class InputStriper {
- int idx;
- long currentStart;
- FileStatus current;
- final List<FileStatus> files = new ArrayList<FileStatus>();
-
- InputStriper(FilePool inputDir, long mapBytes)
- throws IOException {
- final long inputBytes = inputDir.getInputFiles(mapBytes, files);
- if (mapBytes > inputBytes) {
- LOG.warn("Using " + inputBytes + "/" + mapBytes + " bytes");
- }
- current = files.get(0);
- }
-
- CombineFileSplit splitFor(FilePool inputDir, long bytes, int nLocs)
- throws IOException {
- final ArrayList<Path> paths = new ArrayList<Path>();
- final ArrayList<Long> start = new ArrayList<Long>();
- final ArrayList<Long> length = new ArrayList<Long>();
- final HashMap<String,Double> sb = new HashMap<String,Double>();
- while (bytes > 0) {
- paths.add(current.getPath());
- start.add(currentStart);
- final long fromFile = Math.min(bytes, current.getLen() - currentStart);
- length.add(fromFile);
- for (BlockLocation loc :
- inputDir.locationsFor(current, currentStart, fromFile)) {
- final double tedium = loc.getLength() / (1.0 * bytes);
- for (String l : loc.getHosts()) {
- Double j = sb.get(l);
- if (null == j) {
- sb.put(l, tedium);
- } else {
- sb.put(l, j.doubleValue() + tedium);
- }
- }
- }
- currentStart += fromFile;
- bytes -= fromFile;
- if (current.getLen() - currentStart == 0) {
- current = files.get(++idx % files.size());
- currentStart = 0;
- }
- }
- final ArrayList<Entry<String,Double>> sort =
- new ArrayList<Entry<String,Double>>(sb.entrySet());
- Collections.sort(sort, hostRank);
- final String[] hosts = new String[Math.min(nLocs, sort.size())];
- for (int i = 0; i < nLocs && i < sort.size(); ++i) {
- hosts[i] = sort.get(i).getKey();
- }
- return new CombineFileSplit(paths.toArray(new Path[0]),
- toLongArray(start), toLongArray(length), hosts);
- }
-
- private long[] toLongArray(final ArrayList<Long> sigh) {
- final long[] ret = new long[sigh.size()];
- for (int i = 0; i < ret.length; ++i) {
- ret[i] = sigh.get(i);
- }
- return ret;
- }
-
- final Comparator<Entry<String,Double>> hostRank =
- new Comparator<Entry<String,Double>>() {
- public int compare(Entry<String,Double> a, Entry<String,Double> b) {
- final double va = a.getValue();
- final double vb = b.getValue();
- return va > vb ? -1 : va < vb ? 1 : 0;
- }
- };
- }
}
Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java?rev=889779&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java Fri Dec 11 19:33:27 2009
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * Intermediate key used by Gridmix jobs: a {@link GridmixRecord} payload
+ * followed by a one-byte type tag. {@link #REDUCE_SPEC} keys additionally
+ * serialize a {@link Spec} describing the output a reduce should produce.
+ */
+class GridmixKey extends GridmixRecord {
+  /** Type tag for keys carrying a reduce spec; sorts before {@link #DATA}. */
+  static final byte REDUCE_SPEC = 0;
+  /** Type tag for keys carrying only generated payload. */
+  static final byte DATA = 1;
+
+  /** Serialized size of the type tag. */
+  static final int META_BYTES = 1;
+
+  private byte type;
+  private int partition; // NOT serialized
+  private Spec spec = new Spec();
+
+  GridmixKey() {
+    this(DATA, 1, 0L);
+  }
+
+  /**
+   * @param type {@link #REDUCE_SPEC} or {@link #DATA}
+   * @param size size reported by {@link #getSize()}, i.e. including the
+   *        type tag and, for spec keys, the serialized spec
+   * @param seed seed for the generated payload
+   */
+  GridmixKey(byte type, int size, long seed) {
+    super(size, seed);
+    this.type = type;
+    // setting type may change pcnt random bytes
+    setSize(size);
+  }
+
+  /**
+   * Size of the payload plus the type tag and, for spec keys, the spec.
+   *
+   * @throws IllegalStateException if the type tag is not recognized
+   */
+  @Override
+  public int getSize() {
+    switch (type) {
+      case REDUCE_SPEC:
+        return super.getSize() + spec.getSize() + META_BYTES;
+      case DATA:
+        return super.getSize() + META_BYTES;
+      default:
+        throw new IllegalStateException("Invalid type: " + type);
+    }
+  }
+
+  /**
+   * Sets the overall size; the payload receives whatever remains after the
+   * type tag and, for spec keys, the serialized spec.
+   *
+   * @throws IllegalStateException if the type tag is not recognized
+   */
+  @Override
+  public void setSize(int size) {
+    switch (type) {
+      case REDUCE_SPEC:
+        super.setSize(size - (META_BYTES + spec.getSize()));
+        break;
+      case DATA:
+        super.setSize(size - META_BYTES);
+        break;
+      default:
+        throw new IllegalStateException("Invalid type: " + type);
+    }
+  }
+
+  /**
+   * Partition is not serialized.
+   */
+  public int getPartition() {
+    return partition;
+  }
+  public void setPartition(int partition) {
+    this.partition = partition;
+  }
+
+  // Spec accessors are only valid on REDUCE_SPEC keys (checked by assert).
+  // Each setter keeps getSize() constant by resizing the payload to absorb
+  // any change in the spec's variable-length encoding.
+
+  public long getReduceInputRecords() {
+    assert REDUCE_SPEC == getType();
+    return spec.rec_in;
+  }
+  public void setReduceInputRecords(long rec_in) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    spec.rec_in = rec_in;
+    setSize(origSize);
+  }
+
+  public long getReduceOutputRecords() {
+    assert REDUCE_SPEC == getType();
+    return spec.rec_out;
+  }
+  public void setReduceOutputRecords(long rec_out) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    spec.rec_out = rec_out;
+    setSize(origSize);
+  }
+
+  public long getReduceOutputBytes() {
+    assert REDUCE_SPEC == getType();
+    return spec.bytes_out;
+  }
+  public void setReduceOutputBytes(long b_out) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    spec.bytes_out = b_out;
+    setSize(origSize);
+  }
+
+  public byte getType() {
+    return type;
+  }
+
+  /**
+   * Changes the type tag while keeping {@link #getSize()} constant by
+   * resizing the payload.
+   *
+   * @throws IOException if {@code type} is not a recognized tag
+   */
+  public void setType(byte type) throws IOException {
+    final int origSize = getSize();
+    switch (type) {
+      case REDUCE_SPEC:
+      case DATA:
+        this.type = type;
+        break;
+      default:
+        throw new IOException("Invalid type: " + type);
+    }
+    setSize(origSize);
+  }
+
+  /** Copies {@code spec} into this key, keeping {@link #getSize()} constant. */
+  public void setSpec(Spec spec) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    this.spec.set(spec);
+    setSize(origSize);
+  }
+
+  /**
+   * Reads the payload, the type tag and, for spec keys, the spec.
+   * The payload size decoded by the superclass is authoritative, so the tag
+   * is assigned directly: {@link #setType} would resize the payload to keep
+   * this (reused) instance's previous total, computed with its stale spec.
+   *
+   * @throws IOException if the type tag is not recognized
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    final byte t = in.readByte();
+    switch (t) {
+      case REDUCE_SPEC:
+      case DATA:
+        type = t;
+        break;
+      default:
+        throw new IOException("Invalid type: " + t);
+    }
+    if (REDUCE_SPEC == type) {
+      spec.readFields(in);
+    }
+  }
+
+  /** Writes the payload, then the type tag, then the spec for spec keys. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    final byte t = getType();
+    out.writeByte(t);
+    if (REDUCE_SPEC == t) {
+      spec.write(out);
+    }
+  }
+
+  @Override
+  int fixedBytes() {
+    return super.fixedBytes() +
+      (REDUCE_SPEC == getType() ? spec.getSize() : 0) + META_BYTES;
+  }
+
+  /** Orders by type tag first (spec keys first), then by payload. */
+  @Override
+  public int compareTo(GridmixRecord other) {
+    final GridmixKey o = (GridmixKey) other;
+    final byte t1 = getType();
+    final byte t2 = o.getType();
+    if (t1 != t2) {
+      return t1 - t2;
+    }
+    return super.compareTo(other);
+  }
+
+  /**
+   * Note that while the spec is not explicitly included, changing the spec
+   * may change its size, which will affect equality.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other != null && other.getClass() == getClass()) {
+      final GridmixKey o = ((GridmixKey)other);
+      return getType() == o.getType() && super.equals(o);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode() ^ getType();
+  }
+
+  /**
+   * Reduce-side workload carried by REDUCE_SPEC keys, serialized as three
+   * vlongs: input records, output records, output bytes.
+   */
+  public static class Spec implements Writable {
+    long rec_in;
+    long rec_out;
+    long bytes_out;
+    public Spec() { }
+
+    public void set(Spec other) {
+      rec_in = other.rec_in;
+      bytes_out = other.bytes_out;
+      rec_out = other.rec_out;
+    }
+
+    /** Serialized size in bytes of the three vlong fields. */
+    public int getSize() {
+      return WritableUtils.getVIntSize(rec_in) +
+        WritableUtils.getVIntSize(rec_out) +
+        WritableUtils.getVIntSize(bytes_out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      rec_in = WritableUtils.readVLong(in);
+      rec_out = WritableUtils.readVLong(in);
+      bytes_out = WritableUtils.readVLong(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeVLong(out, rec_in);
+      WritableUtils.writeVLong(out, rec_out);
+      WritableUtils.writeVLong(out, bytes_out);
+    }
+  }
+
+  /**
+   * Raw comparator: decodes the payload's vint size to find the type tag
+   * that follows the payload, orders by tag, then delegates the payload
+   * comparison to {@link GridmixRecord.Comparator}. Reuses an instance-level
+   * buffer, so an instance should not be shared across threads.
+   */
+  public static class Comparator extends GridmixRecord.Comparator {
+
+    private final DataInputBuffer di = new DataInputBuffer();
+    private final byte[] reset = di.getData();
+
+    public Comparator() {
+      super(GridmixKey.class);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try {
+        di.reset(b1, s1, l1);
+        final int x1 = WritableUtils.readVInt(di);
+        di.reset(b2, s2, l2);
+        final int x2 = WritableUtils.readVInt(di);
+        // x1/x2 count the whole payload (vint included), so the tag follows
+        final int ret = (b1[s1 + x1] != b2[s2 + x2])
+          ? b1[s1 + x1] - b2[s2 + x2]
+          : super.compare(b1, s1, x1, b2, s2, x2);
+        // point the buffer back at its own array; don't retain caller bytes
+        di.reset(reset, 0, 0);
+        return ret;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    static {
+      WritableComparator.define(GridmixKey.class, new Comparator());
+    }
+  }
+}
+
Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java?rev=889779&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java Fri Dec 11 19:33:27 2009
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A record whose payload is generated from a seed rather than stored.
+ * Serialized as a vint size, which counts the vint itself, followed by
+ * {@code size - vintSize} payload bytes: payloads of at most eight bytes are
+ * the leading bytes of the masked seed, longer ones come from
+ * {@link #writeRandom}.
+ */
+class GridmixRecord implements WritableComparable<GridmixRecord> {
+
+  /** Minimum bytes consumed by the serialized size (a one-byte vint). */
+  private static final int FIXED_BYTES = 1;
+  private int size = -1;
+  private long seed;
+  private final DataInputBuffer dib =
+    new DataInputBuffer();
+  private final DataOutputBuffer dob =
+    new DataOutputBuffer(Long.SIZE / Byte.SIZE);
+  // Big-endian bytes of the masked seed; refreshed by setSizeInternal and
+  // readFields, and written verbatim for short payloads.
+  private final byte[] literal = dob.getData();
+
+  GridmixRecord() {
+    this(1, 0L);
+  }
+
+  GridmixRecord(int size, long seed) {
+    this.seed = seed;
+    setSizeInternal(size);
+  }
+
+  /** Serialized size in bytes, including the vint length prefix. */
+  public int getSize() {
+    return size;
+  }
+
+  /** Sets the serialized size (clamped to at least one byte). */
+  public void setSize(int size) {
+    setSizeInternal(size);
+  }
+
+  // Private, so the constructor's call is not redirected to a subclass
+  // override of setSize. Re-masks the seed for the new size and refreshes
+  // the cached literal bytes.
+  private void setSizeInternal(int size) {
+    this.size = Math.max(1, size);
+    try {
+      seed = maskSeed(seed, this.size);
+      dob.reset();
+      dob.writeLong(seed);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Sets the payload seed. Payloads of at most eight bytes are written from
+   * the literal cached by {@link #setSize}, so call {@code setSize} after
+   * this for the new seed to reach the serialized form.
+   */
+  public final void setSeed(long seed) {
+    this.seed = seed;
+  }
+
+  /** One xorshift step (Marsaglia, 2003). */
+  long nextRand(long x) {
+    x ^= (x << 13);
+    x ^= (x >>> 7);
+    return (x ^= (x << 17));
+  }
+
+  /**
+   * Writes exactly {@code size} pseudo-random bytes derived from the seed
+   * (nothing when {@code size <= 0}): the seed itself, big-endian, followed
+   * by successive xorshift outputs.
+   */
+  public void writeRandom(DataOutput out, final int size) throws IOException {
+    long tmp = seed;
+    if (size < Long.SIZE / Byte.SIZE) {
+      // A full writeLong would overshoot a short request; emit only the
+      // high-order bytes of the seed, in the order writeLong would use.
+      for (int i = 0, shift = Long.SIZE - Byte.SIZE; i < size;
+          ++i, shift -= Byte.SIZE) {
+        out.writeByte((int)((tmp >>> shift) & 0xFF));
+      }
+      return;
+    }
+    out.writeLong(tmp);
+    int i = size - (Long.SIZE / Byte.SIZE);
+    while (i > Long.SIZE / Byte.SIZE - 1) {
+      tmp = nextRand(tmp);
+      out.writeLong(tmp);
+      i -= Long.SIZE / Byte.SIZE;
+    }
+    // trailing partial long, low-order byte first
+    for (tmp = nextRand(tmp); i > 0; --i) {
+      out.writeByte((int)(tmp & 0xFF));
+      tmp >>>= Byte.SIZE;
+    }
+  }
+
+  /**
+   * Reads the size and recovers the seed from the payload; bytes past the
+   * seed are skipped since they can be regenerated from it.
+   *
+   * @throws EOFException if the stream ends inside the payload
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    size = WritableUtils.readVInt(in);
+    int payload = size - WritableUtils.getVIntSize(size);
+    if (payload > Long.SIZE / Byte.SIZE) {
+      seed = in.readLong();
+      payload -= Long.SIZE / Byte.SIZE;
+    } else {
+      // short payload: zero-pad to a full long before decoding
+      Arrays.fill(literal, (byte)0);
+      in.readFully(literal, 0, payload);
+      dib.reset(literal, 0, literal.length);
+      seed = dib.readLong();
+      payload = 0;
+    }
+    final int vBytes = in.skipBytes(payload);
+    if (vBytes != payload) {
+      throw new EOFException("Expected " + payload + ", read " + vBytes);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // data bytes including vint encoding
+    WritableUtils.writeVInt(out, size);
+    final int payload = size - WritableUtils.getVIntSize(size);
+    if (payload > Long.SIZE / Byte.SIZE) {
+      writeRandom(out, payload);
+    } else if (payload > 0) {
+      out.write(literal, 0, payload);
+    }
+  }
+
+  /** Compares generated payloads byte-wise, then by payload length. */
+  @Override
+  public int compareTo(GridmixRecord other) {
+    return compareSeed(other.seed,
+        Math.max(0, other.getSize() - other.fixedBytes()));
+  }
+
+  int fixedBytes() {
+    // min vint size
+    return FIXED_BYTES;
+  }
+
+  /**
+   * Keeps only the high-order seed bytes that fit in a payload of
+   * {@code sz - FIXED_BYTES} bytes; returns 0 when {@code sz <= FIXED_BYTES}
+   * and the seed unchanged when the payload holds a full long.
+   */
+  private static long maskSeed(long sd, int sz) {
+    // Don't use fixedBytes here; subclasses will set intended random len
+    if (sz <= FIXED_BYTES) {
+      sd = 0L;
+    } else if (sz < Long.SIZE / Byte.SIZE + FIXED_BYTES) {
+      final int tmp = sz - FIXED_BYTES;
+      final long mask = (1L << (Byte.SIZE * tmp)) - 1;
+      sd &= mask << (Byte.SIZE * (Long.SIZE / Byte.SIZE - tmp));
+    }
+    return sd;
+  }
+
+  /**
+   * Compares this record's payload with one generated from {@code jSeed}
+   * with {@code jSize} random bytes: byte by byte over the shorter length,
+   * high-order byte of each generated long first, then by length.
+   */
+  int compareSeed(long jSeed, int jSize) {
+    final int iSize = Math.max(0, getSize() - fixedBytes());
+    final int seedLen = Math.min(iSize, jSize) + FIXED_BYTES;
+    jSeed = maskSeed(jSeed, seedLen);
+    long iSeed = maskSeed(seed, seedLen);
+    final int cmplen = Math.min(iSize, jSize);
+    for (int i = 0; i < cmplen; i += Long.SIZE / Byte.SIZE) {
+      final int k = cmplen - i;
+      for (long j = Long.SIZE - Byte.SIZE;
+          j >= Math.max(0, Long.SIZE / Byte.SIZE - k) * Byte.SIZE;
+          j -= Byte.SIZE) {
+        final int xi = (int)((iSeed >>> j) & 0xFFL);
+        final int xj = (int)((jSeed >>> j) & 0xFFL);
+        if (xi != xj) {
+          return xi - xj;
+        }
+      }
+      iSeed = nextRand(iSeed);
+      jSeed = nextRand(jSeed);
+    }
+    return iSize - jSize;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other != null && other.getClass() == getClass()) {
+      final GridmixRecord o = ((GridmixRecord)other);
+      return getSize() == o.getSize() && seed == o.seed;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int)(seed * getSize());
+  }
+
+  /**
+   * Raw comparator over serialized records. The skipped prefix is
+   * decodeVIntSize(first byte) minus getVIntSize of that count, which is 0
+   * for single-byte vints, so the size byte takes part in the comparison.
+   * NOTE(review): this may order by size before payload, unlike compareTo;
+   * confirm this is intended.
+   */
+  public static class Comparator extends WritableComparator {
+
+    public Comparator() {
+      super(GridmixRecord.class);
+    }
+
+    public Comparator(Class<? extends WritableComparable<?>> sub) {
+      super(sub);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+      int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+      n1 -= WritableUtils.getVIntSize(n1);
+      n2 -= WritableUtils.getVIntSize(n2);
+      return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
+    }
+
+    static {
+      WritableComparator.define(GridmixRecord.class, new Comparator());
+    }
+  }
+
+}
Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java?rev=889779&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java Fri Dec 11 19:33:27 2009
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+class GridmixSplit extends CombineFileSplit {
+ private int id;
+ private int nSpec;
+ private int maps;
+ private int reduces;
+ private long inputRecords;
+ private long outputBytes;
+ private long outputRecords;
+ private long maxMemory;
+ private double[] reduceBytes = new double[0];
+ private double[] reduceRecords = new double[0];
+
+ // Spec for reduces id mod this
+ private long[] reduceOutputBytes = new long[0];
+ private long[] reduceOutputRecords = new long[0];
+
+ GridmixSplit() {
+ super();
+ }
+
+ public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
+ long inputBytes, long inputRecords, long outputBytes,
+ long outputRecords, double[] reduceBytes, double[] reduceRecords,
+ long[] reduceOutputBytes, long[] reduceOutputRecords)
+ throws IOException {
+ super(cfsplit);
+ this.id = id;
+ this.maps = maps;
+ reduces = reduceBytes.length;
+ this.inputRecords = inputRecords;
+ this.outputBytes = outputBytes;
+ this.outputRecords = outputRecords;
+ this.reduceBytes = reduceBytes;
+ this.reduceRecords = reduceRecords;
+ nSpec = reduceOutputBytes.length;
+ this.reduceOutputBytes = reduceOutputBytes;
+ this.reduceOutputRecords = reduceOutputRecords;
+ }
+ public int getId() {
+ return id;
+ }
+ public int getMapCount() {
+ return maps;
+ }
+ public long getInputRecords() {
+ return inputRecords;
+ }
+ public long[] getOutputBytes() {
+ if (0 == reduces) {
+ return new long[] { outputBytes };
+ }
+ final long[] ret = new long[reduces];
+ for (int i = 0; i < reduces; ++i) {
+ ret[i] = Math.round(outputBytes * reduceBytes[i]);
+ }
+ return ret;
+ }
+ public long[] getOutputRecords() {
+ if (0 == reduces) {
+ return new long[] { outputRecords };
+ }
+ final long[] ret = new long[reduces];
+ for (int i = 0; i < reduces; ++i) {
+ ret[i] = Math.round(outputRecords * reduceRecords[i]);
+ }
+ return ret;
+ }
+ public long getReduceBytes(int i) {
+ return reduceOutputBytes[i];
+ }
+ public long getReduceRecords(int i) {
+ return reduceOutputRecords[i];
+ }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ WritableUtils.writeVInt(out, id);
+ WritableUtils.writeVInt(out, maps);
+ WritableUtils.writeVLong(out, inputRecords);
+ WritableUtils.writeVLong(out, outputBytes);
+ WritableUtils.writeVLong(out, outputRecords);
+ WritableUtils.writeVLong(out, maxMemory);
+ WritableUtils.writeVInt(out, reduces);
+ for (int i = 0; i < reduces; ++i) {
+ out.writeDouble(reduceBytes[i]);
+ out.writeDouble(reduceRecords[i]);
+ }
+ WritableUtils.writeVInt(out, nSpec);
+ for (int i = 0; i < nSpec; ++i) {
+ WritableUtils.writeVLong(out, reduceOutputBytes[i]);
+ WritableUtils.writeVLong(out, reduceOutputRecords[i]);
+ }
+ }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ id = WritableUtils.readVInt(in);
+ maps = WritableUtils.readVInt(in);
+ inputRecords = WritableUtils.readVLong(in);
+ outputBytes = WritableUtils.readVLong(in);
+ outputRecords = WritableUtils.readVLong(in);
+ maxMemory = WritableUtils.readVLong(in);
+ reduces = WritableUtils.readVInt(in);
+ if (reduceBytes.length < reduces) {
+ reduceBytes = new double[reduces];
+ reduceRecords = new double[reduces];
+ }
+ for (int i = 0; i < reduces; ++i) {
+ reduceBytes[i] = in.readDouble();
+ reduceRecords[i] = in.readDouble();
+ }
+ nSpec = WritableUtils.readVInt(in);
+ if (reduceOutputBytes.length < nSpec) {
+ reduceOutputBytes = new long[nSpec];
+ reduceOutputRecords = new long[nSpec];
+ }
+ for (int i = 0; i < nSpec; ++i) {
+ reduceOutputBytes[i] = WritableUtils.readVLong(in);
+ reduceOutputRecords[i] = WritableUtils.readVLong(in);
+ }
+ }
+}
Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java?rev=889779&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java Fri Dec 11 19:33:27 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Given a {@link #FilePool}, obtain a set of files capable of satisfying
+ * a full set of splits, then iterate over each source to fill the request.
+ */
+class InputStriper {
+ public static final Log LOG = LogFactory.getLog(InputStriper.class);
+ int idx;
+ long currentStart;
+ FileStatus current;
+ final List<FileStatus> files = new ArrayList<FileStatus>();
+
+ /**
+ * @param inputDir Pool from which files are requested.
+ * @param mapBytes Sum of all expected split requests.
+ */
+ InputStriper(FilePool inputDir, long mapBytes)
+ throws IOException {
+ final long inputBytes = inputDir.getInputFiles(mapBytes, files);
+ if (mapBytes > inputBytes) {
+ LOG.warn("Using " + inputBytes + "/" + mapBytes + " bytes");
+ }
+ if (files.isEmpty() && mapBytes > 0) {
+ throw new IOException("Failed to satisfy request for " + mapBytes);
+ }
+ current = files.isEmpty() ? null : files.get(0);
+ }
+
+ /**
+ * @param inputDir Pool used to resolve block locations.
+ * @param bytes Target byte count
+ * @param nLocs Number of block locations per split.
+ * @return A set of files satisfying the byte count, with locations weighted
+ * to the dominating proportion of input bytes.
+ */
+ CombineFileSplit splitFor(FilePool inputDir, long bytes, int nLocs)
+ throws IOException {
+ final ArrayList<Path> paths = new ArrayList<Path>();
+ final ArrayList<Long> start = new ArrayList<Long>();
+ final ArrayList<Long> length = new ArrayList<Long>();
+ final HashMap<String,Double> sb = new HashMap<String,Double>();
+ do {
+ paths.add(current.getPath());
+ start.add(currentStart);
+ final long fromFile = Math.min(bytes, current.getLen() - currentStart);
+ length.add(fromFile);
+ for (BlockLocation loc :
+ inputDir.locationsFor(current, currentStart, fromFile)) {
+ final double tedium = loc.getLength() / (1.0 * bytes);
+ for (String l : loc.getHosts()) {
+ Double j = sb.get(l);
+ if (null == j) {
+ sb.put(l, tedium);
+ } else {
+ sb.put(l, j.doubleValue() + tedium);
+ }
+ }
+ }
+ currentStart += fromFile;
+ bytes -= fromFile;
+ if (current.getLen() - currentStart == 0) {
+ current = files.get(++idx % files.size());
+ currentStart = 0;
+ }
+ } while (bytes > 0);
+ final ArrayList<Entry<String,Double>> sort =
+ new ArrayList<Entry<String,Double>>(sb.entrySet());
+ Collections.sort(sort, hostRank);
+ final String[] hosts = new String[Math.min(nLocs, sort.size())];
+ for (int i = 0; i < nLocs && i < sort.size(); ++i) {
+ hosts[i] = sort.get(i).getKey();
+ }
+ return new CombineFileSplit(paths.toArray(new Path[0]),
+ toLongArray(start), toLongArray(length), hosts);
+ }
+
+ private long[] toLongArray(final ArrayList<Long> sigh) {
+ final long[] ret = new long[sigh.size()];
+ for (int i = 0; i < ret.length; ++i) {
+ ret[i] = sigh.get(i);
+ }
+ return ret;
+ }
+
+ static final Comparator<Entry<String,Double>> hostRank =
+ new Comparator<Entry<String,Double>>() {
+ public int compare(Entry<String,Double> a, Entry<String,Double> b) {
+ final double va = a.getValue();
+ final double vb = b.getValue();
+ return va > vb ? -1 : va < vb ? 1 : 0;
+ }
+ };
+}
Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java?rev=889779&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java Fri Dec 11 19:33:27 2009
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Factory passing reduce specification as its last record.
+ */
+class IntermediateRecordFactory extends RecordFactory {
+
+ private final GridmixKey.Spec spec;
+ private final RecordFactory factory;
+ private final int partition;
+ private final long targetRecords;
+ private boolean done = false;
+ private long accRecords = 0L;
+
+ /**
+ * @param targetBytes Expected byte count.
+ * @param targetRecords Expected record count; will emit spec records after
+ * this boundary is passed.
+ * @param partition Reduce to which records are emitted.
+ * @param spec Specification to emit.
+ * @param conf Unused.
+ */
+ public IntermediateRecordFactory(long targetBytes, long targetRecords,
+ int partition, GridmixKey.Spec spec, Configuration conf) {
+ this(new AvgRecordFactory(targetBytes, targetRecords, conf), partition,
+ targetRecords, spec, conf);
+ }
+
+ /**
+ * @param factory Factory from which byte/record counts are obtained.
+ * @param partition Reduce to which records are emitted.
+ * @param targetRecords Expected record count; will emit spec records after
+ * this boundary is passed.
+ * @param spec Specification to emit.
+ * @param conf Unused.
+ */
+ public IntermediateRecordFactory(RecordFactory factory, int partition,
+ long targetRecords, GridmixKey.Spec spec, Configuration conf) {
+ this.spec = spec;
+ this.factory = factory;
+ this.partition = partition;
+ this.targetRecords = targetRecords;
+ }
+
+ @Override
+ public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+ assert key != null;
+ final boolean rslt = factory.next(key, val);
+ ++accRecords;
+ if (rslt) {
+ if (accRecords < targetRecords) {
+ key.setType(GridmixKey.DATA);
+ } else {
+ final int orig = key.getSize();
+ key.setType(GridmixKey.REDUCE_SPEC);
+ spec.rec_in = accRecords;
+ key.setSpec(spec);
+ val.setSize(val.getSize() - (key.getSize() - orig));
+ // reset counters
+ accRecords = 0L;
+ spec.bytes_out = 0L;
+ spec.rec_out = 0L;
+ done = true;
+ }
+ } else if (!done) {
+ // ensure spec emitted
+ key.setType(GridmixKey.REDUCE_SPEC);
+ key.setPartition(partition);
+ key.setSize(0);
+ val.setSize(0);
+ spec.rec_in = 0L;
+ key.setSpec(spec);
+ done = true;
+ return true;
+ }
+ key.setPartition(partition);
+ return rslt;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return factory.getProgress();
+ }
+
+ @Override
+ public void close() throws IOException {
+ factory.close();
+ }
+}