Posted to common-commits@hadoop.apache.org by om...@apache.org on 2009/03/19 01:00:58 UTC
svn commit: r755792 - in /hadoop/core/branches/branch-0.20: ./
src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/
src/test/org/apache/hadoop/mapreduce/
Author: omalley
Date: Thu Mar 19 00:00:58 2009
New Revision: 755792
URL: http://svn.apache.org/viewvc?rev=755792&view=rev
Log:
HADOOP-5382. Support combiners in the new context object API. (omalley)
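[Editor's note: after this change, a combiner written against the new
org.apache.hadoop.mapreduce.Reducer API can be registered with
Job.setCombinerClass and is run during map-side spills and reduce-side
merges by the CombinerRunner machinery added below in Task.java. A minimal
sketch of such a combiner; the IntSumCombiner class itself is illustrative,
not part of this commit:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Illustrative new-API combiner: folds the partial counts for each key.
    public class IntSumCombiner
        extends Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
                            Context context
                            ) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
          sum += value.get();
        }
        context.write(key, new IntWritable(sum));
      }
    }

In the driver, job.setCombinerClass(IntSumCombiner.class) is all that is
needed; CombinerRunner.create (see the Task.java diff below) detects the
new-API class through the task context and wraps it in a NewCombinerRunner.]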
Modified:
hadoop/core/branches/branch-0.20/ (props changed)
hadoop/core/branches/branch-0.20/CHANGES.txt (contents, props changed)
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 19 00:00:58 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Thu Mar 19 00:00:58 2009
@@ -642,8 +642,9 @@
HADOOP-5255. Fix use of Math.abs to avoid overflow. (Jonathan Ellis via
cdouglas)
- HADOOP-5269. Fixes a problem to do with tasktracker holding on to FAILED_UNCLEAN
- or KILLED_UNCLEAN tasks forever. (Amareshwari Sriramadasu via ddas)
+ HADOOP-5269. Fixes a problem to do with tasktracker holding on to
+ FAILED_UNCLEAN or KILLED_UNCLEAN tasks forever. (Amareshwari Sriramadasu
+ via ddas)
HADOOP-5214. Fixes a ConcurrentModificationException while the Fairshare
Scheduler accesses the tasktrackers stored by the JobTracker.
@@ -651,37 +652,41 @@
HADOOP-5233. Addresses the three issues - Race condition in updating
status, NPE in TaskTracker task localization when the conf file is missing
- (HADOOP-5234) and NPE in handling KillTaskAction of a cleanup task (HADOOP-5235).
- (Amareshwari Sriramadasu via ddas)
+ (HADOOP-5234) and NPE in handling KillTaskAction of a cleanup task
+ (HADOOP-5235). (Amareshwari Sriramadasu via ddas)
HADOOP-5247. Introduces a broadcast of KillJobAction to all trackers when
- a job finishes. This fixes a bunch of problems to do with NPE when a completed
- job is not in memory and a tasktracker comes to the jobtracker with a status
- report of a task belonging to that job. (Amar Kamat via ddas)
-
- HADOOP-5282. Fixed job history logs for task attempts that are failed by the
- JobTracker, say due to lost task trackers. (Amar Kamat via yhemanth)
+ a job finishes. This fixes a bunch of problems to do with NPE when a
+ completed job is not in memory and a tasktracker comes to the jobtracker
+ with a status report of a task belonging to that job. (Amar Kamat via ddas)
+
+ HADOOP-5282. Fixed job history logs for task attempts that are
+ failed by the JobTracker, say due to lost task trackers. (Amar
+ Kamat via yhemanth)
HADOOP-4963. Fixes a logging to do with getting the location of
map output file. (Amareshwari Sriramadasu via ddas)
HADOOP-5292. Fix NPE in KFS::getBlockLocations. (Sriram Rao via lohit)
- HADOOP-5241. Fixes a bug in disk-space resource estimation. Makes the estimation
- formula linear where blowUp = Total-Output/Total-Input. (Sharad Agarwal via ddas)
+ HADOOP-5241. Fixes a bug in disk-space resource estimation. Makes
+ the estimation formula linear where blowUp =
+ Total-Output/Total-Input. (Sharad Agarwal via ddas)
HADOOP-5142. Fix MapWritable#putAll to store key/value classes.
(Doğacan Güney via enis)
- HADOOP-4744. Workaround for jetty6 returning -1 when getLocalPort is invoked on
- the connector. The workaround patch retries a few times before failing.
- (Jothi Padmanabhan via yhemanth)
-
- HADOOP-5280. Adds a check to prevent a task state transition from FAILED to any of
- UNASSIGNED, RUNNING, COMMIT_PENDING or SUCCEEDED. (ddas)
-
- HADOOP-5272. Fixes a problem to do with detecting whether an attempt is the first
- attempt of a Task. This affects JobTracker restart. (Amar Kamat via ddas)
+ HADOOP-4744. Workaround for jetty6 returning -1 when getLocalPort
+ is invoked on the connector. The workaround patch retries a few
+ times before failing. (Jothi Padmanabhan via yhemanth)
+
+ HADOOP-5280. Adds a check to prevent a task state transition from
+ FAILED to any of UNASSIGNED, RUNNING, COMMIT_PENDING or
+ SUCCEEDED. (ddas)
+
+ HADOOP-5272. Fixes a problem to do with detecting whether an
+ attempt is the first attempt of a Task. This affects JobTracker
+ restart. (Amar Kamat via ddas)
HADOOP-5306. Fixes a problem to do with logging/parsing the http port of a
lost tracker. Affects JobTracker restart. (Amar Kamat via ddas)
@@ -690,12 +695,13 @@
HADOOP-5274. Fix gridmix2 dependency on wordcount example. (cdouglas)
- HADOOP-5145. Balancer sometimes runs out of memory after running days or weeks.
- (hairong)
+ HADOOP-5145. Balancer sometimes runs out of memory after running
+ days or weeks. (hairong)
- HADOOP-5338. Fix jobtracker restart to clear task completion events cached by
- tasktrackers forcing them to fetch all events afresh, thus avoiding missed
- task completion events on the tasktrackers. (Amar Kamat via yhemanth)
+ HADOOP-5338. Fix jobtracker restart to clear task completion
+ events cached by tasktrackers forcing them to fetch all events
+ afresh, thus avoiding missed task completion events on the
+ tasktrackers. (Amar Kamat via yhemanth)
HADOOP-4695. Change TestGlobalFilter so that it allows a web page to be
filtered more than once for a single access. (Kan Zhang via szetszwo)
@@ -716,28 +722,32 @@
HADOOP-5395. Change the exception message when a job is submitted to an
invalid queue. (Rahul Kumar Singh via yhemanth)
- HADOOP-5276. Fixes a problem to do with updating the start time of a task when
- the tracker that ran the task is lost. (Amar Kamat via ddas)
+ HADOOP-5276. Fixes a problem to do with updating the start time of
+ a task when the tracker that ran the task is lost. (Amar Kamat via
+ ddas)
- HADOOP-5278. Fixes a problem to do with logging the finish time of a task
- during recovery (after a JobTracker restart). (Amar Kamat via ddas)
+ HADOOP-5278. Fixes a problem to do with logging the finish time of
+ a task during recovery (after a JobTracker restart). (Amar Kamat
+ via ddas)
- HADOOP-5490. Fixes a synchronization problem in the EagerTaskInitializationListener
- class. (Jothi Padmanabhan via ddas)
+ HADOOP-5490. Fixes a synchronization problem in the
+ EagerTaskInitializationListener class. (Jothi Padmanabhan via
+ ddas)
- HADOOP-5493. The shuffle copier threads return the codecs back to the pool when the
- shuffle completes. (Jothi Padmanabhan via ddas)
+ HADOOP-5493. The shuffle copier threads return the codecs back to
+ the pool when the shuffle completes. (Jothi Padmanabhan via ddas)
HADOOP-5505. Fix JspHelper initialization in the context of
MiniDFSCluster. (Raghu Angadi)
- HADOOP-5414. Fixes IO exception while executing hadoop fs -touchz fileName by
- making sure that lease renewal thread exits before dfs client exits.
- (hairong)
-
- HADOOP-5103. FileInputFormat now reuses the clusterMap network topology object
- and that brings down the log messages in the JobClient to do with
- NetworkTopology.add significantly. (Jothi Padmanabhan via ddas)
+ HADOOP-5414. Fixes IO exception while executing hadoop fs -touchz
+ fileName by making sure that lease renewal thread exits before dfs
+ client exits. (hairong)
+
+ HADOOP-5103. FileInputFormat now reuses the clusterMap network
+ topology object and that brings down the log messages in the
+ JobClient to do with NetworkTopology.add significantly. (Jothi
+ Padmanabhan via ddas)
HADOOP-5483. Fixes a problem in the Directory Cleanup Thread due to which
TestMiniMRWithDFS sometimes used to fail. (ddas)
@@ -751,9 +761,11 @@
HADOOP-5514. Fix JobTracker metrics and add metrics for waiting, failed
tasks. (cdouglas)
- HADOOP-5516. Fix NullPointerException in TaskMemoryManagerThread that comes when
- monitored processes disappear when the thread is running.
- (Vinod Kumar Vavilapalli via yhemanth)
+ HADOOP-5516. Fix NullPointerException in TaskMemoryManagerThread
+ that comes when monitored processes disappear when the thread is
+ running. (Vinod Kumar Vavilapalli via yhemanth)
+
+ HADOOP-5382. Support combiners in the new context object API. (omalley)
Release 0.19.2 - Unreleased
Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 19 00:00:58 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
/hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java Thu Mar 19 00:00:58 2009
@@ -312,7 +312,9 @@
void runOldMapper(final JobConf job,
final BytesWritable rawSplit,
final TaskUmbilicalProtocol umbilical,
- TaskReporter reporter) throws IOException {
+ TaskReporter reporter
+ ) throws IOException, InterruptedException,
+ ClassNotFoundException {
InputSplit inputSplit = null;
// reinstantiate the split
try {
@@ -429,7 +431,7 @@
NewOutputCollector(JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
- ) throws IOException {
+ ) throws IOException, ClassNotFoundException {
collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
}
@@ -439,8 +441,13 @@
}
@Override
- public void close(TaskAttemptContext context) throws IOException {
- collector.flush();
+ public void close(TaskAttemptContext context
+ ) throws IOException,InterruptedException {
+ try {
+ collector.flush();
+ } catch (ClassNotFoundException cnf) {
+ throw new IOException("can't find class ", cnf);
+ }
collector.close();
}
}
@@ -525,9 +532,10 @@
interface MapOutputCollector<K, V>
extends OutputCollector<K, V> {
- public void close() throws IOException;
+ public void close() throws IOException, InterruptedException;
- public void flush() throws IOException;
+ public void flush() throws IOException, InterruptedException,
+ ClassNotFoundException;
}
@@ -559,7 +567,8 @@
}
- public void flush() throws IOException {
+ public void flush() throws IOException, InterruptedException,
+ ClassNotFoundException {
}
public void collect(K key, V value) throws IOException {
@@ -582,7 +591,7 @@
private final SerializationFactory serializationFactory;
private final Serializer<K> keySerializer;
private final Serializer<V> valSerializer;
- private final Class<? extends Reducer> combinerClass;
+ private final CombinerRunner<K,V> combinerRunner;
private final CombineOutputCollector<K, V> combineCollector;
// Compression for map-outputs
@@ -627,7 +636,6 @@
private final Counters.Counter mapOutputByteCounter;
private final Counters.Counter mapOutputRecordCounter;
- private final Counters.Counter combineInputCounter;
private final Counters.Counter combineOutputCounter;
private ArrayList<SpillRecord> indexCacheList;
@@ -636,7 +644,8 @@
@SuppressWarnings("unchecked")
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
- TaskReporter reporter) throws IOException {
+ TaskReporter reporter
+ ) throws IOException, ClassNotFoundException {
this.job = job;
this.reporter = reporter;
localFs = FileSystem.getLocal(job);
@@ -688,7 +697,8 @@
// counters
mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
- combineInputCounter = reporter.getCounter(COMBINE_INPUT_RECORDS);
+ Counters.Counter combineInputCounter =
+ reporter.getCounter(COMBINE_INPUT_RECORDS);
combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
// compression
if (job.getCompressMapOutput()) {
@@ -697,10 +707,14 @@
codec = ReflectionUtils.newInstance(codecClass, job);
}
// combiner
- combinerClass = job.getCombinerClass();
- combineCollector = (null != combinerClass)
- ? new CombineOutputCollector<K,V>(combineOutputCounter)
- : null;
+ combinerRunner = CombinerRunner.create(job, getTaskID(),
+ combineInputCounter,
+ reporter, null);
+ if (combinerRunner != null) {
+ combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
+ } else {
+ combineCollector = null;
+ }
minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
spillThread.setDaemon(true);
spillThread.setName("SpillThread");
@@ -995,7 +1009,8 @@
}
}
- public synchronized void flush() throws IOException {
+ public synchronized void flush() throws IOException, ClassNotFoundException,
+ InterruptedException {
LOG.info("Starting flush of map output");
spillLock.lock();
try {
@@ -1085,7 +1100,8 @@
spillReady.signal();
}
- private void sortAndSpill() throws IOException {
+ private void sortAndSpill() throws IOException, ClassNotFoundException,
+ InterruptedException {
//approximate the length of the output file to be the length of the
//buffer + header lengths for the partitions
long size = (bufend >= bufstart
@@ -1113,7 +1129,7 @@
long segmentStart = out.getPos();
writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
spilledRecordsCounter);
- if (null == combinerClass) {
+ if (combinerRunner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < endPosition &&
@@ -1140,7 +1156,7 @@
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
- combineAndSpill(kvIter, combineInputCounter);
+ combinerRunner.combine(kvIter, combineCollector);
}
}
@@ -1257,25 +1273,6 @@
vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen);
}
- @SuppressWarnings("unchecked")
- private void combineAndSpill(RawKeyValueIterator kvIter,
- Counters.Counter inCounter) throws IOException {
- Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
- try {
- CombineValuesIterator<K, V> values = new CombineValuesIterator<K, V>(
- kvIter, comparator, keyClass, valClass, job, reporter,
- inCounter);
- while (values.more()) {
- combiner.reduce(values.getKey(), values, combineCollector, reporter);
- values.nextKey();
- // indicate we're making progress
- reporter.progress();
- }
- } finally {
- combiner.close();
- }
- }
-
/**
* Inner class wrapping valuebytes, used for appendRaw.
*/
@@ -1329,7 +1326,8 @@
public void close() { }
}
- private void mergeParts() throws IOException {
+ private void mergeParts() throws IOException, InterruptedException,
+ ClassNotFoundException {
// get the approximate size of the final output/index files
long finalOutFileSize = 0;
long finalIndexFileSize = 0;
@@ -1428,11 +1426,11 @@
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
spilledRecordsCounter);
- if (null == combinerClass || numSpills < minSpillsForCombine) {
+ if (combinerRunner == null || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter, job);
} else {
combineCollector.setWriter(writer);
- combineAndSpill(kvIter, combineInputCounter);
+ combinerRunner.combine(kvIter, combineCollector);
}
//close
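[Editor's note on the exception plumbing in the MapTask hunks above: running
the combiner during flush() can fail to load the user's combiner class, so
flush() now declares ClassNotFoundException, but the new-API record writer's
close contract only allows IOException and InterruptedException, which is
why NewOutputCollector.close tunnels the checked exception inside an
IOException. The same pattern in a self-contained sketch; class and method
names here are illustrative:

    import java.io.IOException;

    class SpillBoundary {
      // May need to instantiate a user-supplied class, hence the extra throws.
      void flush() throws IOException, ClassNotFoundException {
        throw new ClassNotFoundException("user combiner class");
      }

      // Fixed throws clause at the API boundary: wrap rather than widen.
      void close() throws IOException, InterruptedException {
        try {
          flush();
        } catch (ClassNotFoundException cnf) {
          throw new IOException("can't find class ", cnf);
        }
      }
    }]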
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Mar 19 00:00:58 2009
@@ -119,8 +119,6 @@
getCounters().findCounter(Counter.REDUCE_INPUT_RECORDS);
private Counters.Counter reduceOutputCounter =
getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS);
- private Counters.Counter reduceCombineInputCounter =
- getCounters().findCounter(Counter.COMBINE_INPUT_RECORDS);
private Counters.Counter reduceCombineOutputCounter =
getCounters().findCounter(Counter.COMBINE_OUTPUT_RECORDS);
@@ -518,7 +516,7 @@
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
- final Reporter reporter,
+ final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass,
@@ -536,39 +534,14 @@
(org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
outputFormat.getRecordWriter(taskContext);
job.setBoolean("mapred.skip.on", isSkipping());
- org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
- reducerContext = null;
- try {
- Constructor<org.apache.hadoop.mapreduce.Reducer.Context> contextConstructor =
- org.apache.hadoop.mapreduce.Reducer.Context.class.getConstructor
- (new Class[]{org.apache.hadoop.mapreduce.Reducer.class,
- Configuration.class,
- org.apache.hadoop.mapreduce.TaskAttemptID.class,
- RawKeyValueIterator.class,
- org.apache.hadoop.mapreduce.RecordWriter.class,
- org.apache.hadoop.mapreduce.OutputCommitter.class,
- org.apache.hadoop.mapreduce.StatusReporter.class,
- RawComparator.class,
- Class.class,
- Class.class});
-
- reducerContext = contextConstructor.newInstance(reducer, job,
- getTaskID(),
- rIter, output, committer,
- reporter, comparator,
- keyClass, valueClass);
-
- reducer.run(reducerContext);
- output.close(reducerContext);
- } catch (NoSuchMethodException e) {
- throw new IOException("Can't find Context constructor", e);
- } catch (InstantiationException e) {
- throw new IOException("Can't create Context", e);
- } catch (InvocationTargetException e) {
- throw new IOException("Can't invoke Context constructor", e);
- } catch (IllegalAccessException e) {
- throw new IOException("Can't invoke Context constructor", e);
- }
+ org.apache.hadoop.mapreduce.Reducer.Context
+ reducerContext = createReduceContext(reducer, job, getTaskID(),
+ rIter, reduceInputValueCounter,
+ output, committer,
+ reporter, comparator, keyClass,
+ valueClass);
+ reducer.run(reducerContext);
+ output.close(reducerContext);
}
class ReduceCopier<K, V> implements MRConstants {
@@ -722,14 +695,14 @@
private volatile int maxFetchRetriesPerMap;
/**
- * Combiner class to run during in-memory merge, if defined.
+ * Combiner runner, if a combiner is needed
*/
- private final Class<? extends Reducer> combinerClass;
+ private CombinerRunner combinerRunner;
/**
* Resettable collector used for combine.
*/
- private final CombineOutputCollector combineCollector;
+ private CombineOutputCollector combineCollector = null;
/**
* Maximum percent of failed fetch attempt before killing the reduce task.
@@ -1680,7 +1653,8 @@
}
public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
- TaskReporter reporter)throws IOException {
+ TaskReporter reporter
+ )throws ClassNotFoundException, IOException {
configureClasspath(conf);
this.reporter = reporter;
@@ -1693,10 +1667,15 @@
this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
this.maxInFlight = 4 * numCopiers;
this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
- this.combinerClass = conf.getCombinerClass();
- combineCollector = (null != combinerClass)
- ? new CombineOutputCollector(reduceCombineOutputCounter)
- : null;
+ Counters.Counter combineInputCounter =
+ reporter.getCounter(Task.Counter.COMBINE_INPUT_RECORDS);
+ this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
+ combineInputCounter,
+ reporter, null);
+ if (combinerRunner != null) {
+ combineCollector =
+ new CombineOutputCollector(reduceCombineOutputCounter);
+ }
this.ioSortFactor = conf.getInt("io.sort.factor", 10);
// the exponential backoff formula
@@ -2507,11 +2486,11 @@
conf.getOutputKeyComparator(), reporter,
spilledRecordsCounter, null);
- if (null == combinerClass) {
+ if (combinerRunner == null) {
Merger.writeFile(rIter, writer, reporter, conf);
} else {
combineCollector.setWriter(writer);
- combineAndSpill(rIter, reduceCombineInputCounter);
+ combinerRunner.combine(rIter, combineCollector);
}
writer.close();
@@ -2536,29 +2515,6 @@
}
}
- @SuppressWarnings("unchecked")
- private void combineAndSpill(
- RawKeyValueIterator kvIter,
- Counters.Counter inCounter) throws IOException {
- JobConf job = (JobConf)getConf();
- Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
- Class keyClass = job.getMapOutputKeyClass();
- Class valClass = job.getMapOutputValueClass();
- RawComparator comparator = job.getOutputKeyComparator();
- try {
- CombineValuesIterator values = new CombineValuesIterator(
- kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
- inCounter);
- while (values.more()) {
- combiner.reduce(values.getKey(), values, combineCollector,
- Reporter.NULL);
- values.nextKey();
- }
- } finally {
- combiner.close();
- }
- }
-
private class GetMapEventsThread extends Thread {
private IntWritable fromEventId = new IntWritable(0);
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java Thu Mar 19 00:00:58 2009
@@ -21,6 +21,8 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Iterator;
@@ -41,6 +43,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
@@ -986,4 +989,208 @@
}
}
+ private static final Constructor<org.apache.hadoop.mapreduce.Reducer.Context>
+ contextConstructor;
+ static {
+ try {
+ contextConstructor =
+ org.apache.hadoop.mapreduce.Reducer.Context.class.getConstructor
+ (new Class[]{org.apache.hadoop.mapreduce.Reducer.class,
+ Configuration.class,
+ org.apache.hadoop.mapreduce.TaskAttemptID.class,
+ RawKeyValueIterator.class,
+ org.apache.hadoop.mapreduce.Counter.class,
+ org.apache.hadoop.mapreduce.RecordWriter.class,
+ org.apache.hadoop.mapreduce.OutputCommitter.class,
+ org.apache.hadoop.mapreduce.StatusReporter.class,
+ RawComparator.class,
+ Class.class,
+ Class.class});
+ } catch (NoSuchMethodException nme) {
+ throw new IllegalArgumentException("Can't find constructor");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
+ org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+ createReduceContext(org.apache.hadoop.mapreduce.Reducer
+ <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
+ Configuration job,
+ org.apache.hadoop.mapreduce.TaskAttemptID taskId,
+ RawKeyValueIterator rIter,
+ org.apache.hadoop.mapreduce.Counter inputCounter,
+ org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
+ org.apache.hadoop.mapreduce.OutputCommitter committer,
+ org.apache.hadoop.mapreduce.StatusReporter reporter,
+ RawComparator<INKEY> comparator,
+ Class<INKEY> keyClass, Class<INVALUE> valueClass
+ ) throws IOException, ClassNotFoundException {
+ try {
+
+ return contextConstructor.newInstance(reducer, job, taskId,
+ rIter, inputCounter, output,
+ committer, reporter, comparator,
+ keyClass, valueClass);
+ } catch (InstantiationException e) {
+ throw new IOException("Can't create Context", e);
+ } catch (InvocationTargetException e) {
+ throw new IOException("Can't invoke Context constructor", e);
+ } catch (IllegalAccessException e) {
+ throw new IOException("Can't invoke Context constructor", e);
+ }
+ }
+
+ protected static abstract class CombinerRunner<K,V> {
+ protected final Counters.Counter inputCounter;
+ protected final JobConf job;
+ protected final TaskReporter reporter;
+
+ CombinerRunner(Counters.Counter inputCounter,
+ JobConf job,
+ TaskReporter reporter) {
+ this.inputCounter = inputCounter;
+ this.job = job;
+ this.reporter = reporter;
+ }
+
+ /**
+ * Run the combiner over a set of inputs.
+ * @param iterator the key/value pairs to use as input
+ * @param collector the output collector
+ */
+ abstract void combine(RawKeyValueIterator iterator,
+ OutputCollector<K,V> collector
+ ) throws IOException, InterruptedException,
+ ClassNotFoundException;
+
+ static <K,V>
+ CombinerRunner<K,V> create(JobConf job,
+ TaskAttemptID taskId,
+ Counters.Counter inputCounter,
+ TaskReporter reporter,
+ org.apache.hadoop.mapreduce.OutputCommitter committer
+ ) throws ClassNotFoundException {
+ Class<? extends Reducer<K,V,K,V>> cls =
+ (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
+ if (cls != null) {
+ return new OldCombinerRunner(cls, job, inputCounter, reporter);
+ }
+ // make a task context so we can get the classes
+ org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+ new org.apache.hadoop.mapreduce.TaskAttemptContext(job, taskId);
+ Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
+ (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
+ taskContext.getCombinerClass();
+ if (newcls != null) {
+ return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
+ inputCounter, reporter, committer);
+ }
+
+ return null;
+ }
+ }
+
+ protected static class OldCombinerRunner<K,V> extends CombinerRunner<K,V> {
+ private final Class<? extends Reducer<K,V,K,V>> combinerClass;
+ private final Class<K> keyClass;
+ private final Class<V> valueClass;
+ private final RawComparator<K> comparator;
+
+ protected OldCombinerRunner(Class<? extends Reducer<K,V,K,V>> cls,
+ JobConf conf,
+ Counters.Counter inputCounter,
+ TaskReporter reporter) {
+ super(inputCounter, conf, reporter);
+ combinerClass = cls;
+ keyClass = (Class<K>) job.getMapOutputKeyClass();
+ valueClass = (Class<V>) job.getMapOutputValueClass();
+ comparator = (RawComparator<K>) job.getOutputKeyComparator();
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void combine(RawKeyValueIterator kvIter,
+ OutputCollector<K,V> combineCollector
+ ) throws IOException {
+ Reducer<K,V,K,V> combiner =
+ ReflectionUtils.newInstance(combinerClass, job);
+ try {
+ CombineValuesIterator<K,V> values =
+ new CombineValuesIterator<K,V>(kvIter, comparator, keyClass,
+ valueClass, job, Reporter.NULL,
+ inputCounter);
+ while (values.more()) {
+ combiner.reduce(values.getKey(), values, combineCollector,
+ Reporter.NULL);
+ values.nextKey();
+ }
+ } finally {
+ combiner.close();
+ }
+ }
+ }
+
+ protected static class NewCombinerRunner<K, V> extends CombinerRunner<K,V> {
+ private final Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>
+ reducerClass;
+ private final org.apache.hadoop.mapreduce.TaskAttemptID taskId;
+ private final RawComparator<K> comparator;
+ private final Class<K> keyClass;
+ private final Class<V> valueClass;
+ private final org.apache.hadoop.mapreduce.OutputCommitter committer;
+
+ NewCombinerRunner(Class reducerClass,
+ JobConf job,
+ org.apache.hadoop.mapreduce.TaskAttemptID taskId,
+ org.apache.hadoop.mapreduce.TaskAttemptContext context,
+ Counters.Counter inputCounter,
+ TaskReporter reporter,
+ org.apache.hadoop.mapreduce.OutputCommitter committer) {
+ super(inputCounter, job, reporter);
+ this.reducerClass = reducerClass;
+ this.taskId = taskId;
+ keyClass = (Class<K>) context.getMapOutputKeyClass();
+ valueClass = (Class<V>) context.getMapOutputValueClass();
+ comparator = (RawComparator<K>) context.getSortComparator();
+ this.committer = committer;
+ }
+
+ private static class OutputConverter<K,V>
+ extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
+ OutputCollector<K,V> output;
+ OutputConverter(OutputCollector<K,V> output) {
+ this.output = output;
+ }
+
+ @Override
+ public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context){
+ }
+
+ @Override
+ public void write(K key, V value
+ ) throws IOException, InterruptedException {
+ output.collect(key,value);
+ }
+ }
+
+ @Override
+ void combine(RawKeyValueIterator iterator,
+ OutputCollector<K,V> collector
+ ) throws IOException, InterruptedException,
+ ClassNotFoundException {
+ // make a reducer
+ org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =
+ (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)
+ ReflectionUtils.newInstance(reducerClass, job);
+ org.apache.hadoop.mapreduce.Reducer.Context
+ reducerContext = createReduceContext(reducer, job, taskId,
+ iterator, inputCounter,
+ new OutputConverter(collector),
+ committer,
+ reporter, comparator, keyClass,
+ valueClass);
+ reducer.run(reducerContext);
+ }
+
+ }
}
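[Editor's note: the CombinerRunner machinery above gives MapTask and
ReduceTask one shared, two-step call shape: pick a runner once at setup
(null when no combiner is configured), then feed it a sorted iterator and a
collector at each spill or merge. Condensed from the hunks above, where
combineInputCounter, reporter, kvIter, writer, and combineCollector come
from the surrounding task code:

    // At task setup: resolves either the old- or new-API combiner class.
    CombinerRunner<K,V> combinerRunner =
        CombinerRunner.create(job, getTaskID(), combineInputCounter,
                              reporter, null);

    // At spill/merge time: combine if a runner exists, otherwise write raw.
    if (combinerRunner == null) {
      Merger.writeFile(kvIter, writer, reporter, job);
    } else {
      combineCollector.setWriter(writer);
      combinerRunner.combine(kvIter, combineCollector);
    }]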
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java Thu Mar 19 00:00:58 2009
@@ -365,7 +365,7 @@
* @return the counters for this job.
* @throws IOException
*/
- public Iterable<CounterGroup> getCounters() throws IOException {
+ public Counters getCounters() throws IOException {
ensureState(JobState.RUNNING);
return new Counters(info.getCounters());
}
@@ -385,7 +385,6 @@
int numReduces = conf.getNumReduceTasks();
String oldMapperClass = "mapred.mapper.class";
String oldReduceClass = "mapred.reducer.class";
- String oldCombineClass = "mapred.combiner.class";
conf.setBooleanIfUnset("mapred.mapper.new-api",
conf.get(oldMapperClass) == null);
if (conf.getUseNewMapper()) {
@@ -393,7 +392,6 @@
ensureNotSet("mapred.input.format.class", mode);
ensureNotSet(oldMapperClass, mode);
if (numReduces != 0) {
- ensureNotSet(oldCombineClass, mode);
ensureNotSet("mapred.partitioner.class", mode);
} else {
ensureNotSet("mapred.output.format.class", mode);
@@ -403,7 +401,6 @@
ensureNotSet(JobContext.INPUT_FORMAT_CLASS_ATTR, mode);
ensureNotSet(JobContext.MAP_CLASS_ATTR, mode);
if (numReduces != 0) {
- ensureNotSet(JobContext.COMBINE_CLASS_ATTR, mode);
ensureNotSet(JobContext.PARTITIONER_CLASS_ATTR, mode);
} else {
ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
@@ -416,12 +413,10 @@
String mode = "new reduce API";
ensureNotSet("mapred.output.format.class", mode);
ensureNotSet(oldReduceClass, mode);
- ensureNotSet(oldCombineClass, mode);
} else {
String mode = "reduce compatability";
ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
ensureNotSet(JobContext.REDUCE_CLASS_ATTR, mode);
- ensureNotSet(JobContext.COMBINE_CLASS_ATTR, mode);
}
}
}
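[Editor's note: the Job.getCounters change above is small but user-visible:
the method now returns the concrete Counters type instead of
Iterable<CounterGroup>, so callers can look a counter up by group and name,
as the updated TestMapReduceLocal below does. A sketch of a caller; the
helper class and method are illustrative, while the group and counter names
match the ones the test uses:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;

    class CounterLookup {
      // Illustrative helper: fetch one counter value from a finished job.
      static long mapOutputRecords(Job job) throws IOException {
        Counters counters = job.getCounters();
        return counters.findCounter("org.apache.hadoop.mapred.Task$Counter",
                                    "MAP_OUTPUT_RECORDS").getValue();
      }
    }]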
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java Thu Mar 19 00:00:58 2009
@@ -41,6 +41,7 @@
public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
private RawKeyValueIterator input;
+ private Counter inputCounter;
private RawComparator<KEYIN> comparator;
private KEYIN key; // current key
private VALUEIN value; // current value
@@ -56,6 +57,7 @@
public ReduceContext(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
+ Counter inputCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
@@ -65,6 +67,7 @@
) throws InterruptedException, IOException{
super(conf, taskid, output, committer, reporter);
this.input = input;
+ this.inputCounter = inputCounter;
this.comparator = comparator;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
@@ -106,6 +109,7 @@
buffer.reset(next.getData(), next.getPosition(), next.getLength());
value = valueDeserializer.deserialize(value);
hasMore = input.next();
+ inputCounter.increment(1);
if (hasMore) {
next = input.getKey();
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java Thu Mar 19 00:00:58 2009
@@ -121,6 +121,7 @@
extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
public Context(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
+ Counter inputCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
@@ -128,8 +129,8 @@
Class<KEYIN> keyClass,
Class<VALUEIN> valueClass
) throws IOException, InterruptedException {
- super(conf, taskid, input, output, committer, reporter, comparator,
- keyClass, valueClass);
+ super(conf, taskid, input, inputCounter, output, committer, reporter,
+ comparator, keyClass, valueClass);
}
}
Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Thu Mar 19 00:00:58 2009
@@ -96,9 +96,10 @@
) throws IOException,
InterruptedException,
ClassNotFoundException {
+ final String COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter";
localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
- writeFile("in/part1", "this is a test\nof word count\n");
+ writeFile("in/part1", "this is a test\nof word count test\ntest\n");
writeFile("in/part2", "more test");
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
@@ -112,8 +113,21 @@
assertTrue(job.waitForCompletion());
String out = readFile("out/part-r-00000");
System.out.println(out);
- assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t2\nthis\t1\nword\t1\n",
+ assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n",
out);
+ Counters ctrs = job.getCounters();
+ System.out.println("Counters: " + ctrs);
+ long combineIn = ctrs.findCounter(COUNTER_GROUP,
+ "COMBINE_INPUT_RECORDS").getValue();
+ long combineOut = ctrs.findCounter(COUNTER_GROUP,
+ "COMBINE_OUTPUT_RECORDS").getValue();
+ long reduceIn = ctrs.findCounter(COUNTER_GROUP,
+ "REDUCE_INPUT_RECORDS").getValue();
+ long mapOut = ctrs.findCounter(COUNTER_GROUP,
+ "MAP_OUTPUT_RECORDS").getValue();
+ assertEquals("map out = combine in", mapOut, combineIn);
+ assertEquals("combine out = reduce in", combineOut, reduceIn);
+ assertTrue("combine in > combine out", combineIn > combineOut);
}
private void runSecondarySort(Configuration conf) throws IOException,