Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/16 23:44:46 UTC
svn commit: r529410 [16/27] - in /lucene/hadoop/trunk: ./
src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/
src/contrib/abacus/src/java/org/apache/hadoop/abacus/
src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/...
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Mon Apr 16 14:44:35 2007
@@ -47,7 +47,7 @@
}
public KeyValueLineRecordReader(Configuration job, FileSplit split)
- throws IOException {
+ throws IOException {
super(job, split);
String sepStr = job.get("key.value.separator.in.input.line", "\t");
this.separator = (byte) sepStr.charAt(0);
@@ -64,7 +64,7 @@
/** Read key/value pair in a line. */
public synchronized boolean next(Writable key, Writable value)
- throws IOException {
+ throws IOException {
Text tKey = (Text) key;
Text tValue = (Text) value;
byte[] line = null;
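
For context on the hunk above: the reader takes the first character of key.value.separator.in.input.line (tab by default) and splits each input line at the first occurrence of that byte. A minimal sketch of that behavior, assuming a bare Configuration and an invented sample line:

    // Sketch only: mirrors how KeyValueLineRecordReader derives its separator.
    // The comma separator and the sample line are assumptions for illustration.
    Configuration conf = new Configuration();
    conf.set("key.value.separator.in.input.line", ",");
    byte separator = (byte) conf.get("key.value.separator.in.input.line", "\t").charAt(0);

    String line = "apple,fruit";
    int pos = line.indexOf((char) separator);
    String key = (pos == -1) ? line : line.substring(0, pos);    // "apple"
    String value = (pos == -1) ? "" : line.substring(pos + 1);   // "fruit"
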
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Apr 16 14:44:35 2007
@@ -278,7 +278,7 @@
public JobStatus[] jobsToComplete() {return null;}
public TaskCompletionEvent[] getTaskCompletionEvents(
- String jobid, int fromEventId, int maxEvents) throws IOException{
+ String jobid, int fromEventId, int maxEvents) throws IOException{
return TaskCompletionEvent.EMPTY_ARRAY;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java Mon Apr 16 14:44:35 2007
@@ -23,28 +23,28 @@
* @author Mike Cafarella
*******************************/
interface MRConstants {
- //
- // Timeouts, constants
- //
- public static final long HEARTBEAT_INTERVAL = 10 * 1000;
- public static final long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
+ //
+ // Timeouts, constants
+ //
+ public static final long HEARTBEAT_INTERVAL = 10 * 1000;
+ public static final long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
- //for the inmemory filesystem (to do in-memory merge)
- /**
- * Constant denoting when a merge of in memory files will be triggered
- */
- public static final float MAX_INMEM_FILESYS_USE = 0.5f;
- /**
- * Constant denoting the max size (in terms of the fraction of the total
- * size of the filesys) of a map output file that we will try
- * to keep in mem. Ideally, this should be a factor of MAX_INMEM_FILESYS_USE
- */
- public static final float MAX_INMEM_FILESIZE_FRACTION =
- MAX_INMEM_FILESYS_USE/2;
+ //for the inmemory filesystem (to do in-memory merge)
+ /**
+ * Constant denoting when a merge of in memory files will be triggered
+ */
+ public static final float MAX_INMEM_FILESYS_USE = 0.5f;
+ /**
+ * Constant denoting the max size (in terms of the fraction of the total
+ * size of the filesys) of a map output file that we will try
+ * to keep in mem. Ideally, this should be a factor of MAX_INMEM_FILESYS_USE
+ */
+ public static final float MAX_INMEM_FILESIZE_FRACTION =
+ MAX_INMEM_FILESYS_USE/2;
- //
- // Result codes
- //
- public static int SUCCESS = 0;
- public static int FILE_NOT_FOUND = -1;
+ //
+ // Result codes
+ //
+ public static int SUCCESS = 0;
+ public static int FILE_NOT_FOUND = -1;
}
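
To make the arithmetic in MRConstants concrete: MAX_INMEM_FILESYS_USE = 0.5f means a merge of in-memory files is considered once the in-memory filesystem is half full, and MAX_INMEM_FILESIZE_FRACTION = 0.5f/2 = 0.25f caps a single map output at a quarter of the filesystem's size if it is to stay in memory. A worked example with an invented filesystem size:

    // Illustrative numbers only; the 100 MB capacity is an assumption.
    long fsSize = 100L * 1024 * 1024;                 // in-memory fs capacity
    float mergeTrigger = 0.5f;                        // MAX_INMEM_FILESYS_USE
    float maxFileFraction = mergeTrigger / 2;         // MAX_INMEM_FILESIZE_FRACTION = 0.25f
    long maxInMemOutput = (long) (fsSize * maxFileFraction);      // 25 MB; bigger outputs go to disk
    int minFilesPerMerge = (int) (mergeTrigger / maxFileFraction); // 2, the ratio the merge thread checks
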
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Apr 16 14:44:35 2007
@@ -77,7 +77,7 @@
}
public boolean isMapTask() {
- return true;
+ return true;
}
public void localizeConfiguration(JobConf conf) throws IOException {
@@ -118,7 +118,7 @@
InputSplit split;
try {
split = (InputSplit)
- ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
+ ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
} catch (ClassNotFoundException exp) {
IOException wrap = new IOException("Split class " + splitClass +
" not found");
@@ -198,23 +198,23 @@
private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
//spawn a thread to give merge progress heartbeats
Thread sortProgress = new Thread() {
- public void run() {
- LOG.debug("Started thread: " + getName());
- while (true) {
- try {
- reportProgress(umbilical);
- Thread.sleep(PROGRESS_INTERVAL);
- } catch (InterruptedException e) {
+ public void run() {
+ LOG.debug("Started thread: " + getName());
+ while (true) {
+ try {
+ reportProgress(umbilical);
+ Thread.sleep(PROGRESS_INTERVAL);
+ } catch (InterruptedException e) {
return;
- } catch (Throwable e) {
+ } catch (Throwable e) {
LOG.info("Thread Exception in " +
- "reporting sort progress\n" +
- StringUtils.stringifyException(e));
+ "reporting sort progress\n" +
+ StringUtils.stringifyException(e));
continue;
+ }
}
}
- }
- };
+ };
sortProgress.setName("Sort progress reporter for task "+getTaskId());
sortProgress.setDaemon(true);
return sortProgress;
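
The reindented block above is an instance of a pattern MapTask (and ReduceTask, further down) relies on: a daemon thread that heartbeats progress until interrupted, logging and continuing past anything else. A stripped-down sketch, with the report call and interval as invented stand-ins for reportProgress(umbilical) and PROGRESS_INTERVAL:

    // Sketch of the progress-heartbeat pattern; reportOnce() and the 5 s
    // interval are stand-ins, not the real method or constant.
    Thread heartbeat = new Thread() {
      public void run() {
        while (true) {
          try {
            reportOnce();
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            return;        // interruption is the shutdown signal
          } catch (Throwable t) {
            continue;      // log-and-continue: one bad report must not stop the heartbeat
          }
        }
      }
    };
    heartbeat.setDaemon(true);   // never keeps the JVM alive
    heartbeat.start();
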
@@ -260,10 +260,10 @@
private FSDataOutputStream indexOut;
private long segmentStart;
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
- Reporter reporter) throws IOException {
+ Reporter reporter) throws IOException {
this.partitions = job.getNumReduceTasks();
this.partitioner = (Partitioner)ReflectionUtils.newInstance(
- job.getPartitionerClass(), job);
+ job.getPartitionerClass(), job);
maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024;
keyValBuffer = new DataOutputBuffer();
@@ -284,21 +284,21 @@
Class codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
codec = (CompressionCodec)
- ReflectionUtils.newInstance(codecClass, job);
+ ReflectionUtils.newInstance(codecClass, job);
}
sortImpl = new BufferSorter[partitions];
for (int i = 0; i < partitions; i++)
sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
- job.getClass("map.sort.class", MergeSorter.class,
- BufferSorter.class), job);
+ job.getClass("map.sort.class", MergeSorter.class,
+ BufferSorter.class), job);
}
public void startPartition(int partNumber) throws IOException {
//We create the sort output as multiple sequence files within a spilled
//file. So we create a writer for each partition.
segmentStart = out.getPos();
writer =
- SequenceFile.createWriter(job, out, job.getMapOutputKeyClass(),
- job.getMapOutputValueClass(), compressionType, codec);
+ SequenceFile.createWriter(job, out, job.getMapOutputKeyClass(),
+ job.getMapOutputValueClass(), compressionType, codec);
}
private void endPartition(int partNumber) throws IOException {
//Need to write syncs especially if block compression is in use
@@ -311,7 +311,7 @@
}
public void collect(WritableComparable key,
- Writable value) throws IOException {
+ Writable value) throws IOException {
if (key.getClass() != keyClass) {
throw new IOException("Type mismatch in key from map: expected "
@@ -362,7 +362,7 @@
numSpills);
indexOut = localFs.create(indexFilename);
LOG.debug("opened "+
- mapOutputFile.getSpillFile(getTaskId(), numSpills).getName());
+ mapOutputFile.getSpillFile(getTaskId(), numSpills).getName());
//invoke the sort
for (int i = 0; i < partitions; i++) {
@@ -379,16 +379,16 @@
//got all the input key/val, processed, and output the result
//key/vals before we write the partition header in the output file
Reducer combiner = (Reducer)ReflectionUtils.newInstance(
- job.getCombinerClass(), job);
+ job.getCombinerClass(), job);
// make collector
OutputCollector combineCollector = new OutputCollector() {
- public void collect(WritableComparable key, Writable value)
- throws IOException {
- synchronized (this) {
- writer.append(key, value);
+ public void collect(WritableComparable key, Writable value)
+ throws IOException {
+ synchronized (this) {
+ writer.append(key, value);
+ }
}
- }
- };
+ };
combineAndSpill(rIter, combiner, combineCollector);
combiner.close();
}
@@ -404,10 +404,10 @@
}
private void combineAndSpill(RawKeyValueIterator resultIter,
- Reducer combiner, OutputCollector combineCollector) throws IOException {
+ Reducer combiner, OutputCollector combineCollector) throws IOException {
//combine the key/value obtained from the offset & indices arrays.
CombineValuesIterator values = new CombineValuesIterator(resultIter,
- comparator, keyClass, valClass, job, reporter);
+ comparator, keyClass, valClass, job, reporter);
while (values.more()) {
combiner.reduce(values.getKey(), values, combineCollector, reporter);
values.nextKey();
@@ -459,7 +459,7 @@
4096);
//The final index file output stream
FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
- 4096);
+ 4096);
long segmentStart;
if (numSpills == 0) {
@@ -467,8 +467,8 @@
for (int i = 0; i < partitions; i++) {
segmentStart = finalOut.getPos();
SequenceFile.createWriter(job, finalOut,
- job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
- compressionType, codec);
+ job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
+ compressionType, codec);
finalIndexOut.writeLong(segmentStart);
finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
}
@@ -498,15 +498,15 @@
long segmentLength = indexIn.readLong();
indexIn.close();
SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
- segmentLength, filename[i]);
+ segmentLength, filename[i]);
s.preserveInput(true);
s.doSync();
segmentList.add(i, s);
}
segmentStart = finalOut.getPos();
SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut,
- job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
- compressionType, codec);
+ job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
+ compressionType, codec);
sorter.writeFile(sorter.merge(segmentList, new Path(getTaskId())),
writer);
//add a sync block - required esp. for block compression to ensure
@@ -537,9 +537,9 @@
private class CombineValuesIterator extends ValuesIterator {
public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
- WritableComparator comparator, Class keyClass,
- Class valClass, Configuration conf, Reporter reporter)
- throws IOException {
+ WritableComparator comparator, Class keyClass,
+ Class valClass, Configuration conf, Reporter reporter)
+ throws IOException {
super(in, comparator, keyClass, valClass, conf, reporter);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java Mon Apr 16 14:44:35 2007
@@ -47,7 +47,7 @@
System.arraycopy(pointers, 0, pointersCopy, 0, count);
m.mergeSort(pointers, pointersCopy, 0, count);
return new MRSortResultIterator(super.keyValBuffer, pointersCopy,
- super.startOffsets, super.keyLengths, super.valueLengths);
+ super.startOffsets, super.keyLengths, super.valueLengths);
}
/** The implementation of the compare method from Comparator. This basically
* forwards the call to the super class's compare. Note that
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon Apr 16 14:44:35 2007
@@ -66,7 +66,7 @@
{
getProgress().setStatus("reduce");
setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with
- }
+ }
private Progress copyPhase = getProgress().addPhase("copy");
private Progress sortPhase = getProgress().addPhase("sort");
@@ -87,7 +87,7 @@
}
public boolean isMapTask() {
- return false;
+ return false;
}
public int getNumMaps() { return numMaps; }
@@ -208,10 +208,10 @@
}
private class ReduceValuesIterator extends ValuesIterator {
public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
- WritableComparator comparator, Class keyClass,
- Class valClass,
- Configuration conf, Reporter reporter)
- throws IOException {
+ WritableComparator comparator, Class keyClass,
+ Class valClass,
+ Configuration conf, Reporter reporter)
+ throws IOException {
super(in, comparator, keyClass, valClass, conf, reporter);
}
public void informReduceProgress() {
@@ -232,7 +232,7 @@
throws IOException {
Class valueClass = job.getMapOutputValueClass();
Reducer reducer = (Reducer)ReflectionUtils.newInstance(
- job.getReducerClass(), job);
+ job.getReducerClass(), job);
FileSystem lfs = FileSystem.getLocal(job);
copyPhase.complete(); // copy is already complete
@@ -259,12 +259,12 @@
reportProgress(umbilical);
Thread.sleep(PROGRESS_INTERVAL);
} catch (InterruptedException e) {
- return;
+ return;
} catch (Throwable e) {
- System.out.println("Thread Exception in " +
- "reporting sort progress\n" +
- StringUtils.stringifyException(e));
- continue;
+ System.out.println("Thread Exception in " +
+ "reporting sort progress\n" +
+ StringUtils.stringifyException(e));
+ continue;
}
}
}
@@ -285,7 +285,7 @@
SequenceFile.Sorter sorter =
new SequenceFile.Sorter(lfs, comparator, valueClass, job);
rIter = sorter.merge(mapFiles, tempDir,
- !conf.getKeepFailedTaskFiles()); // sort
+ !conf.getKeepFailedTaskFiles()); // sort
} finally {
sortComplete = true;
@@ -302,8 +302,8 @@
FileSystem fs = FileSystem.get(job) ;
if( runSpeculative ){
- fs = new PhasedFileSystem (fs ,
- getJobId(), getTipId(), getTaskId());
+ fs = new PhasedFileSystem (fs ,
+ getJobId(), getTipId(), getTaskId());
}
final RecordWriter out =
@@ -323,7 +323,7 @@
Class keyClass = job.getMapOutputKeyClass();
Class valClass = job.getMapOutputValueClass();
ReduceValuesIterator values = new ReduceValuesIterator(rIter, comparator,
- keyClass, valClass, job, reporter);
+ keyClass, valClass, job, reporter);
values.informReduceProgress();
while (values.more()) {
reporter.incrCounter(REDUCE_INPUT_GROUPS, 1);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Mon Apr 16 14:44:35 2007
@@ -274,7 +274,7 @@
size = copyOutput(loc);
} catch (IOException e) {
LOG.warn(reduceTask.getTaskId() + " copy failed: " +
- loc.getMapTaskId() + " from " + loc.getHost());
+ loc.getMapTaskId() + " from " + loc.getHost());
LOG.warn(StringUtils.stringifyException(e));
} finally {
finish(size);
@@ -309,8 +309,8 @@
Path tmpFilename = new Path(finalFilename + "-" + id);
// this copies the map output file
tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
- tmpFilename, reduceTask.getPartition(),
- STALLED_COPY_TIMEOUT);
+ tmpFilename, reduceTask.getPartition(),
+ STALLED_COPY_TIMEOUT);
if (!neededOutputs.contains(loc.getMapId())) {
if (tmpFilename != null) {
FileSystem fs = tmpFilename.getFileSystem(conf);
@@ -352,7 +352,7 @@
" is " + inMemFileSys.getPercentUsed() +
" full. Triggering merge");
InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
- (LocalFileSystem)localFileSys, sorter);
+ (LocalFileSystem)localFileSys, sorter);
m.setName("Thread for merging in memory files");
m.setDaemon(true);
mergeInProgress = true;
@@ -425,7 +425,7 @@
//create an instance of the sorter
sorter =
new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(),
- conf.getMapOutputValueClass(), conf);
+ conf.getMapOutputValueClass(), conf);
// hosts -> next contact time
this.penaltyBox = new Hashtable();
@@ -437,7 +437,7 @@
MetricsContext metricsContext = MetricsUtil.getContext("mapred");
this.shuffleMetrics =
- MetricsUtil.createRecord(metricsContext, "shuffleInput");
+ MetricsUtil.createRecord(metricsContext, "shuffleInput");
this.shuffleMetrics.setTag("user", conf.getUser());
}
@@ -452,7 +452,7 @@
final int numOutputs = reduceTask.getNumMaps();
Map<Integer, MapOutputLocation> knownOutputs =
- new HashMap<Integer, MapOutputLocation>();
+ new HashMap<Integer, MapOutputLocation>();
int numInFlight = 0, numCopied = 0;
int lowThreshold = numCopiers*2;
long bytesTransferred = 0;
@@ -488,229 +488,229 @@
pingTimer.setDaemon(true);
pingTimer.start();
try {
- // loop until we get all required outputs or are killed
- while (!killed && numCopied < numOutputs && mergeThrowable == null) {
+ // loop until we get all required outputs or are killed
+ while (!killed && numCopied < numOutputs && mergeThrowable == null) {
- LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
- " map output(s)");
+ LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
+ " map output(s)");
- if (!neededOutputs.isEmpty()) {
- LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
- " map output location(s)");
- try {
- // Put the hash entries for the failed fetches. Entries here
- // might be replaced by (mapId) hashkeys from new successful
- // Map executions, if the fetch failures were due to lost tasks.
- // The replacements, if at all, will happen when we query the
- // JobTracker and put the mapId hashkeys with new MapOutputLocations
- // as values
- knownOutputs.putAll(retryFetches);
- // the call to queryJobTracker will modify fromEventId to a value
- // that it should be for the next call to queryJobTracker
- List <MapOutputLocation> locs = queryJobTracker(fromEventId,
- jobClient);
+ if (!neededOutputs.isEmpty()) {
+ LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
+ " map output location(s)");
+ try {
+ // Put the hash entries for the failed fetches. Entries here
+ // might be replaced by (mapId) hashkeys from new successful
+ // Map executions, if the fetch failures were due to lost tasks.
+ // The replacements, if at all, will happen when we query the
+ // JobTracker and put the mapId hashkeys with new MapOutputLocations
+ // as values
+ knownOutputs.putAll(retryFetches);
+ // the call to queryJobTracker will modify fromEventId to a value
+ // that it should be for the next call to queryJobTracker
+ List <MapOutputLocation> locs = queryJobTracker(fromEventId,
+ jobClient);
- // put discovered them on the known list
- for (int i=0; i < locs.size(); i++) {
- knownOutputs.put(new Integer(locs.get(i).getMapId()), locs.get(i));
+ // put discovered them on the known list
+ for (int i=0; i < locs.size(); i++) {
+ knownOutputs.put(new Integer(locs.get(i).getMapId()), locs.get(i));
+ }
+ LOG.info(reduceTask.getTaskId() +
+ " Got " + locs.size() +
+ " new map outputs from jobtracker and " + retryFetches.size() +
+ " map outputs from previous failures");
+ // clear the "failed" fetches hashmap
+ retryFetches.clear();
+ }
+ catch (IOException ie) {
+ LOG.warn(reduceTask.getTaskId() +
+ " Problem locating map outputs: " +
+ StringUtils.stringifyException(ie));
}
- LOG.info(reduceTask.getTaskId() +
- " Got " + locs.size() +
- " new map outputs from jobtracker and " + retryFetches.size() +
- " map outputs from previous failures");
- // clear the "failed" fetches hashmap
- retryFetches.clear();
- }
- catch (IOException ie) {
- LOG.warn(reduceTask.getTaskId() +
- " Problem locating map outputs: " +
- StringUtils.stringifyException(ie));
}
- }
- // now walk through the cache and schedule what we can
- int numKnown = knownOutputs.size(), numScheduled = 0;
- int numSlow = 0, numDups = 0;
-
- LOG.info(reduceTask.getTaskId() + " Got " + numKnown +
- " known map output location(s); scheduling...");
-
- synchronized (scheduledCopies) {
- Iterator locIt = knownOutputs.values().iterator();
-
- currentTime = System.currentTimeMillis();
- while (locIt.hasNext()) {
-
- MapOutputLocation loc = (MapOutputLocation)locIt.next();
- Long penaltyEnd = (Long)penaltyBox.get(loc.getHost());
- boolean penalized = false, duplicate = false;
+ // now walk through the cache and schedule what we can
+ int numKnown = knownOutputs.size(), numScheduled = 0;
+ int numSlow = 0, numDups = 0;
+
+ LOG.info(reduceTask.getTaskId() + " Got " + numKnown +
+ " known map output location(s); scheduling...");
+
+ synchronized (scheduledCopies) {
+ Iterator locIt = knownOutputs.values().iterator();
+
+ currentTime = System.currentTimeMillis();
+ while (locIt.hasNext()) {
+
+ MapOutputLocation loc = (MapOutputLocation)locIt.next();
+ Long penaltyEnd = (Long)penaltyBox.get(loc.getHost());
+ boolean penalized = false, duplicate = false;
- if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
- penalized = true; numSlow++;
- }
- if (uniqueHosts.contains(loc.getHost())) {
- duplicate = true; numDups++;
- }
+ if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
+ penalized = true; numSlow++;
+ }
+ if (uniqueHosts.contains(loc.getHost())) {
+ duplicate = true; numDups++;
+ }
- if (!penalized && !duplicate) {
- uniqueHosts.add(loc.getHost());
- scheduledCopies.add(loc);
- locIt.remove(); // remove from knownOutputs
- numInFlight++; numScheduled++;
+ if (!penalized && !duplicate) {
+ uniqueHosts.add(loc.getHost());
+ scheduledCopies.add(loc);
+ locIt.remove(); // remove from knownOutputs
+ numInFlight++; numScheduled++;
+ }
}
+ scheduledCopies.notifyAll();
}
- scheduledCopies.notifyAll();
- }
- LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
- " of " + numKnown + " known outputs (" + numSlow +
- " slow hosts and " + numDups + " dup hosts)");
+ LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
+ " of " + numKnown + " known outputs (" + numSlow +
+ " slow hosts and " + numDups + " dup hosts)");
- // if we have no copies in flight and we can't schedule anything
- // new, just wait for a bit
- try {
- if (numInFlight == 0 && numScheduled == 0) {
- Thread.sleep(5000);
- }
- } catch (InterruptedException e) { } // IGNORE
+ // if we have no copies in flight and we can't schedule anything
+ // new, just wait for a bit
+ try {
+ if (numInFlight == 0 && numScheduled == 0) {
+ Thread.sleep(5000);
+ }
+ } catch (InterruptedException e) { } // IGNORE
- while (!killed && numInFlight > 0 && mergeThrowable == null) {
- LOG.debug(reduceTask.getTaskId() + " numInFlight = " + numInFlight);
- CopyResult cr = getCopyResult();
+ while (!killed && numInFlight > 0 && mergeThrowable == null) {
+ LOG.debug(reduceTask.getTaskId() + " numInFlight = " + numInFlight);
+ CopyResult cr = getCopyResult();
- if (cr != null) {
- if (cr.getSuccess()) { // a successful copy
- numCopied++;
- bytesTransferred += cr.getSize();
+ if (cr != null) {
+ if (cr.getSuccess()) { // a successful copy
+ numCopied++;
+ bytesTransferred += cr.getSize();
- long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
- float mbs = ((float)bytesTransferred)/(1024*1024);
- float transferRate = mbs/secsSinceStart;
+ long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
+ float mbs = ((float)bytesTransferred)/(1024*1024);
+ float transferRate = mbs/secsSinceStart;
- copyPhase.startNextPhase();
- copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs +
- " at " +
- mbpsFormat.format(transferRate) + " MB/s)");
- } else if (cr.isObsolete()) {
- //ignore
- LOG.info(reduceTask.getTaskId() +
- " Ignoring obsolete copy result for Map Task: " +
- cr.getLocation().getMapTaskId() + " from host: " +
- cr.getHost());
- } else {
- retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
+ copyPhase.startNextPhase();
+ copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs +
+ " at " +
+ mbpsFormat.format(transferRate) + " MB/s)");
+ } else if (cr.isObsolete()) {
+ //ignore
+ LOG.info(reduceTask.getTaskId() +
+ " Ignoring obsolete copy result for Map Task: " +
+ cr.getLocation().getMapTaskId() + " from host: " +
+ cr.getHost());
+ } else {
+ retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
- // wait a random amount of time for next contact
- currentTime = System.currentTimeMillis();
- long nextContact = currentTime + 60 * 1000 +
- backoff.nextInt(maxBackoff*1000);
- penaltyBox.put(cr.getHost(), new Long(nextContact));
- LOG.warn(reduceTask.getTaskId() + " adding host " +
- cr.getHost() + " to penalty box, next contact in " +
- ((nextContact-currentTime)/1000) + " seconds");
-
- // other outputs from the failed host may be present in the
- // knownOutputs cache, purge them. This is important in case
- // the failure is due to a lost tasktracker (causes many
- // unnecessary backoffs). If not, we only take a small hit
- // polling the jobtracker a few more times
- Iterator locIt = knownOutputs.values().iterator();
- while (locIt.hasNext()) {
- MapOutputLocation loc = (MapOutputLocation)locIt.next();
- if (cr.getHost().equals(loc.getHost())) {
- retryFetches.put(new Integer(loc.getMapId()), loc);
- locIt.remove();
+ // wait a random amount of time for next contact
+ currentTime = System.currentTimeMillis();
+ long nextContact = currentTime + 60 * 1000 +
+ backoff.nextInt(maxBackoff*1000);
+ penaltyBox.put(cr.getHost(), new Long(nextContact));
+ LOG.warn(reduceTask.getTaskId() + " adding host " +
+ cr.getHost() + " to penalty box, next contact in " +
+ ((nextContact-currentTime)/1000) + " seconds");
+
+ // other outputs from the failed host may be present in the
+ // knownOutputs cache, purge them. This is important in case
+ // the failure is due to a lost tasktracker (causes many
+ // unnecessary backoffs). If not, we only take a small hit
+ // polling the jobtracker a few more times
+ Iterator locIt = knownOutputs.values().iterator();
+ while (locIt.hasNext()) {
+ MapOutputLocation loc = (MapOutputLocation)locIt.next();
+ if (cr.getHost().equals(loc.getHost())) {
+ retryFetches.put(new Integer(loc.getMapId()), loc);
+ locIt.remove();
+ }
}
}
+ uniqueHosts.remove(cr.getHost());
+ numInFlight--;
}
- uniqueHosts.remove(cr.getHost());
- numInFlight--;
- }
- boolean busy = true;
- // ensure we have enough to keep us busy
- if (numInFlight < lowThreshold && (numOutputs-numCopied) > probe_sample_size) {
- busy = false;
- }
- //Check whether we have more CopyResult to check. If there is none, and
- //we are not busy enough, break
- synchronized (copyResults) {
- if (copyResults.size() == 0 && !busy) {
- break;
+ boolean busy = true;
+ // ensure we have enough to keep us busy
+ if (numInFlight < lowThreshold && (numOutputs-numCopied) > probe_sample_size) {
+ busy = false;
+ }
+ //Check whether we have more CopyResult to check. If there is none, and
+ //we are not busy enough, break
+ synchronized (copyResults) {
+ if (copyResults.size() == 0 && !busy) {
+ break;
+ }
}
}
- }
- }
+ }
- // all done, inform the copiers to exit
- synchronized (copiers) {
- synchronized (scheduledCopies) {
- for (int i=0; i < copiers.length; i++) {
- copiers[i].interrupt();
- copiers[i] = null;
+ // all done, inform the copiers to exit
+ synchronized (copiers) {
+ synchronized (scheduledCopies) {
+ for (int i=0; i < copiers.length; i++) {
+ copiers[i].interrupt();
+ copiers[i] = null;
+ }
}
}
- }
- //Do a merge of in-memory files (if there are any)
- if (!killed && mergeThrowable == null) {
- try {
- //wait for an ongoing merge (if it is in flight) to complete
- while (mergeInProgress) {
- Thread.sleep(200);
- }
- LOG.info(reduceTask.getTaskId() +
- " Copying of all map outputs complete. " +
- "Initiating the last merge on the remaining files in " +
- inMemFileSys.getUri());
- if (mergeThrowable != null) {
- //this could happen if the merge that
- //was in progress threw an exception
- throw mergeThrowable;
- }
- //initiate merge
- Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
- if (inMemClosedFiles.length == 0) {
- LOG.info(reduceTask.getTaskId() + "Nothing to merge from " +
- inMemFileSys.getUri());
- return numCopied == numOutputs;
- }
- //name this output file same as the name of the first file that is
- //there in the current list of inmem files (this is guaranteed to be
- //absent on the disk currently. So we don't overwrite a prev.
- //created spill). Also we need to create the output file now since
- //it is not guaranteed that this file will be present after merge
- //is called (we delete empty sequence files as soon as we see them
- //in the merge method)
- SequenceFile.Writer writer = sorter.cloneFileAttributes(
- inMemFileSys.makeQualified(inMemClosedFiles[0]),
- localFileSys.makeQualified(inMemClosedFiles[0]), null);
-
- RawKeyValueIterator rIter = null;
+ //Do a merge of in-memory files (if there are any)
+ if (!killed && mergeThrowable == null) {
try {
- rIter = sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length,
- new Path(reduceTask.getTaskId()));
- } catch (Exception e) {
- //make sure that we delete the ondisk file that we created earlier
- //when we invoked cloneFileAttributes
+ //wait for an ongoing merge (if it is in flight) to complete
+ while (mergeInProgress) {
+ Thread.sleep(200);
+ }
+ LOG.info(reduceTask.getTaskId() +
+ " Copying of all map outputs complete. " +
+ "Initiating the last merge on the remaining files in " +
+ inMemFileSys.getUri());
+ if (mergeThrowable != null) {
+ //this could happen if the merge that
+ //was in progress threw an exception
+ throw mergeThrowable;
+ }
+ //initiate merge
+ Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
+ if (inMemClosedFiles.length == 0) {
+ LOG.info(reduceTask.getTaskId() + "Nothing to merge from " +
+ inMemFileSys.getUri());
+ return numCopied == numOutputs;
+ }
+ //name this output file same as the name of the first file that is
+ //there in the current list of inmem files (this is guaranteed to be
+ //absent on the disk currently. So we don't overwrite a prev.
+ //created spill). Also we need to create the output file now since
+ //it is not guaranteed that this file will be present after merge
+ //is called (we delete empty sequence files as soon as we see them
+ //in the merge method)
+ SequenceFile.Writer writer = sorter.cloneFileAttributes(
+ inMemFileSys.makeQualified(inMemClosedFiles[0]),
+ localFileSys.makeQualified(inMemClosedFiles[0]), null);
+
+ RawKeyValueIterator rIter = null;
+ try {
+ rIter = sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length,
+ new Path(reduceTask.getTaskId()));
+ } catch (Exception e) {
+ //make sure that we delete the ondisk file that we created earlier
+ //when we invoked cloneFileAttributes
+ writer.close();
+ localFileSys.delete(inMemClosedFiles[0]);
+ throw new IOException (StringUtils.stringifyException(e));
+ }
+ sorter.writeFile(rIter, writer);
writer.close();
- localFileSys.delete(inMemClosedFiles[0]);
- throw new IOException (StringUtils.stringifyException(e));
+ LOG.info(reduceTask.getTaskId() +
+ " Merge of the " +inMemClosedFiles.length +
+ " files in InMemoryFileSystem complete." +
+ " Local file is " + inMemClosedFiles[0]);
+ } catch (Throwable t) {
+ LOG.warn(reduceTask.getTaskId() +
+ " Final merge of the inmemory files threw an exception: " +
+ StringUtils.stringifyException(t));
+ return false;
}
- sorter.writeFile(rIter, writer);
- writer.close();
- LOG.info(reduceTask.getTaskId() +
- " Merge of the " +inMemClosedFiles.length +
- " files in InMemoryFileSystem complete." +
- " Local file is " + inMemClosedFiles[0]);
- } catch (Throwable t) {
- LOG.warn(reduceTask.getTaskId() +
- " Final merge of the inmemory files threw an exception: " +
- StringUtils.stringifyException(t));
- return false;
}
- }
- return mergeThrowable == null && numCopied == numOutputs && !killed;
+ return mergeThrowable == null && numCopied == numOutputs && !killed;
} finally {
inMemFileSys.close();
pingTimer.interrupt();
@@ -719,7 +719,7 @@
private CopyResult getCopyResult() {
- synchronized (copyResults) {
+ synchronized (copyResults) {
while (!killed && copyResults.isEmpty()) {
try {
copyResults.wait();
@@ -741,8 +741,8 @@
* @throws IOException
*/
private List <MapOutputLocation> queryJobTracker(IntWritable fromEventId,
- InterTrackerProtocol jobClient)
- throws IOException {
+ InterTrackerProtocol jobClient)
+ throws IOException {
long currentTime = System.currentTimeMillis();
long pollTime = lastPollTime + MIN_POLL_INTERVAL;
@@ -755,9 +755,9 @@
lastPollTime = currentTime;
TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
- reduceTask.getJobId().toString(),
- fromEventId.get(),
- probe_sample_size);
+ reduceTask.getJobId().toString(),
+ fromEventId.get(),
+ probe_sample_size);
List <MapOutputLocation> mapOutputsList = new ArrayList();
for (int i = 0; i < t.length; i++) {
@@ -799,7 +799,7 @@
private SequenceFile.Sorter sorter;
public InMemFSMergeThread(InMemoryFileSystem inMemFileSys,
- LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
+ LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
this.inMemFileSys = inMemFileSys;
this.localFileSys = localFileSys;
this.sorter = sorter;
@@ -813,7 +813,7 @@
//in flight. So we make sure that we have some 'closed' map
//output files to merge to get the benefit of in-memory merge
if (inMemClosedFiles.length >=
- (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
+ (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to be
//absent on the disk currently. So we don't overwrite a prev.
@@ -822,12 +822,12 @@
//is called (we delete empty sequence files as soon as we see them
//in the merge method)
SequenceFile.Writer writer = sorter.cloneFileAttributes(
- inMemFileSys.makeQualified(inMemClosedFiles[0]),
- localFileSys.makeQualified(inMemClosedFiles[0]), null);
+ inMemFileSys.makeQualified(inMemClosedFiles[0]),
+ localFileSys.makeQualified(inMemClosedFiles[0]), null);
RawKeyValueIterator rIter;
try {
rIter = sorter.merge(inMemClosedFiles, true,
- inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
+ inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
} catch (Exception e) {
//make sure that we delete the ondisk file that we created earlier
//when we invoked cloneFileAttributes
@@ -844,12 +844,12 @@
}
else {
LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +
- inMemFileSys.getUri());
+ inMemFileSys.getUri());
}
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskId() +
- " Intermediate Merge of the inmemory files threw an exception: " +
- StringUtils.stringifyException(t));
+ " Intermediate Merge of the inmemory files threw an exception: " +
+ StringUtils.stringifyException(t));
ReduceTaskRunner.this.mergeThrowable = t;
}
finally {
@@ -858,8 +858,8 @@
}
}
final private static PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
- public boolean accept(Path file) {
- return file.toString().endsWith(".out");
- }
- };
+ public boolean accept(Path file) {
+ return file.toString().endsWith(".out");
+ }
+ };
}
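
Although the large hunk above only reflows indentation, the penalty-box logic it moves deserves a plain statement: when a fetch from a host fails, the host is benched for 60 seconds plus a random backoff, and every other known output on that host is pushed back onto the retry list. A simplified sketch of the benching step, with an assumed host name and maxBackoff value:

    // Sketch of the penalty-box backoff from the scheduler above.
    java.util.Hashtable penaltyBox = new java.util.Hashtable();
    java.util.Random backoff = new java.util.Random();
    int maxBackoff = 60;                                  // seconds; configurable in the real code

    String host = "tracker17.example.com";                // hypothetical failed host
    long now = System.currentTimeMillis();
    long nextContact = now + 60 * 1000 + backoff.nextInt(maxBackoff * 1000);
    penaltyBox.put(host, new Long(nextContact));

    // At scheduling time, a host still inside its penalty window is skipped:
    Long penaltyEnd = (Long) penaltyBox.get(host);
    boolean penalized = penaltyEnd != null && now < penaltyEnd.longValue();
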
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Mon Apr 16 14:44:35 2007
@@ -29,13 +29,13 @@
* A constant of Reporter type that does nothing.
*/
public static final Reporter NULL = new Reporter() {
- public void setStatus(String s) {
- }
- public void progress() throws IOException {
- }
- public void incrCounter(Enum key, long amount) {
- }
- };
+ public void setStatus(String s) {
+ }
+ public void progress() throws IOException {
+ }
+ public void incrCounter(Enum key, long amount) {
+ }
+ };
/**
* Alter the application's status description.
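
Reporter.NULL above is a null object: every method is a no-op, so callers never need a null check before reporting. A hypothetical caller, with an invented counter enum:

    // Hedged sketch; MyCounters exists only for this example.
    enum MyCounters { RECORDS }

    void parse(Reporter reporter) throws IOException {
      reporter.setStatus("parsing input");          // no-op on Reporter.NULL
      reporter.progress();                          // no-op; declares IOException in this version
      reporter.incrCounter(MyCounters.RECORDS, 1);  // no-op
    }
    // elsewhere: parse(Reporter.NULL) runs without any reporting side effects
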
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Mon Apr 16 14:44:35 2007
@@ -27,61 +27,61 @@
* @author Mike Cafarella
*/
public interface RunningJob {
- /**
- * Returns an identifier for the job
- */
- public String getJobID();
-
- /**
- * Returns the path of the submitted job.
- */
- public String getJobFile();
-
- /**
- * Returns a URL where some job progress information will be displayed.
- */
- public String getTrackingURL();
-
- /**
- * Returns a float between 0.0 and 1.0, indicating progress on
- * the map portion of the job. When all map tasks have completed,
- * the function returns 1.0.
- */
- public float mapProgress() throws IOException;
-
- /**
- * Returns a float between 0.0 and 1.0, indicating progress on
- * the reduce portion of the job. When all reduce tasks have completed,
- * the function returns 1.0.
- */
- public float reduceProgress() throws IOException;
-
- /**
- * Non-blocking function to check whether the job is finished or not.
- */
- public boolean isComplete() throws IOException;
-
- /**
- * True iff job completed successfully.
- */
- public boolean isSuccessful() throws IOException;
-
- /**
- * Blocks until the job is complete.
- */
- public void waitForCompletion() throws IOException;
-
- /**
- * Kill the running job. Blocks until all job tasks have been
- * killed as well. If the job is no longer running, it simply returns.
- */
- public void killJob() throws IOException;
+ /**
+ * Returns an identifier for the job
+ */
+ public String getJobID();
+
+ /**
+ * Returns the path of the submitted job.
+ */
+ public String getJobFile();
+
+ /**
+ * Returns a URL where some job progress information will be displayed.
+ */
+ public String getTrackingURL();
+
+ /**
+ * Returns a float between 0.0 and 1.0, indicating progress on
+ * the map portion of the job. When all map tasks have completed,
+ * the function returns 1.0.
+ */
+ public float mapProgress() throws IOException;
+
+ /**
+ * Returns a float between 0.0 and 1.0, indicating progress on
+ * the reduce portion of the job. When all reduce tasks have completed,
+ * the function returns 1.0.
+ */
+ public float reduceProgress() throws IOException;
+
+ /**
+ * Non-blocking function to check whether the job is finished or not.
+ */
+ public boolean isComplete() throws IOException;
+
+ /**
+ * True iff job completed successfully.
+ */
+ public boolean isSuccessful() throws IOException;
+
+ /**
+ * Blocks until the job is complete.
+ */
+ public void waitForCompletion() throws IOException;
+
+ /**
+ * Kill the running job. Blocks until all job tasks have been
+ * killed as well. If the job is no longer running, it simply returns.
+ */
+ public void killJob() throws IOException;
- public TaskCompletionEvent[] getTaskCompletionEvents(
- int startFrom) throws IOException;
+ public TaskCompletionEvent[] getTaskCompletionEvents(
+ int startFrom) throws IOException;
- /**
- * Gets the counters for this job.
- */
- public Counters getCounters() throws IOException;
+ /**
+ * Gets the counters for this job.
+ */
+ public Counters getCounters() throws IOException;
}
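
The interface above is enough for the standard submit-and-poll loop; a minimal sketch, assuming a JobClient and JobConf have already been set up (neither appears in this diff):

    // Hedged sketch: jobClient and conf are assumptions; the interval is arbitrary,
    // and InterruptedException/IOException handling is left to the caller.
    RunningJob running = jobClient.submitJob(conf);
    while (!running.isComplete()) {
      System.out.println("map " + (int) (running.mapProgress() * 100) + "% " +
                         "reduce " + (int) (running.reduceProgress() * 100) + "%");
      Thread.sleep(1000);
    }
    if (!running.isSuccessful()) {
      System.err.println("Job " + running.getJobID() +
                         " failed; progress page: " + running.getTrackingURL());
    }
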
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java Mon Apr 16 14:44:35 2007
@@ -32,7 +32,7 @@
}
public RecordReader getRecordReader(InputSplit split, JobConf job,
- Reporter reporter) throws IOException {
+ Reporter reporter) throws IOException {
reporter.setStatus(split.toString());
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java Mon Apr 16 14:44:35 2007
@@ -37,7 +37,7 @@
private Writable innerValue = super.createValue();
public SequenceFileAsTextRecordReader(Configuration conf, FileSplit split)
- throws IOException {
+ throws IOException {
super(conf, split);
}
@@ -51,7 +51,7 @@
/** Read key/value pair in a line. */
public synchronized boolean next(Writable key, Writable value)
- throws IOException {
+ throws IOException {
Text tKey = (Text) key;
Text tValue = (Text) value;
if (!super.next(innerKey, innerValue)) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java Mon Apr 16 14:44:35 2007
@@ -44,264 +44,264 @@
*/
public class SequenceFileInputFilter extends SequenceFileInputFormat {
- final private static String FILTER_CLASS = "sequencefile.filter.class";
- final private static String FILTER_FREQUENCY
- = "sequencefile.filter.frequency";
- final private static String FILTER_REGEX = "sequencefile.filter.regex";
+ final private static String FILTER_CLASS = "sequencefile.filter.class";
+ final private static String FILTER_FREQUENCY
+ = "sequencefile.filter.frequency";
+ final private static String FILTER_REGEX = "sequencefile.filter.regex";
- public SequenceFileInputFilter() {
- }
+ public SequenceFileInputFilter() {
+ }
- /** Create a record reader for the given split
- * @param split file split
- * @param job job configuration
- * @param reporter reporter who sends report to task tracker
- * @return RecordReader
- */
- public RecordReader getRecordReader(InputSplit split,
- JobConf job, Reporter reporter)
+ /** Create a record reader for the given split
+ * @param split file split
+ * @param job job configuration
+ * @param reporter reporter who sends report to task tracker
+ * @return RecordReader
+ */
+ public RecordReader getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter)
throws IOException {
- reporter.setStatus(split.toString());
+ reporter.setStatus(split.toString());
- return new FilterRecordReader(job, (FileSplit) split);
- }
+ return new FilterRecordReader(job, (FileSplit) split);
+ }
- /** set the filter class
- *
- * @param conf application configuration
- * @param filterClass filter class
- */
- public static void setFilterClass(Configuration conf, Class filterClass) {
- conf.set(FILTER_CLASS, filterClass.getName() );
- }
+ /** set the filter class
+ *
+ * @param conf application configuration
+ * @param filterClass filter class
+ */
+ public static void setFilterClass(Configuration conf, Class filterClass) {
+ conf.set(FILTER_CLASS, filterClass.getName() );
+ }
- /**
- * filter interface
+ /**
+ * filter interface
+ */
+ public interface Filter extends Configurable {
+ /** filter function
+ * Decide if a record should be filtered or not
+ * @param key record key
+ * @return true if a record is accepted; return false otherwise
*/
- public interface Filter extends Configurable {
- /** filter function
- * Decide if a record should be filtered or not
- * @param key record key
- * @return true if a record is accepted; return false otherwise
- */
- public abstract boolean accept(Writable key);
- }
+ public abstract boolean accept(Writable key);
+ }
- /**
- * base calss for Filters
- */
- public static abstract class FilterBase implements Filter {
- Configuration conf;
+ /**
+ * base calss for Filters
+ */
+ public static abstract class FilterBase implements Filter {
+ Configuration conf;
- public Configuration getConf() {
- return conf;
- }
+ public Configuration getConf() {
+ return conf;
}
+ }
- /** Records filter by matching key to regex
+ /** Records filter by matching key to regex
+ */
+ public static class RegexFilter extends FilterBase {
+ private Pattern p;
+ /** Define the filtering regex and stores it in conf
+ * @param conf where the regex is set
+ * @param regex regex used as a filter
*/
- public static class RegexFilter extends FilterBase {
- private Pattern p;
- /** Define the filtering regex and stores it in conf
- * @param conf where the regex is set
- * @param regex regex used as a filter
- */
- public static void setPattern(Configuration conf, String regex )
- throws PatternSyntaxException {
- try {
- Pattern.compile(regex);
- } catch (PatternSyntaxException e) {
- throw new IllegalArgumentException("Invalid pattern: "+regex);
- }
- conf.set(FILTER_REGEX, regex);
- }
+ public static void setPattern(Configuration conf, String regex )
+ throws PatternSyntaxException {
+ try {
+ Pattern.compile(regex);
+ } catch (PatternSyntaxException e) {
+ throw new IllegalArgumentException("Invalid pattern: "+regex);
+ }
+ conf.set(FILTER_REGEX, regex);
+ }
- public RegexFilter() { }
+ public RegexFilter() { }
- /** configure the Filter by checking the configuration
- */
- public void setConf(Configuration conf) {
- String regex = conf.get(FILTER_REGEX);
- if(regex==null)
- throw new RuntimeException(FILTER_REGEX + "not set");
- this.p = Pattern.compile(regex);
- this.conf = conf;
- }
+ /** configure the Filter by checking the configuration
+ */
+ public void setConf(Configuration conf) {
+ String regex = conf.get(FILTER_REGEX);
+ if(regex==null)
+ throw new RuntimeException(FILTER_REGEX + "not set");
+ this.p = Pattern.compile(regex);
+ this.conf = conf;
+ }
- /** Filtering method
- * If key matches the regex, return true; otherwise return false
- * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
- */
- public boolean accept(Writable key) {
- return p.matcher(key.toString()).matches();
- }
+ /** Filtering method
+ * If key matches the regex, return true; otherwise return false
+ * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
+ */
+ public boolean accept(Writable key) {
+ return p.matcher(key.toString()).matches();
}
+ }
- /** This class returns a percentage of records
- * The percentage is determined by a filtering frequency <i>f</i> using
- * the criteria record# % f == 0.
- * For example, if the frequency is 10, one out of 10 records is returned.
+ /** This class returns a percentage of records
+ * The percentage is determined by a filtering frequency <i>f</i> using
+ * the criteria record# % f == 0.
+ * For example, if the frequency is 10, one out of 10 records is returned.
+ */
+ public static class PercentFilter extends FilterBase {
+ private int frequency;
+ private int count;
+
+ /** set the frequency and stores it in conf
+ * @param conf configuration
+ * @param frequency filtering frequencey
*/
- public static class PercentFilter extends FilterBase {
- private int frequency;
- private int count;
-
- /** set the frequency and stores it in conf
- * @param conf configuration
- * @param frequency filtering frequencey
- */
- public static void setFrequency(Configuration conf, int frequency ){
- if(frequency<=0)
- throw new IllegalArgumentException(
- "Negative " + FILTER_FREQUENCY + ": "+frequency);
- conf.setInt(FILTER_FREQUENCY, frequency);
- }
+ public static void setFrequency(Configuration conf, int frequency ){
+ if(frequency<=0)
+ throw new IllegalArgumentException(
+ "Negative " + FILTER_FREQUENCY + ": "+frequency);
+ conf.setInt(FILTER_FREQUENCY, frequency);
+ }
- public PercentFilter() { }
+ public PercentFilter() { }
- /** configure the filter by checking the configuration
- *
- * @param conf configuration
- */
- public void setConf(Configuration conf) {
- this.frequency = conf.getInt("sequencefile.filter.frequency", 10);
- if(this.frequency <=0 ) {
- throw new RuntimeException(
- "Negative "+FILTER_FREQUENCY+": "+this.frequency);
- }
- this.conf = conf;
- }
-
- /** Filtering method
- * If record# % frequency==0, return true; otherwise return false
- * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
- */
- public boolean accept(Writable key) {
- boolean accepted = false;
- if(count == 0)
- accepted = true;
- if( ++count == frequency ) {
- count = 0;
- }
- return accepted;
- }
+ /** configure the filter by checking the configuration
+ *
+ * @param conf configuration
+ */
+ public void setConf(Configuration conf) {
+ this.frequency = conf.getInt("sequencefile.filter.frequency", 10);
+ if(this.frequency <=0 ) {
+ throw new RuntimeException(
+ "Negative "+FILTER_FREQUENCY+": "+this.frequency);
+ }
+ this.conf = conf;
}
- /** This class returns a set of records by examing the MD5 digest of its
- * key against a filtering frequency <i>f</i>. The filtering criteria is
- * MD5(key) % f == 0.
+ /** Filtering method
+ * If record# % frequency==0, return true; otherwise return false
+ * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
*/
- public static class MD5Filter extends FilterBase {
- private int frequency;
- private static final MessageDigest DIGESTER;
- public static final int MD5_LEN = 16;
- private byte [] digest = new byte[MD5_LEN];
-
- static {
- try {
- DIGESTER = MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
- throw new RuntimeException(e);
- }
- }
+ public boolean accept(Writable key) {
+ boolean accepted = false;
+ if(count == 0)
+ accepted = true;
+ if( ++count == frequency ) {
+ count = 0;
+ }
+ return accepted;
+ }
+ }
+ /** This class returns a set of records by examing the MD5 digest of its
+ * key against a filtering frequency <i>f</i>. The filtering criteria is
+ * MD5(key) % f == 0.
+ */
+ public static class MD5Filter extends FilterBase {
+ private int frequency;
+ private static final MessageDigest DIGESTER;
+ public static final int MD5_LEN = 16;
+ private byte [] digest = new byte[MD5_LEN];
+
+ static {
+ try {
+ DIGESTER = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ }
- /** set the filtering frequency in configuration
- *
- * @param conf configuration
- * @param frequency filtering frequency
- */
- public static void setFrequency(Configuration conf, int frequency ){
- if(frequency<=0)
- throw new IllegalArgumentException(
- "Negative " + FILTER_FREQUENCY + ": "+frequency);
- conf.setInt(FILTER_FREQUENCY, frequency);
- }
+
+ /** set the filtering frequency in configuration
+ *
+ * @param conf configuration
+ * @param frequency filtering frequency
+ */
+ public static void setFrequency(Configuration conf, int frequency ){
+ if(frequency<=0)
+ throw new IllegalArgumentException(
+ "Negative " + FILTER_FREQUENCY + ": "+frequency);
+ conf.setInt(FILTER_FREQUENCY, frequency);
+ }
- public MD5Filter() { }
+ public MD5Filter() { }
- /** configure the filter according to configuration
- *
- * @param conf configuration
- */
- public void setConf(Configuration conf) {
- this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
- if(this.frequency <=0 ) {
- throw new RuntimeException(
- "Negative "+FILTER_FREQUENCY+": "+this.frequency);
- }
- this.conf = conf;
- }
+ /** configure the filter according to configuration
+ *
+ * @param conf configuration
+ */
+ public void setConf(Configuration conf) {
+ this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
+ if(this.frequency <=0 ) {
+ throw new RuntimeException(
+ "Negative "+FILTER_FREQUENCY+": "+this.frequency);
+ }
+ this.conf = conf;
+ }
- /** Filtering method
- * If MD5(key) % frequency==0, return true; otherwise return false
- * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
- */
- public boolean accept(Writable key) {
- try {
- long hashcode;
- if( key instanceof Text) {
- hashcode = MD5Hashcode((Text)key);
- } else if( key instanceof BytesWritable) {
- hashcode = MD5Hashcode((BytesWritable)key);
- } else {
- ByteBuffer bb;
- bb = Text.encode(key.toString());
- hashcode = MD5Hashcode(bb.array(),0, bb.limit());
- }
- if(hashcode/frequency*frequency==hashcode)
- return true;
- } catch(Exception e) {
- LOG.warn(e);
- throw new RuntimeException(e);
- }
- return false;
- }
+ /** Filtering method
+ * If MD5(key) % frequency==0, return true; otherwise return false
+ * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
+ */
+ public boolean accept(Writable key) {
+ try {
+ long hashcode;
+ if( key instanceof Text) {
+ hashcode = MD5Hashcode((Text)key);
+ } else if( key instanceof BytesWritable) {
+ hashcode = MD5Hashcode((BytesWritable)key);
+ } else {
+ ByteBuffer bb;
+ bb = Text.encode(key.toString());
+ hashcode = MD5Hashcode(bb.array(),0, bb.limit());
+ }
+ if(hashcode/frequency*frequency==hashcode)
+ return true;
+ } catch(Exception e) {
+ LOG.warn(e);
+ throw new RuntimeException(e);
+ }
+ return false;
+ }
- private long MD5Hashcode(Text key) throws DigestException {
- return MD5Hashcode(key.getBytes(), 0, key.getLength());
- }
+ private long MD5Hashcode(Text key) throws DigestException {
+ return MD5Hashcode(key.getBytes(), 0, key.getLength());
+ }
- private long MD5Hashcode(BytesWritable key) throws DigestException {
- return MD5Hashcode(key.get(), 0, key.getSize());
- }
- synchronized private long MD5Hashcode(byte[] bytes,
- int start, int length ) throws DigestException {
- DIGESTER.update(bytes, 0, length);
- DIGESTER.digest(digest, 0, MD5_LEN);
- long hashcode=0;
- for (int i = 0; i < 8; i++)
- hashcode |= ((digest[i] & 0xffL) << (8*(7-i)));
- return hashcode;
- }
+ private long MD5Hashcode(BytesWritable key) throws DigestException {
+ return MD5Hashcode(key.get(), 0, key.getSize());
}
+ synchronized private long MD5Hashcode(byte[] bytes,
+ int start, int length ) throws DigestException {
+ DIGESTER.update(bytes, 0, length);
+ DIGESTER.digest(digest, 0, MD5_LEN);
+ long hashcode=0;
+ for (int i = 0; i < 8; i++)
+ hashcode |= ((digest[i] & 0xffL) << (8*(7-i)));
+ return hashcode;
+ }
+ }
- private static class FilterRecordReader extends SequenceFileRecordReader {
- private Filter filter;
+ private static class FilterRecordReader extends SequenceFileRecordReader {
+ private Filter filter;
- public FilterRecordReader(Configuration conf, FileSplit split)
- throws IOException {
- super(conf, split);
- // instantiate filter
- filter = (Filter)ReflectionUtils.newInstance(
- conf.getClass(FILTER_CLASS, PercentFilter.class),
- conf);
- }
+ public FilterRecordReader(Configuration conf, FileSplit split)
+ throws IOException {
+ super(conf, split);
+ // instantiate filter
+ filter = (Filter)ReflectionUtils.newInstance(
+ conf.getClass(FILTER_CLASS, PercentFilter.class),
+ conf);
+ }
- public synchronized boolean next(Writable key, Writable value)
- throws IOException {
- while (next(key)) {
- if(filter.accept(key)) {
- getCurrentValue(value);
- return true;
- }
- }
-
- return false;
+ public synchronized boolean next(Writable key, Writable value)
+ throws IOException {
+ while (next(key)) {
+ if(filter.accept(key)) {
+ getCurrentValue(value);
+ return true;
}
+ }
+
+ return false;
}
+ }
}
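
Note that MD5Filter's acceptance test, hashcode/frequency*frequency == hashcode, is just integer arithmetic for hashcode % frequency == 0. Wiring a filter into a job goes through the static setters shown above; a sketch, assuming an old-style JobConf setup:

    // Hedged sketch of configuring the input filter; keeps roughly 1 in 10 records.
    JobConf job = new JobConf();
    job.setInputFormat(SequenceFileInputFilter.class);
    SequenceFileInputFilter.setFilterClass(job,
        SequenceFileInputFilter.MD5Filter.class);
    SequenceFileInputFilter.MD5Filter.setFrequency(job, 10); // accept keys with MD5(key) % 10 == 0
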
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java Mon Apr 16 14:44:35 2007
@@ -111,8 +111,8 @@
* @param servletClass The servlet class
*/
public <T extends HttpServlet>
- void addServlet(String name, String pathSpec,
- Class<T> servletClass) {
+ void addServlet(String name, String pathSpec,
+ Class<T> servletClass) {
WebApplicationContext context = webAppContext;
try {
if (name == null) {
@@ -223,7 +223,7 @@
public static class StackServlet extends HttpServlet {
public void doGet(HttpServletRequest request,
HttpServletResponse response
- ) throws ServletException, IOException {
+ ) throws ServletException, IOException {
OutputStream outStream = response.getOutputStream();
ReflectionUtils.printThreadInfo(new PrintWriter(outStream), "");
outStream.close();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Apr 16 14:44:35 2007
@@ -68,7 +68,7 @@
public Task() {}
public Task(String jobId, String jobFile, String tipId,
- String taskId, int partition) {
+ String taskId, int partition) {
this.jobFile = jobFile;
this.taskId = taskId;
this.jobId = jobId;
@@ -184,14 +184,14 @@
}
}
public void progress() throws IOException {
- reportProgress(umbilical);
+ reportProgress(umbilical);
}
public void incrCounter(Enum key, long amount) {
- Counters counters = getCounters();
- if (counters != null) {
- counters.incrCounter(key, amount);
- }
+ Counters counters = getCounters();
+ if (counters != null) {
+ counters.incrCounter(key, amount);
}
+ }
};
}