Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/08/16 20:45:56 UTC

svn commit: r566798 [2/3] - in /lucene/hadoop/trunk: ./ src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/examples/org/apache/hadoop/examples/ src/examples/org/apache/hadoop/examp...

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Thu Aug 16 11:45:49 2007
@@ -50,15 +50,21 @@
  * <b>mapred.map.multithreadedrunner.threads</b> property).
  * <p>
  */
-public class MultithreadedMapRunner implements MapRunnable {
+public class MultithreadedMapRunner<K1 extends WritableComparable,
+                                    V1 extends Writable,
+                                    K2 extends WritableComparable,
+                                    V2 extends Writable>
+    implements MapRunnable<K1, V1, K2, V2> {
+
   private static final Log LOG =
     LogFactory.getLog(MultithreadedMapRunner.class.getName());
 
   private JobConf job;
-  private Mapper mapper;
+  private Mapper<K1, V1, K2, V2> mapper;
   private ExecutorService executorService;
   private volatile IOException ioException;
 
+  @SuppressWarnings("unchecked")
   public void configure(JobConf job) {
     int numberOfThreads =
       job.getInt("mapred.map.multithreadedrunner.threads", 10);
@@ -76,14 +82,14 @@
     executorService = Executors.newFixedThreadPool(numberOfThreads);
   }
 
-  public void run(RecordReader input, OutputCollector output,
+  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                   Reporter reporter)
     throws IOException {
     try {
      // allocate key & value instances; these objects will not be reused
       // because execution of Mapper.map is not serialized.
-      WritableComparable key = input.createKey();
-      Writable value = input.createValue();
+      K1 key = input.createKey();
+      V1 value = input.createValue();
 
       while (input.next(key, value)) {
 
@@ -166,9 +172,9 @@
    * Runnable to execute a single Mapper.map call from a forked thread.
    */
   private class MapperInvokeRunable implements Runnable {
-    private WritableComparable key;
-    private Writable value;
-    private OutputCollector output;
+    private K1 key;
+    private V1 value;
+    private OutputCollector<K2, V2> output;
     private Reporter reporter;
 
     /**
@@ -180,8 +186,9 @@
      * @param output
      * @param reporter
      */
-    public MapperInvokeRunable(WritableComparable key, Writable value,
-                               OutputCollector output, Reporter reporter) {
+    public MapperInvokeRunable(K1 key, V1 value,
+                               OutputCollector<K2, V2> output,
+                               Reporter reporter) {
       this.key = key;
       this.value = value;
       this.output = output;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java Thu Aug 16 11:45:49 2007
@@ -30,11 +30,14 @@
 /**
  * Consume all outputs and put them in /dev/null. 
  */
-public class NullOutputFormat implements OutputFormat {
-  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, 
+public class NullOutputFormat<K extends WritableComparable,
+                              V extends Writable>
+  implements OutputFormat<K, V> {
+  
+  public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, 
                                       String name, Progressable progress) {
-    return new RecordWriter(){
-        public void write(WritableComparable key, Writable value) { }
+    return new RecordWriter<K, V>(){
+        public void write(K key, V value) { }
         public void close(Reporter reporter) { }
       };
   }
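
With the generic signature, a job that runs only for its side effects can
discard its output without casts. A minimal sketch of the relevant
job-setup statement:

    // every key/value pair handed to the RecordWriter is dropped,
    // exactly as the anonymous write() above does nothing
    conf.setOutputFormat(NullOutputFormat.class);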

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java Thu Aug 16 11:45:49 2007
@@ -19,25 +19,23 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.MapReduceBase;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-
-import java.util.regex.Pattern;
-import java.util.regex.Matcher;
 
 
 /** A {@link Mapper} that extracts text matching a regular expression. */
-public class RegexMapper extends MapReduceBase implements Mapper {
+public class RegexMapper<K extends WritableComparable>
+    extends MapReduceBase
+    implements Mapper<K, Text, Text, LongWritable> {
 
   private Pattern pattern;
   private int group;
@@ -47,10 +45,11 @@
     group = job.getInt("mapred.mapper.regex.group", 0);
   }
 
-  public void map(WritableComparable key, Writable value,
-                  OutputCollector output, Reporter reporter)
+  public void map(K key, Text value,
+                  OutputCollector<Text, LongWritable> output,
+                  Reporter reporter)
     throws IOException {
-    String text = ((Text)value).toString();
+    String text = value.toString();
     Matcher matcher = pattern.matcher(text);
     while (matcher.find()) {
       output.collect(new Text(matcher.group(group)), new LongWritable(1));
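
The mapper is driven by the two job properties read in configure() above.
A hedged sketch of a grep-style setup (the pattern is an example, and
pairing with LongSumReducer is a suggestion, not part of this commit):

    conf.setMapperClass(RegexMapper.class);
    conf.set("mapred.mapper.regex", "ERROR");     // pattern to match
    conf.setInt("mapred.mapper.regex.group", 0);  // capture group to emit
    // each match is collected as <matched-text, 1>, so LongSumReducer
    // downstream yields a count per distinct match
    conf.setReducerClass(LongSumReducer.class);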

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java Thu Aug 16 11:45:49 2007
@@ -21,26 +21,27 @@
 import java.io.IOException;
 import java.util.StringTokenizer;
 
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.MapReduceBase;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 
 
 /** A {@link Mapper} that maps text values into <token,freq> pairs.  Uses
  * {@link StringTokenizer} to break text into tokens. */
-public class TokenCountMapper extends MapReduceBase implements Mapper {
+public class TokenCountMapper<K extends WritableComparable>
+    extends MapReduceBase
+    implements Mapper<K, Text, Text, LongWritable> {
 
-  public void map(WritableComparable key, Writable value,
-                  OutputCollector output, Reporter reporter)
+  public void map(K key, Text value,
+                  OutputCollector<Text, LongWritable> output,
+                  Reporter reporter)
     throws IOException {
     // get input text
-    String text = ((Text)value).toString();       // value is line of text
+    String text = value.toString();       // value is line of text
 
     // tokenize the value
     StringTokenizer st = new StringTokenizer(text);
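
This makes a basic word count a matter of wiring library classes together.
A minimal sketch, assuming paths and input format are set elsewhere:

    conf.setMapperClass(TokenCountMapper.class);  // emits <token, 1>
    conf.setCombinerClass(LongSumReducer.class);
    conf.setReducerClass(LongSumReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(LongWritable.class);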

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UserDefinedValueAggregatorDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UserDefinedValueAggregatorDescriptor.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UserDefinedValueAggregatorDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UserDefinedValueAggregatorDescriptor.java Thu Aug 16 11:45:49 2007
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 
 /**
@@ -87,8 +88,9 @@
    *         aggregation type which is used to guide the way to aggregate the
   *         value in the reduce/combiner phase of an Aggregate-based job.
    */
-  public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
-    ArrayList<Entry> retv = new ArrayList<Entry>();
+  public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
+                                                          Object val) {
+    ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
     if (this.theAggregatorDescriptor != null) {
       retv = this.theAggregatorDescriptor.generateKeyValPairs(key, val);
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java Thu Aug 16 11:45:49 2007
@@ -49,25 +49,25 @@
   
   public String inputFile = null;
 
-  private static class MyEntry implements Entry {
-    Object key;
+  private static class MyEntry implements Entry<Text, Text> {
+    Text key;
 
-    Object val;
+    Text val;
 
-    public Object getKey() {
+    public Text getKey() {
       return key;
     }
 
-    public Object getValue() {
+    public Text getValue() {
       return val;
     }
 
-    public Object setValue(Object val) {
+    public Text setValue(Text val) {
       this.val = val;
       return val;
     }
 
-    public MyEntry(Object key, Object val) {
+    public MyEntry(Text key, Text val) {
       this.key = key;
       this.val = val;
     }
@@ -81,7 +81,7 @@
    * @return an Entry whose key is the aggregation id prefixed with 
    * the aggregation type.
    */
-  public static Entry generateEntry(String type, String id, Object val) {
+  public static Entry<Text, Text> generateEntry(String type, String id, Text val) {
     Text key = new Text(type + TYPE_SEPARATOR + id);
     return new MyEntry(key, val);
   }
@@ -129,11 +129,12 @@
    *         aggregation type which is used to guide the way to aggregate the
   *         value in the reduce/combiner phase of an Aggregate-based job.
    */
-  public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
-    ArrayList<Entry> retv = new ArrayList<Entry>();
+  public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
+                                                          Object val) {
+    ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
     String countType = LONG_VALUE_SUM;
     String id = "record_count";
-    Entry e = generateEntry(countType, id, ONE);
+    Entry<Text, Text> e = generateEntry(countType, id, ONE);
     if (e != null) {
       retv.add(e);
     }
@@ -153,6 +154,7 @@
    */
   public void configure(JobConf job) {
     this.inputFile = job.get("map.input.file");
-    maxNumItems = job.getLong("aggregate.max.num.unique.values", Long.MAX_VALUE);
+    maxNumItems = job.getLong("aggregate.max.num.unique.values",
+                              Long.MAX_VALUE);
   }
 }
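
To illustrate the generified contract, here is a hedged sketch of a custom
descriptor (the class name is hypothetical; imports as in the file above)
that counts records per input file using the generateEntry() helper and
the public inputFile field:

    public class PerFileCountDescriptor extends ValueAggregatorBaseDescriptor {
      public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
                                                              Object val) {
        ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
        // one LONG_VALUE_SUM pair per record, keyed by the source file
        Entry<Text, Text> e = generateEntry(LONG_VALUE_SUM,
                                            "count_" + inputFile, ONE);
        if (e != null) {
          retv.add(e);
        }
        return retv;
      }
    }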

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorCombiner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorCombiner.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorCombiner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorCombiner.java Thu Aug 16 11:45:49 2007
@@ -31,7 +31,9 @@
 /**
  * This class implements the generic combiner of Aggregate.
  */
-public class ValueAggregatorCombiner extends ValueAggregatorJobBase {
+public class ValueAggregatorCombiner<K1 extends WritableComparable,
+                                     V1 extends Writable>
+  extends ValueAggregatorJobBase<K1, V1> {
 
   /**
    * Combiner does not need to configure.
@@ -46,8 +48,8 @@
    * @param values the values to combine
    * @param output to collect combined values
    */
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<Text> values,
+                     OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
     String keyStr = key.toString();
     int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
     String type = keyStr.substring(0, pos);
@@ -80,7 +82,7 @@
    * Do nothing. Should not be called. 
    *
    */
-  public void map(WritableComparable arg0, Writable arg1, OutputCollector arg2,
+  public void map(K1 arg0, V1 arg1, OutputCollector<Text, Text> arg2,
                   Reporter arg3) throws IOException {
     throw new IOException ("should not be called\n");
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorDescriptor.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorDescriptor.java Thu Aug 16 11:45:49 2007
@@ -54,7 +54,8 @@
    *         aggregation type which is used to guide the way to aggregate the
   *         value in the reduce/combiner phase of an Aggregate-based job.
    */
-  public ArrayList<Entry> generateKeyValPairs(Object key, Object val);
+  public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
+                                                          Object val);
 
   /**
    * Configure the object

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJobBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJobBase.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJobBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJobBase.java Thu Aug 16 11:45:49 2007
@@ -21,6 +21,9 @@
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reducer;
@@ -29,7 +32,9 @@
 * This abstract class implements some common functionality of the
 * generic mapper, reducer and combiner classes of Aggregate.
  */
-public abstract class ValueAggregatorJobBase implements Mapper, Reducer {
+public abstract class ValueAggregatorJobBase<K1 extends WritableComparable,
+                                             V1 extends Writable>
+  implements Mapper<K1, V1, Text, Text>, Reducer<Text, Text, Text, Text> {
 
   protected ArrayList<ValueAggregatorDescriptor> aggregatorDescriptorList = null;
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorMapper.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorMapper.java Thu Aug 16 11:45:49 2007
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -30,23 +31,25 @@
 /**
  * This class implements the generic mapper of Aggregate.
  */
-public class ValueAggregatorMapper extends ValueAggregatorJobBase {
+public class ValueAggregatorMapper<K1 extends WritableComparable,
+                                   V1 extends Writable>
+  extends ValueAggregatorJobBase<K1, V1> {
 
   /**
   *  The map function. It iterates through the value aggregator descriptor
   *  list to generate aggregation id/value pairs and emit them.
    */
-  public void map(WritableComparable key, Writable value,
-                  OutputCollector output, Reporter reporter) throws IOException {
+  public void map(K1 key, V1 value,
+                  OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
 
     Iterator iter = this.aggregatorDescriptorList.iterator();
     while (iter.hasNext()) {
       ValueAggregatorDescriptor ad = (ValueAggregatorDescriptor) iter.next();
-      Iterator<Entry> ens = ad.generateKeyValPairs(key, value).iterator();
+      Iterator<Entry<Text, Text>> ens =
+        ad.generateKeyValPairs(key, value).iterator();
       while (ens.hasNext()) {
-        Entry en = ens.next();
-        output.collect((WritableComparable) en.getKey(), (Writable) en
-                       .getValue());
+        Entry<Text, Text> en = ens.next();
+        output.collect(en.getKey(), en.getValue());
       }
     }
   }
@@ -54,8 +57,9 @@
   /**
    * Do nothing. Should not be called.
    */
-  public void reduce(WritableComparable arg0, Iterator arg1,
-                     OutputCollector arg2, Reporter arg3) throws IOException {
-    throw new IOException ("should not be called\n");
+  public void reduce(Text arg0, Iterator<Text> arg1,
+                     OutputCollector<Text, Text> arg2,
+                     Reporter arg3) throws IOException {
+    throw new IOException("should not be called\n");
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorReducer.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorReducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorReducer.java Thu Aug 16 11:45:49 2007
@@ -32,7 +32,9 @@
  * 
  * 
  */
-public class ValueAggregatorReducer extends ValueAggregatorJobBase {
+public class ValueAggregatorReducer<K1 extends WritableComparable,
+                                    V1 extends Writable>
+  extends ValueAggregatorJobBase<K1, V1> {
 
   /**
    * @param key
@@ -43,8 +45,8 @@
   *          may be further customized.
   * @param values the values to be aggregated
    */
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<Text> values,
+                     OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
     String keyStr = key.toString();
     int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
     String type = keyStr.substring(0, pos);
@@ -65,7 +67,7 @@
   /**
    * Do nothing. Should not be called
    */
-  public void map(WritableComparable arg0, Writable arg1, OutputCollector arg2,
+  public void map(K1 arg0, V1 arg1, OutputCollector<Text, Text> arg2,
                   Reporter arg3) throws IOException {
     throw new IOException ("should not be called\n");
   }
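
The generic mapper, combiner and reducer plug into a job together. A
minimal sketch (paths, input format and descriptor registration omitted),
using the Text-to-Text key/value contract fixed by ValueAggregatorJobBase
above:

    conf.setMapperClass(ValueAggregatorMapper.class);
    conf.setCombinerClass(ValueAggregatorCombiner.class);
    conf.setReducerClass(ValueAggregatorReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    JobClient.runJob(conf);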

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java Thu Aug 16 11:45:49 2007
@@ -44,13 +44,14 @@
  * This class is responsible for launching and communicating with the child 
  * process.
  */
-class Application {
+class Application<K1 extends WritableComparable, V1 extends Writable,
+                  K2 extends WritableComparable, V2 extends Writable> {
   private static final Log LOG = LogFactory.getLog(Application.class.getName());
   private ServerSocket serverSocket;
   private Process process;
   private Socket clientSocket;
-  private OutputHandler handler;
-  private BinaryProtocol downlink;
+  private OutputHandler<K2, V2> handler;
+  private BinaryProtocol<K1, V1, K2, V2> downlink;
 
   /**
    * Start the child process to handle the task for us.
@@ -62,7 +63,8 @@
    * @throws IOException
    * @throws InterruptedException
    */
-  Application(JobConf conf, OutputCollector output, Reporter reporter,
+  @SuppressWarnings("unchecked")
+  Application(JobConf conf, OutputCollector<K2, V2> output, Reporter reporter,
               Class outputKeyClass, Class outputValueClass
               ) throws IOException, InterruptedException {
     serverSocket = new ServerSocket(0);
@@ -81,12 +83,12 @@
     cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();
-    handler = new OutputHandler(output, reporter);
-    WritableComparable outputKey = (WritableComparable)
+    handler = new OutputHandler<K2, V2>(output, reporter);
+    K2 outputKey = (K2)
       ReflectionUtils.newInstance(outputKeyClass, conf);
-    Writable outputValue = (Writable) 
+    V2 outputValue = (V2) 
       ReflectionUtils.newInstance(outputValueClass, conf);
-    downlink = new BinaryProtocol(clientSocket, handler, 
+    downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler, 
                                   outputKey, outputValue, conf);
     downlink.start();
     downlink.setJobConf(conf);
@@ -97,7 +99,7 @@
    * application.
    * @return the downlink proxy
    */
-  DownwardProtocol getDownlink() {
+  DownwardProtocol<K1, V1> getDownlink() {
     return downlink;
   }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Thu Aug 16 11:45:49 2007
@@ -39,7 +39,10 @@
 /**
  * This protocol is a binary implementation of the Pipes protocol.
  */
-class BinaryProtocol implements DownwardProtocol {
+class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
+                     K2 extends WritableComparable, V2 extends Writable>
+  implements DownwardProtocol<K1, V1> {
+  
   public static final int CURRENT_PROTOCOL_VERSION = 0;
   private DataOutputStream stream;
   private DataOutputBuffer buffer = new DataOutputBuffer();
@@ -72,15 +75,18 @@
     }
   }
 
-  private static class UplinkReaderThread extends Thread {
+  private static class UplinkReaderThread<K2 extends WritableComparable,
+                                          V2 extends Writable>  
+    extends Thread {
+    
     private DataInputStream inStream;
-    private UpwardProtocol handler;
-    private WritableComparable key;
-    private Writable value;
+    private UpwardProtocol<K2, V2> handler;
+    private K2 key;
+    private V2 value;
     
-    public UplinkReaderThread(InputStream stream, UpwardProtocol handler, 
-                              WritableComparable key, Writable value
-                              ) throws IOException{
+    public UplinkReaderThread(InputStream stream,
+                              UpwardProtocol<K2, V2> handler, 
+                              K2 key, V2 value) throws IOException{
       inStream = new DataInputStream(stream);
       this.handler = handler;
       this.key = key;
@@ -192,9 +198,9 @@
    * @throws IOException
    */
   public BinaryProtocol(Socket sock, 
-                        UpwardProtocol handler,
-                        WritableComparable key,
-                        Writable value,
+                        UpwardProtocol<K2, V2> handler,
+                        K2 key,
+                        V2 value,
                         JobConf config) throws IOException {
     OutputStream raw = sock.getOutputStream();
     // If we are debugging, save a copy of the downlink commands to a file
@@ -202,7 +208,8 @@
       raw = new TeeOutputStream("downlink.data", raw);
     }
     stream = new DataOutputStream(raw);
-    uplink = new UplinkReaderThread(sock.getInputStream(), handler, key, value);
+    uplink = new UplinkReaderThread<K2, V2>(sock.getInputStream(),
+                                            handler, key, value);
     uplink.setName("pipe-uplink-handler");
     uplink.start();
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java Thu Aug 16 11:45:49 2007
@@ -30,7 +30,7 @@
  * All of these calls are asynchronous and return before the message has been 
  * processed.
  */
-interface DownwardProtocol {
+interface DownwardProtocol<K extends WritableComparable, V extends Writable> {
   /**
    * Start communication
    * @throws IOException
@@ -68,7 +68,7 @@
    * @param value The record's value
    * @throws IOException
    */
-  void mapItem(WritableComparable key, Writable value) throws IOException;
+  void mapItem(K key, V value) throws IOException;
   
   /**
    * Run a reduce task in the child
@@ -83,14 +83,14 @@
    * @param key the new key
    * @throws IOException
    */
-  void reduceKey(WritableComparable key) throws IOException;
+  void reduceKey(K key) throws IOException;
   
   /**
    * The reduce should be given a new value
    * @param value the new value
    * @throws IOException
    */
-  void reduceValue(Writable value) throws IOException;
+  void reduceValue(V value) throws IOException;
   
   /**
   * The task has no more input coming, but it should finish processing its

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java Thu Aug 16 11:45:49 2007
@@ -28,9 +28,12 @@
 /**
  * Handles the upward (C++ to Java) messages from the application.
  */
-class OutputHandler implements UpwardProtocol {
+class OutputHandler<K extends WritableComparable,
+                    V extends Writable>
+  implements UpwardProtocol<K, V> {
+  
   private Reporter reporter;
-  private OutputCollector collector;
+  private OutputCollector<K, V> collector;
   private float progressValue = 0.0f;
   private boolean done = false;
   private Throwable exception = null;
@@ -40,7 +43,7 @@
    * @param collector the "real" collector that takes the output
    * @param reporter the reporter for reporting progress
    */
-  public OutputHandler(OutputCollector collector, Reporter reporter) {
+  public OutputHandler(OutputCollector<K, V> collector, Reporter reporter) {
     this.reporter = reporter;
     this.collector = collector;
   }
@@ -48,16 +51,15 @@
   /**
    * The task output a normal record.
    */
-  public void output(WritableComparable key, 
-                     Writable value) throws IOException {
+  public void output(K key, V value) throws IOException {
     collector.collect(key, value);
   }
 
   /**
    * The task output a record with a partition number attached.
    */
-  public void partitionedOutput(int reduce, WritableComparable key, 
-                                Writable value) throws IOException {
+  public void partitionedOutput(int reduce, K key, 
+                                V value) throws IOException {
     PipesPartitioner.setNextPartition(reduce);
     collector.collect(key, value);
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java Thu Aug 16 11:45:49 2007
@@ -31,7 +31,9 @@
 /**
  * An adaptor to run a C++ mapper.
  */
-class PipesMapRunner extends MapRunner {
+class PipesMapRunner<K1 extends WritableComparable, V1 extends Writable,
+    K2 extends WritableComparable, V2 extends Writable>
+    extends MapRunner<K1, V1, K2, V2> {
   private JobConf job;
 
   /**
@@ -48,26 +50,25 @@
    * @param output the object to collect the outputs of the map
    * @param reporter the object to update with status
    */
-  public void run(RecordReader input, OutputCollector output,
-                  Reporter reporter
-                  ) throws IOException {
-    Application application = null;
+  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
+                  Reporter reporter) throws IOException {
+    Application<K1, V1, K2, V2> application = null;
     try {
-      application = new Application(job, output, reporter,
+      application = new Application<K1, V1, K2, V2>(job, output, reporter,
                                     job.getMapOutputKeyClass(),
                                     job.getMapOutputValueClass());
     } catch (InterruptedException ie) {
       throw new RuntimeException("interrupted", ie);
     }
-    DownwardProtocol downlink = application.getDownlink();
+    DownwardProtocol<K1, V1> downlink = application.getDownlink();
     boolean isJavaInput = Submitter.getIsJavaRecordReader(job);
     downlink.runMap(reporter.getInputSplit(), 
                     job.getNumReduceTasks(), isJavaInput);
     try {
       if (isJavaInput) {
         // allocate key & value instances that are re-used for all entries
-        WritableComparable key = input.createKey();
-        Writable value = input.createValue();
+        K1 key = input.createKey();
+        V1 value = input.createValue();
         downlink.setInputTypes(key.getClass().getName(),
                                value.getClass().getName());
         

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java Thu Aug 16 11:45:49 2007
@@ -28,10 +28,14 @@
  * This partitioner can either be set manually per record, or it can
  * fall back onto a Java partitioner that was set by the user.
  */
-class PipesPartitioner implements Partitioner {
+class PipesPartitioner<K extends WritableComparable,
+                       V extends Writable>
+  implements Partitioner<K, V> {
+  
   private static ThreadLocal<Integer> cache = new ThreadLocal<Integer>();
-  private Partitioner part = null;
+  private Partitioner<K, V> part = null;
   
+  @SuppressWarnings("unchecked")
   public void configure(JobConf conf) {
     part = (Partitioner) 
       ReflectionUtils.newInstance(Submitter.getJavaPartitioner(conf), conf);
@@ -52,7 +56,7 @@
    * @param value the value to partition
    * @param numPartitions the number of reduces
    */
-  public int getPartition(WritableComparable key, Writable value, 
+  public int getPartition(K key, V value, 
                           int numPartitions) {
     Integer result = cache.get();
     if (result == null) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java Thu Aug 16 11:45:49 2007
@@ -33,11 +33,13 @@
 /**
  * This class is used to talk to a C++ reduce task.
  */
-class PipesReducer implements Reducer {
+class PipesReducer<K2 extends WritableComparable, V2 extends Writable,
+    K3 extends WritableComparable, V3 extends Writable>
+    implements Reducer<K2, V2, K3, V3> {
   private static final Log LOG= LogFactory.getLog(PipesReducer.class.getName());
   private JobConf job;
-  private Application application = null;
-  private DownwardProtocol downlink = null;
+  private Application<K2, V2, K3, V3> application = null;
+  private DownwardProtocol<K2, V2> downlink = null;
   private boolean isOk = true;
 
   public void configure(JobConf job) {
@@ -48,23 +50,23 @@
    * Process all of the keys and values. Start up the application if we haven't
    * started it yet.
    */
-  public void reduce(WritableComparable key, Iterator values, 
-                     OutputCollector output, Reporter reporter
+  public void reduce(K2 key, Iterator<V2> values, 
+                     OutputCollector<K3, V3> output, Reporter reporter
                      ) throws IOException {
     isOk = false;
     startApplication(output, reporter);
     downlink.reduceKey(key);
     while (values.hasNext()) {
-      downlink.reduceValue((Writable) values.next());
+      downlink.reduceValue(values.next());
     }
     isOk = true;
   }
 
-  private void startApplication(OutputCollector output, Reporter reporter) throws IOException {
+  private void startApplication(OutputCollector<K3, V3> output, Reporter reporter) throws IOException {
     if (application == null) {
       try {
         LOG.info("starting application");
-        application = new Application(job, output, reporter, 
+        application = new Application<K2, V2, K3, V3>(job, output, reporter, 
                                       job.getOutputKeyClass(), 
                                       job.getOutputValueClass());
         downlink = application.getDownlink();
@@ -82,9 +84,9 @@
   public void close() throws IOException {
     // if we haven't started the application, we have nothing to do
     if (isOk) {
-      OutputCollector nullCollector = new OutputCollector() {
-        public void collect(WritableComparable key, 
-                            Writable value) throws IOException {
+      OutputCollector<K3, V3> nullCollector = new OutputCollector<K3, V3>() {
+        public void collect(K3 key, 
+                            V3 value) throws IOException {
           // NULL
         }
       };

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java Thu Aug 16 11:45:49 2007
@@ -26,14 +26,14 @@
  * The interface for the messages that can come up from the child. All of these
  * calls are asynchronous and return before the message has been processed.
  */
-interface UpwardProtocol {
+interface UpwardProtocol<K extends WritableComparable, V extends Writable> {
   /**
    * Output a record from the child.
    * @param key the record's key
    * @param value the record's value
    * @throws IOException
    */
-  void output(WritableComparable key, Writable value) throws IOException;
+  void output(K key, V value) throws IOException;
   
   /**
    * Map functions where the application has defined a partition function
@@ -43,8 +43,8 @@
    * @param value the record's value
    * @throws IOException
    */
-  void partitionedOutput(int reduce, WritableComparable key, 
-                         Writable value) throws IOException;
+  void partitionedOutput(int reduce, K key, 
+                         V value) throws IOException;
   
   /**
    * Update the task's status message

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java Thu Aug 16 11:45:49 2007
@@ -18,30 +18,32 @@
 
 package org.apache.hadoop.tools;
 
-import java.io.*;
-
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.fs.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.CopyFiles;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.mapred.lib.LongSumReducer;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-
-import java.util.regex.Pattern;
-import java.util.regex.Matcher;
+import org.apache.hadoop.util.CopyFiles;
 
 /**
  * Logalyzer: A utility tool for archiving and analyzing hadoop logs.
@@ -62,7 +64,9 @@
   private static Configuration fsConfig = new Configuration();
   
   /** A {@link Mapper} that extracts text matching a regular expression. */
-  public static class LogRegexMapper extends MapReduceBase implements Mapper {
+  public static class LogRegexMapper<K extends WritableComparable>
+    extends MapReduceBase
+    implements Mapper<K, Text, Text, LongWritable> {
     
     private Pattern pattern;
     
@@ -70,13 +74,14 @@
       pattern = Pattern.compile(job.get("mapred.mapper.regex"));
     }
     
-    public void map(WritableComparable key, Writable value,
-                    OutputCollector output, Reporter reporter)
+    public void map(K key, Text value,
+                    OutputCollector<Text, LongWritable> output,
+                    Reporter reporter)
       throws IOException {
-      String text = ((Text)value).toString();
+      String text = value.toString();
       Matcher matcher = pattern.matcher(text);
       while (matcher.find()) {
-        output.collect((Text)value, new LongWritable(1));
+        output.collect(value, new LongWritable(1));
       }
     }
     

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Thu Aug 16 11:45:49 2007
@@ -182,7 +182,7 @@
    * DFSCopyFilesMapper: The mapper for copying files from the DFS.
    */
   public static class FSCopyFilesMapper extends CopyFilesMapper 
-    implements Mapper 
+    implements Mapper<Text, Writable, WritableComparable, Text> 
   {
     private int sizeBuf = 4096;
     private FileSystem srcFileSys = null;
@@ -397,11 +397,11 @@
      * @param out not-used.
      * @param reporter
      */
-    public void map(WritableComparable key,
+    public void map(Text key,
                     Writable value,
-                    OutputCollector out,
+                    OutputCollector<WritableComparable, Text> out,
                     Reporter reporter) throws IOException {
-      String src = ((Text) key).toString();
+      String src = key.toString();
       try {
         copy(src, reporter);
       } catch (IOException except) {

Modified: lucene/hadoop/trunk/src/test/checkstyle.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/checkstyle.xml?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/checkstyle.xml (original)
+++ lucene/hadoop/trunk/src/test/checkstyle.xml Thu Aug 16 11:45:49 2007
@@ -52,6 +52,7 @@
         <!-- See http://checkstyle.sf.net/config_javadoc.html -->
         <module name="JavadocType">
           <property name="scope" value="public"/>
+          <property name="allowMissingParamTags" value="true"/>
         </module>
         <module name="JavadocStyle"/>
 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java Thu Aug 16 11:45:49 2007
@@ -44,7 +44,9 @@
  * </ul>
  * 
  */
-public class AccumulatingReducer extends MapReduceBase implements Reducer {
+public class AccumulatingReducer extends MapReduceBase
+    implements Reducer<UTF8, UTF8, UTF8, UTF8> {
+  
   protected String hostName;
   
   public AccumulatingReducer () {
@@ -57,12 +59,12 @@
     TaskTracker.LOG.info("Starting AccumulatingReducer on " + hostName);
   }
   
-  public void reduce(WritableComparable key, 
-                     Iterator values,
-                     OutputCollector output, 
+  public void reduce(UTF8 key, 
+                     Iterator<UTF8> values,
+                     OutputCollector<UTF8, UTF8> output, 
                      Reporter reporter
                      ) throws IOException {
-    String field = ((UTF8) key).toString();
+    String field = key.toString();
 
     reporter.setStatus("starting " + field + " ::host = " + hostName);
 
@@ -70,7 +72,7 @@
     if (field.startsWith("s:")) {
       String sSum = "";
       while (values.hasNext())
-        sSum += ((UTF8) values.next()).toString() + ";";
+        sSum += values.next().toString() + ";";
       output.collect(key, new UTF8(sSum));
       reporter.setStatus("finished " + field + " ::host = " + hostName);
       return;
@@ -79,7 +81,7 @@
     if (field.startsWith("f:")) {
       float fSum = 0;
       while (values.hasNext())
-        fSum += Float.parseFloat(((UTF8) values.next()).toString());
+        fSum += Float.parseFloat(values.next().toString());
       output.collect(key, new UTF8(String.valueOf(fSum)));
       reporter.setStatus("finished " + field + " ::host = " + hostName);
       return;
@@ -88,7 +90,7 @@
     if (field.startsWith("l:")) {
       long lSum = 0;
       while (values.hasNext()) {
-        lSum += Long.parseLong(((UTF8) values.next()).toString());
+        lSum += Long.parseLong(values.next().toString());
       }
       output.collect(key, new UTF8(String.valueOf(lSum)));
     }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java Thu Aug 16 11:45:49 2007
@@ -159,7 +159,7 @@
       super(fsConfig);
     }
     
-    void collectStats(OutputCollector output, 
+    void collectStats(OutputCollector<UTF8, UTF8> output, 
                       String name,
                       long execTime, 
                       Object objSize) throws IOException {

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java Thu Aug 16 11:45:49 2007
@@ -163,7 +163,7 @@
       return new Long(actualSize);
     }
     
-    void collectStats(OutputCollector output, 
+    void collectStats(OutputCollector<UTF8, UTF8> output, 
                       String name, 
                       long execTime, 
                       Object corruptedBlock) throws IOException {

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java Thu Aug 16 11:45:49 2007
@@ -39,7 +39,9 @@
  * statistics data to be collected by subsequent reducers.
  * 
  */
-public abstract class IOMapperBase extends Configured implements Mapper {
+public abstract class IOMapperBase extends Configured
+    implements Mapper<UTF8, LongWritable, UTF8, UTF8> {
+  
   protected byte[] buffer;
   protected int bufferSize;
   protected FileSystem fs;
@@ -91,7 +93,7 @@
    * @param doIOReturnValue value returned by {@link #doIO(Reporter,String,long)}
    * @throws IOException
    */
-  abstract void collectStats(OutputCollector output, 
+  abstract void collectStats(OutputCollector<UTF8, UTF8> output, 
                              String name, 
                              long execTime, 
                              Object doIOReturnValue) throws IOException;
@@ -109,12 +111,12 @@
    * {@link #collectStats(OutputCollector,String,long,Object)} 
    * is called to prepare stat data for a subsequent reducer.
    */
-  public void map(WritableComparable key, 
-                  Writable value,
-                  OutputCollector output, 
+  public void map(UTF8 key, 
+                  LongWritable value,
+                  OutputCollector<UTF8, UTF8> output, 
                   Reporter reporter) throws IOException {
-    String name = ((UTF8)key).toString();
-    long longValue = ((LongWritable)value).get();
+    String name = key.toString();
+    long longValue = value.get();
     
     reporter.setStatus("starting " + name + " ::host = " + hostName);
     

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java Thu Aug 16 11:45:49 2007
@@ -154,7 +154,7 @@
       super(fsConfig);
     }
     
-    void collectStats(OutputCollector output, 
+    void collectStats(OutputCollector<UTF8, UTF8> output, 
                       String name,
                       long execTime, 
                       Object objSize) throws IOException {

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java Thu Aug 16 11:45:49 2007
@@ -106,7 +106,9 @@
     LOG.info("created control file for: "+totalSize+" bytes");
   }
 
-  public static class WriteMapper extends Configured implements Mapper {
+  public static class WriteMapper extends Configured
+      implements Mapper<UTF8, LongWritable, UTF8, LongWritable> {
+    
     private Random random = new Random();
     private byte[] buffer = new byte[BUFFER_SIZE];
     private FileSystem fs;
@@ -132,11 +134,13 @@
       fastCheck = job.getBoolean("fs.test.fastCheck", false);
     }
 
-    public void map(WritableComparable key, Writable value,
-                    OutputCollector collector, Reporter reporter)
+    public void map(UTF8 key, LongWritable value,
+                    OutputCollector<UTF8, LongWritable> collector,
+                    Reporter reporter)
       throws IOException {
-      String name = ((UTF8)key).toString();
-      long size = ((LongWritable)value).get();
+      
+      String name = key.toString();
+      long size = value.get();
       long seed = Long.parseLong(name);
 
       random.setSeed(seed);
@@ -200,7 +204,9 @@
     JobClient.runJob(job);
   }
 
-  public static class ReadMapper extends Configured implements Mapper {
+  public static class ReadMapper extends Configured
+      implements Mapper<UTF8, LongWritable, UTF8, LongWritable> {
+    
     private Random random = new Random();
     private byte[] buffer = new byte[BUFFER_SIZE];
     private byte[] check  = new byte[BUFFER_SIZE];
@@ -224,11 +230,13 @@
       fastCheck = job.getBoolean("fs.test.fastCheck", false);
     }
 
-    public void map(WritableComparable key, Writable value,
-                    OutputCollector collector, Reporter reporter)
+    public void map(UTF8 key, LongWritable value,
+                    OutputCollector<UTF8, LongWritable> collector,
+                    Reporter reporter)
       throws IOException {
-      String name = ((UTF8)key).toString();
-      long size = ((LongWritable)value).get();
+      
+      String name = key.toString();
+      long size = value.get();
       long seed = Long.parseLong(name);
 
       random.setSeed(seed);

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRBench.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRBench.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRBench.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRBench.java Thu Aug 16 11:45:49 2007
@@ -51,9 +51,12 @@
   * Takes text lines as input, runs some processing on them, and
   * writes the data out as text again.
    */
-  public static class Map extends MapReduceBase implements Mapper {
-    public void map(WritableComparable key, Writable value,
-                    OutputCollector output, Reporter reporter) throws IOException 
+  public static class Map extends MapReduceBase
+    implements Mapper<WritableComparable, UTF8, UTF8, UTF8> {
+    
+    public void map(WritableComparable key, UTF8 value,
+                    OutputCollector<UTF8, UTF8> output,
+                    Reporter reporter) throws IOException 
     {
       String line = value.toString();
       output.collect(new UTF8(process(line)), new UTF8(""));		
@@ -66,9 +69,11 @@
   /**
    * Ignores the key and writes values to the output. 
    */
-  public static class Reduce extends MapReduceBase implements Reducer {
-    public void reduce(WritableComparable key, Iterator values,
-                       OutputCollector output, Reporter reporter) throws IOException 
+  public static class Reduce extends MapReduceBase
+    implements Reducer<UTF8, UTF8, UTF8, UTF8> {
+    
+    public void reduce(UTF8 key, Iterator<UTF8> values,
+                       OutputCollector<UTF8, UTF8> output, Reporter reporter) throws IOException 
     {
       while(values.hasNext()) {
         output.collect(key, new UTF8(values.next().toString()));

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java Thu Aug 16 11:45:49 2007
@@ -21,9 +21,9 @@
 import java.io.*;
 import java.util.*;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -48,7 +48,9 @@
   * archives/files are set, and the map then checks whether they have been
   * localized.
    */
-  public static class MapClass extends MapReduceBase implements Mapper {
+  public static class MapClass extends MapReduceBase
+    implements Mapper<LongWritable, Text, Text, IntWritable> {
+    
     JobConf conf;
 
     private final static IntWritable one = new IntWritable(1);
@@ -97,9 +99,10 @@
       }
     }
 
-    public void map(WritableComparable key, Writable value,
-                    OutputCollector output, Reporter reporter) throws IOException {
-      String line = ((Text) value).toString();
+    public void map(LongWritable key, Text value,
+                    OutputCollector<Text, IntWritable> output,
+                    Reporter reporter) throws IOException {
+      String line = value.toString();
       StringTokenizer itr = new StringTokenizer(line);
       while (itr.hasMoreTokens()) {
         word.set(itr.nextToken());
@@ -112,13 +115,15 @@
   /**
    * A reducer class that just emits the sum of the input values.
    */
-  public static class ReduceClass extends MapReduceBase implements Reducer {
+  public static class ReduceClass extends MapReduceBase
+    implements Reducer<Text, IntWritable, Text, IntWritable> {
 
-    public void reduce(WritableComparable key, Iterator values,
-                       OutputCollector output, Reporter reporter) throws IOException {
+    public void reduce(Text key, Iterator<IntWritable> values,
+                       OutputCollector<Text, IntWritable> output,
+                       Reporter reporter) throws IOException {
       int sum = 0;
       while (values.hasNext()) {
-        sum += ((IntWritable) values.next()).get();
+        sum += values.next().get();
       }
       output.collect(key, new IntWritable(sum));
     }
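
The same erasure caveat applies on the reduce side, but Iterator<IntWritable>
removes the per-element cast from the summing loop. The pattern as a
standalone sketch (class name is hypothetical):

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    // Hypothetical example, not part of this commit.
    public class SumReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

      public void reduce(Text key, Iterator<IntWritable> values,
                         OutputCollector<Text, IntWritable> output,
                         Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();   // typed iterator: no cast needed
        }
        output.collect(key, new IntWritable(sum));
      }
    }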

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java Thu Aug 16 11:45:49 2007
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -39,7 +40,8 @@
   * Mapper class for Pi estimation.
    */
   
-  public static class PiMapper extends MapReduceBase implements Mapper {
+  public static class PiMapper extends MapReduceBase
+    implements Mapper<IntWritable, Writable, IntWritable, IntWritable> {
     
     static Random r = new Random();
     
@@ -49,11 +51,11 @@
      * @param out
      * @param reporter
      */
-    public void map(WritableComparable key,
+    public void map(IntWritable key,
                     Writable val,
-                    OutputCollector out,
+                    OutputCollector<IntWritable, IntWritable> out,
                     Reporter reporter) throws IOException {
-      int nSamples = ((IntWritable) key).get();
+      int nSamples = key.get();
       for(int idx = 0; idx < nSamples; idx++) {
         double x = r.nextDouble();
         double y = r.nextDouble();
@@ -74,7 +76,9 @@
     }
   }
   
-  public static class PiReducer extends MapReduceBase implements Reducer {
+  public static class PiReducer extends MapReduceBase 
+    implements Reducer<IntWritable, IntWritable, WritableComparable, Writable> {
+    
     int numInside = 0;
     int numOutside = 0;
     JobConf conf;
@@ -91,18 +95,18 @@
      * @param output
      * @param reporter
      */
-    public void reduce(WritableComparable key,
-                       Iterator values,
-                       OutputCollector output,
+    public void reduce(IntWritable key,
+                       Iterator<IntWritable> values,
+                       OutputCollector<WritableComparable, Writable> output,
                        Reporter reporter) throws IOException {
-      if (((IntWritable)key).get() == 1) {
+      if (key.get() == 1) {
         while (values.hasNext()) {
-          int num = ((IntWritable)values.next()).get();
+          int num = values.next().get();
           numInside += num;
         }
       } else {
         while (values.hasNext()) {
-          int num = ((IntWritable)values.next()).get();
+          int num = values.next().get();
           numOutside += num;
         }
       }
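
For readers unfamiliar with the test itself: PiMapper samples random points
and the reducer tallies how many landed inside versus outside a circle, from
which pi is estimated. A plain-Java illustration of one common formulation
(not the committed code, which spreads the sampling across map tasks):

    import java.util.Random;

    // Monte Carlo estimate: the fraction of points in the unit square that
    // fall inside the quarter circle of radius 1 tends to pi/4.
    public class PiSketch {
      public static void main(String[] args) {
        Random r = new Random(0);   // fixed seed for repeatability
        long inside = 0;
        long n = 1000000;
        for (long i = 0; i < n; i++) {
          double x = r.nextDouble();
          double y = r.nextDouble();
          if (x * x + y * y <= 1.0) {
            inside++;
          }
        }
        System.out.println("pi ~= " + (4.0 * inside / n));
      }
    }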

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java Thu Aug 16 11:45:49 2007
@@ -31,6 +31,7 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.RecordStatsWritable;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.fs.*;
 
@@ -129,10 +130,13 @@
       public int getChecksum() { return checksum; }
     }
     
-    public static class Map extends MapReduceBase implements Mapper {
+    public static class Map extends MapReduceBase
+      implements Mapper<BytesWritable, BytesWritable,
+                        IntWritable, RecordStatsWritable> {
+      
       private IntWritable key = null;
       private BytesWritable prevKey = null;
-      private Partitioner partitioner = null;
+      private Partitioner<BytesWritable, BytesWritable> partitioner = null;
       private int partition = -1;
       private int noSortReducers = -1;
       private long recordId = -1;
@@ -142,7 +146,7 @@
         key = deduceInputFile(job);
         
         if (key == sortOutput) {
-          partitioner = new HashPartitioner();
+          partitioner = new HashPartitioner<BytesWritable, BytesWritable>();
           
           // Figure the 'current' partition and no. of reduces of the 'sort'
           try {
@@ -159,12 +163,12 @@
         }
       }
       
-      public void map(WritableComparable key, 
-                      Writable value,
-                      OutputCollector output, 
+      public void map(BytesWritable key, 
+                      BytesWritable value,
+                      OutputCollector<IntWritable, RecordStatsWritable> output, 
                       Reporter reporter) throws IOException {
-        BytesWritable bwKey = (BytesWritable)key;
-        BytesWritable bwValue = (BytesWritable)value;
+        BytesWritable bwKey = key;
+        BytesWritable bwValue = value;
         ++recordId;
         
         if (this.key == sortOutput) {
@@ -201,15 +205,19 @@
       }
     }
     
-    public static class Reduce extends MapReduceBase implements Reducer {
-      public void reduce(WritableComparable key, Iterator values,
-                         OutputCollector output, 
+    public static class Reduce extends MapReduceBase
+      implements Reducer<IntWritable, RecordStatsWritable,
+                         IntWritable, RecordStatsWritable> {
+      
+      public void reduce(IntWritable key, Iterator<RecordStatsWritable> values,
+                         OutputCollector<IntWritable,
+                                         RecordStatsWritable> output, 
                          Reporter reporter) throws IOException {
         long bytes = 0;
         long records = 0;
         int xor = 0;
         while (values.hasNext()) {
-          RecordStatsWritable stats = ((RecordStatsWritable)values.next());
+          RecordStatsWritable stats = values.next();
           bytes += stats.getBytes();
           records += stats.getRecords();
           xor ^= stats.getChecksum(); 
@@ -308,7 +316,10 @@
    */
   public static class RecordChecker {
     
-    public static class Map extends MapReduceBase implements Mapper {
+    public static class Map extends MapReduceBase
+      implements Mapper<BytesWritable, BytesWritable,
+                        BytesWritable, IntWritable> {
+      
       private IntWritable value = null;
       
       public void configure(JobConf job) {
@@ -316,27 +327,29 @@
         value = deduceInputFile(job);
       }
       
-      public void map(WritableComparable key, 
-                      Writable value,
-                      OutputCollector output, 
+      public void map(BytesWritable key, 
+                      BytesWritable value,
+                      OutputCollector<BytesWritable, IntWritable> output, 
                       Reporter reporter) throws IOException {
         // newKey = (key, value)
-        BytesWritable keyValue = 
-          new BytesWritable(pair((BytesWritable)key, (BytesWritable)value));
+        BytesWritable keyValue = new BytesWritable(pair(key, value));
     
         // output (newKey, value)
         output.collect(keyValue, this.value);
       }
     }
     
-    public static class Reduce extends MapReduceBase implements Reducer {
-      public void reduce(WritableComparable key, Iterator values,
-                         OutputCollector output, 
+    public static class Reduce extends MapReduceBase
+      implements Reducer<BytesWritable, IntWritable,
+                        BytesWritable, IntWritable> {
+      
+      public void reduce(BytesWritable key, Iterator<IntWritable> values,
+                         OutputCollector<BytesWritable, IntWritable> output,
                          Reporter reporter) throws IOException {
         int ones = 0;
         int twos = 0;
         while (values.hasNext()) {
-          IntWritable count = ((IntWritable) values.next()); 
+          IntWritable count = values.next(); 
           if (count.equals(sortInput)) {
             ++ones;
           } else if (count.equals(sortOutput)) {
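
The Map class above asks its HashPartitioner<BytesWritable, BytesWritable>
which reduce a key is routed to; with the generic Partitioner there is no
cast at the call site. A minimal sketch of that call (class name and reduce
count are hypothetical):

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapred.lib.HashPartitioner;

    // Hypothetical fragment, not part of this commit.
    public class PartitionSketch {
      public static void main(String[] args) {
        HashPartitioner<BytesWritable, BytesWritable> partitioner =
          new HashPartitioner<BytesWritable, BytesWritable>();
        BytesWritable key = new BytesWritable(new byte[] { 1, 2, 3 });
        // HashPartitioner hashes only the key; the value is not consulted.
        int partition = partitioner.getPartition(key, null, 10);
        System.out.println("key routed to reduce #" + partition);
      }
    }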

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java Thu Aug 16 11:45:49 2007
@@ -51,16 +51,19 @@
    * mediate key value pairs are ordered by {input key, value}.
    * Think of the random value as a timestamp associated with the record. 
    */
-  static class RandomGenMapper implements Mapper {
+  static class RandomGenMapper
+    implements Mapper<IntWritable, Writable, IntWritable, IntWritable> {
+    
     public void configure(JobConf job) {
     }
     
-    public void map(WritableComparable key, Writable value,
-                    OutputCollector out, Reporter reporter) throws IOException {
+    public void map(IntWritable key, Writable value,
+                    OutputCollector<IntWritable, IntWritable> out,
+                    Reporter reporter) throws IOException {
       int num_values = 5;
       for(int i = 0; i < num_values; ++i) {
         int val = rng.nextInt(num_values);
-        int compositeKey = ((IntWritable)(key)).get() * 100 + val;
+        int compositeKey = key.get() * 100 + val;
         out.collect(new IntWritable(compositeKey), new IntWritable(val));
       }
     }
@@ -72,12 +75,16 @@
   /** 
    * Your basic identity mapper. 
    */
-  static class IdentityMapper implements Mapper {
+  static class IdentityMapper
+    implements Mapper<WritableComparable, Writable,
+                      WritableComparable, Writable> {
+    
     public void configure(JobConf job) {
     }
     
     public void map(WritableComparable key, Writable value,
-                    OutputCollector out, Reporter reporter) throws IOException {
+                    OutputCollector<WritableComparable, Writable> out,
+                    Reporter reporter) throws IOException {
       out.collect(key, value);
     }
     
@@ -88,14 +95,17 @@
   /** 
    * Checks whether keys are in ascending order.  
    */
-  static class AscendingKeysReducer implements Reducer {
+  static class AscendingKeysReducer
+    implements Reducer<IntWritable, Writable, IntWritable, Text> {
+    
     public void configure(JobConf job) {}
 
     // keep track of the last key we've seen
     private int lastKey = Integer.MIN_VALUE;
-    public void reduce(WritableComparable key, Iterator values, 
-        OutputCollector out, Reporter reporter) throws IOException {
-      int currentKey = ((IntWritable)(key)).get();
+    public void reduce(IntWritable key, Iterator<Writable> values, 
+                       OutputCollector<IntWritable, Text> out,
+                       Reporter reporter) throws IOException {
+      int currentKey = key.get();
       // keys should be in ascending order
       if (currentKey < lastKey) {
         fail("Keys not in sorted ascending order");
@@ -110,13 +120,15 @@
   /** 
   * Checks whether keys are in descending order.
    */
-  static class DescendingKeysReducer implements Reducer {
+  static class DescendingKeysReducer
+    implements Reducer<IntWritable, Writable, IntWritable, Text> {
     public void configure(JobConf job) {}
 
     // keep track of the last key we've seen
     private int lastKey = Integer.MAX_VALUE;
-    public void reduce(WritableComparable key, Iterator values, 
-        OutputCollector out, Reporter reporter) throws IOException {
+    public void reduce(IntWritable key, Iterator<Writable> values, 
+                       OutputCollector<IntWritable, Text> out,
+                       Reporter reporter) throws IOException {
-      int currentKey = ((IntWritable)(key)).get();
+      int currentKey = key.get();
       // keys should be in descending order
       if (currentKey > lastKey) {
@@ -134,19 +146,20 @@
    * should have 5 values if the grouping is correct). It also checks whether
    * the keys themselves are in ascending order.
    */
-  static class AscendingGroupReducer implements Reducer {
+  static class AscendingGroupReducer
+    implements Reducer<IntWritable, IntWritable, IntWritable, Text> {
     
     public void configure(JobConf job) {
     }
 
     // keep track of the last key we've seen
     private int lastKey = Integer.MIN_VALUE;
-    public void reduce(WritableComparable key,
-                       Iterator values,
-                       OutputCollector out,
+    public void reduce(IntWritable key,
+                       Iterator<IntWritable> values,
+                       OutputCollector<IntWritable, Text> out,
                        Reporter reporter) throws IOException {
       // check key order
-      int currentKey = ((IntWritable)(key)).get();
+      int currentKey = key.get();
       if (currentKey < lastKey) {
         fail("Keys not in sorted ascending order");
       }
@@ -155,7 +168,7 @@
       IntWritable previous = new IntWritable(Integer.MIN_VALUE);
       int valueCount = 0;
       while (values.hasNext()) {
-        IntWritable current = (IntWritable) values.next();
+        IntWritable current = values.next();
         
         // Check that the values are sorted
         if (current.compareTo(previous) < 0)
@@ -177,19 +190,20 @@
    * whether they are correctly grouped by key (i.e. each call to reduce
    * should have 5 values if the grouping is correct). 
    */
-  static class DescendingGroupReducer implements Reducer {
+  static class DescendingGroupReducer
+    implements Reducer<IntWritable, IntWritable, IntWritable, Text> {
     
     public void configure(JobConf job) {
     }
 
     // keep track of the last key we've seen
     private int lastKey = Integer.MAX_VALUE;
-    public void reduce(WritableComparable key,
-                       Iterator values,
-                       OutputCollector out,
+    public void reduce(IntWritable key,
+                       Iterator<IntWritable> values,
+                       OutputCollector<IntWritable, Text> out,
                        Reporter reporter) throws IOException {
       // check key order
-      int currentKey = ((IntWritable)(key)).get();
+      int currentKey = key.get();
       if (currentKey > lastKey) {
         fail("Keys not in sorted descending order");
       }
@@ -198,7 +212,7 @@
       IntWritable previous = new IntWritable(Integer.MAX_VALUE);
       int valueCount = 0;
       while (values.hasNext()) {
-        IntWritable current = (IntWritable) values.next();
+        IntWritable current = values.next();
         
         // Check that the values are sorted
         if (current.compareTo(previous) > 0)
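
The Descending* reducers above only pass if the job sorts keys in reverse,
which a custom comparator supplies. A sketch of such a comparator (the class
name is hypothetical; WritableComparator and
JobConf.setOutputKeyComparatorClass predate this commit):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // Hypothetical comparator: sorts IntWritable keys in descending order.
    // Install with job.setOutputKeyComparatorClass(DescendingIntComparator.class);
    // reduce-side grouping is controlled separately through
    // job.setOutputValueGroupingComparator(...).
    public class DescendingIntComparator extends WritableComparator {
      public DescendingIntComparator() {
        super(IntWritable.class);
      }
      public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b);   // negate the natural ascending order
      }
    }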

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java Thu Aug 16 11:45:49 2007
@@ -81,7 +81,7 @@
       }
 
       // try splitting the file in a variety of sizes
-      TextInputFormat format = new KeyValueTextInputFormat();
+      KeyValueTextInputFormat format = new KeyValueTextInputFormat();
       format.configure(job);
       for (int i = 0; i < 3; i++) {
         int numSplits = random.nextInt(MAX_LENGTH/20)+1;
@@ -93,14 +93,14 @@
         BitSet bits = new BitSet(length);
         for (int j = 0; j < splits.length; j++) {
           LOG.debug("split["+j+"]= " + splits[j]);
-          RecordReader reader =
+          RecordReader<Text, Text> reader =
             format.getRecordReader(splits[j], job, reporter);
           Class readerClass = reader.getClass();
           assertEquals("reader class is KeyValueLineRecordReader.", KeyValueLineRecordReader.class, readerClass);        
 
-          Writable key = reader.createKey();
+          Text key = reader.createKey();
           Class keyClass = key.getClass();
-          Writable value = reader.createValue();
+          Text value = reader.createValue();
           Class valueClass = value.getClass();
           assertEquals("Key class is Text.", Text.class, keyClass);
           assertEquals("Value class is Text.", Text.class, valueClass);
@@ -187,14 +187,14 @@
   
   private static final Reporter voidReporter = Reporter.NULL;
   
-  private static List<Text> readSplit(InputFormat format, 
+  private static List<Text> readSplit(KeyValueTextInputFormat format, 
                                       InputSplit split, 
                                       JobConf job) throws IOException {
     List<Text> result = new ArrayList<Text>();
-    RecordReader reader = format.getRecordReader(split, job,
+    RecordReader<Text, Text> reader = format.getRecordReader(split, job,
                                                  voidReporter);
-    Text key = (Text) reader.createKey();
-    Text value = (Text) reader.createValue();
+    Text key = reader.createKey();
+    Text value = reader.createValue();
     while (reader.next(key, value)) {
       result.add(value);
-      value = (Text) reader.createValue();
+      value = reader.createValue();
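
readSplit above shows the consumer-side payoff of RecordReader<K, V>:
createKey() and createValue() return the declared types. The same loop as a
reusable sketch (hypothetical helper):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.RecordReader;

    // Hypothetical helper: drains a RecordReader<Text, Text> into a list.
    public class ReaderSketch {
      static List<Text> readAll(RecordReader<Text, Text> reader)
          throws IOException {
        List<Text> result = new ArrayList<Text>();
        Text key = reader.createKey();      // no (Text) cast
        Text value = reader.createValue();
        while (reader.next(key, value)) {
          result.add(value);
          value = reader.createValue();     // next() reuses objects, so take
                                            // a fresh one per retained value
        }
        reader.close();
        return result;
      }
    }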

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java Thu Aug 16 11:45:49 2007
@@ -38,16 +38,16 @@
    * type specified in conf will be anything but.
    */
    
-  static class TextGen implements Mapper {
+  static class TextGen
+    implements Mapper<WritableComparable, Writable, Text, Text> {
+    
     public void configure(JobConf job) {
     }
     
-    public void map(WritableComparable key, Writable val, OutputCollector out,
+    public void map(WritableComparable key, Writable val,
+                    OutputCollector<Text, Text> out,
                     Reporter reporter) throws IOException {
-      key = new Text("Hello");
-      val = new Text("World");
-      
-      out.collect(key, val);
+      out.collect(new Text("Hello"), new Text("World"));
     }
     
     public void close() {
@@ -57,14 +57,15 @@
   /** A do-nothing reducer class. We won't get this far, really.
    *
    */
-  static class TextReduce implements Reducer {
+  static class TextReduce
+    implements Reducer<Text, Text, Text, Text> {
     
     public void configure(JobConf job) {
     }
 
-    public void reduce(WritableComparable key,
-                       Iterator values,
-                       OutputCollector out,
+    public void reduce(Text key,
+                       Iterator<Text> values,
+                       OutputCollector<Text, Text> out,
                        Reporter reporter) throws IOException {
       out.collect(new Text("Test"), new Text("Me"));
     }
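
As the comments above hint, erasure means an output-type mismatch cannot be
caught at compile time; the framework only notices when collected records
are checked against the configured classes at runtime, which is exactly what
this test provokes. A sketch of the deliberately wrong wiring (hypothetical
fragment):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical fragment, not part of this commit.
    public class MismatchSketch {
      static JobConf misconfigured() {
        JobConf conf = new JobConf();
        // conf.setMapperClass(TestMapOutputType.TextGen.class);  // emits <Text, Text>
        conf.setMapOutputKeyClass(IntWritable.class);    // deliberately wrong
        conf.setMapOutputValueClass(IntWritable.class);
        return conf;
      }
    }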

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Thu Aug 16 11:45:49 2007
@@ -83,13 +83,17 @@
    * of numbers in random order, but where each number appears
    * as many times as we were instructed.
    */
-  static class RandomGenMapper implements Mapper {
+  static class RandomGenMapper
+    implements Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
+    
     public void configure(JobConf job) {
     }
 
-    public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
-      int randomVal = ((IntWritable) key).get();
-      int randomCount = ((IntWritable) val).get();
+    public void map(IntWritable key, IntWritable val,
+                    OutputCollector<IntWritable, IntWritable> out,
+                    Reporter reporter) throws IOException {
+      int randomVal = key.get();
+      int randomCount = val.get();
 
       for (int i = 0; i < randomCount; i++) {
         out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
@@ -100,13 +104,17 @@
   }
   /**
    */
-  static class RandomGenReducer implements Reducer {
+  static class RandomGenReducer
+    implements Reducer<IntWritable, IntWritable, Text, Text> {
+    
     public void configure(JobConf job) {
     }
 
-    public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
+    public void reduce(IntWritable key, Iterator<IntWritable> it,
+                       OutputCollector<Text, Text> out,
+                       Reporter reporter) throws IOException {
       while (it.hasNext()) {
-        int val = ((IntWritable) it.next()).get();
+        int val = it.next().get();
         out.collect(new Text("" + val), new Text(""));
       }
     }
@@ -130,26 +138,31 @@
    * Each key here is a random number, and the count is the
    * number of times the number was emitted.
    */
-  static class RandomCheckMapper implements Mapper {
+  static class RandomCheckMapper
+    implements Mapper<WritableComparable, Text, IntWritable, IntWritable> {
+    
     public void configure(JobConf job) {
     }
 
-    public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
-      Text str = (Text) val;
-
-      out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
+    public void map(WritableComparable key, Text val,
+                    OutputCollector<IntWritable, IntWritable> out,
+                    Reporter reporter) throws IOException {
+      out.collect(new IntWritable(Integer.parseInt(val.toString().trim())), new IntWritable(1));
     }
     public void close() {
     }
   }
   /**
    */
-  static class RandomCheckReducer implements Reducer {
+  static class RandomCheckReducer
+      implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
     public void configure(JobConf job) {
     }
         
-    public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
-      int keyint = ((IntWritable) key).get();
+    public void reduce(IntWritable key, Iterator<IntWritable> it,
+                       OutputCollector<IntWritable, IntWritable> out,
+                       Reporter reporter) throws IOException {
+      int keyint = key.get();
       int count = 0;
       while (it.hasNext()) {
         it.next();
@@ -169,28 +182,35 @@
    * Thus, the map() function is just the identity function
    * and reduce() just sums.  Nothing to see here!
    */
-  static class MergeMapper implements Mapper {
+  static class MergeMapper
+    implements Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
+    
     public void configure(JobConf job) {
     }
 
-    public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
-      int keyint = ((IntWritable) key).get();
-      int valint = ((IntWritable) val).get();
+    public void map(IntWritable key, IntWritable val,
+                    OutputCollector<IntWritable, IntWritable> out,
+                    Reporter reporter) throws IOException {
+      int keyint = key.get();
+      int valint = val.get();
 
       out.collect(new IntWritable(keyint), new IntWritable(valint));
     }
     public void close() {
     }
   }
-  static class MergeReducer implements Reducer {
+  static class MergeReducer
+    implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
     public void configure(JobConf job) {
     }
         
-    public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
-      int keyint = ((IntWritable) key).get();
+    public void reduce(IntWritable key, Iterator<IntWritable> it,
+                       OutputCollector<IntWritable, IntWritable> out,
+                       Reporter reporter) throws IOException {
+      int keyint = key.get();
       int total = 0;
       while (it.hasNext()) {
-        total += ((IntWritable) it.next()).get();
+        total += it.next().get();
       }
       out.collect(new IntWritable(keyint), new IntWritable(total));
     }
@@ -214,15 +234,16 @@
     launch();
   }
 
-  private static class MyMap implements Mapper {
+  private static class MyMap
+    implements Mapper<WritableComparable, Text, Text, Text> {
       
     public void configure(JobConf conf) {
     }
       
-    public void map(WritableComparable key, Writable value,
-                    OutputCollector output, Reporter reporter
-                    ) throws IOException {
-      String str = ((Text) value).toString().toLowerCase();
+    public void map(WritableComparable key, Text value,
+                    OutputCollector<Text, Text> output,
+                    Reporter reporter) throws IOException {
+      String str = value.toString().toLowerCase();
       output.collect(new Text(str), value);
     }
 
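
IdentityMapper in TestComparators and MyMapper in the next file both pass
pairs through untouched; with generics such a helper can be written once and
bound per job. A sketch (hypothetical class; org.apache.hadoop.mapred.lib
ships an IdentityMapper of its own):

    import java.io.IOException;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    // Hypothetical generic identity mapper: one class serves any job whose
    // key is a WritableComparable and whose value is a Writable.
    public class GenericIdentityMapper<K extends WritableComparable,
                                       V extends Writable>
        extends MapReduceBase implements Mapper<K, V, K, V> {

      public void map(K key, V value, OutputCollector<K, V> out,
                      Reporter reporter) throws IOException {
        out.collect(key, value);   // pass the pair through unchanged
      }
    }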

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Thu Aug 16 11:45:49 2007
@@ -111,7 +111,9 @@
     
   }
   
-  private static class MyInputFormat implements InputFormat {
+  private static class MyInputFormat
+    implements InputFormat<IntWritable, Text> {
+    
     static final String[] data = new String[]{
       "crocodile pants", 
       "aunt annie", 
@@ -151,7 +153,7 @@
       }
     }
 
-    static class MyRecordReader implements RecordReader {
+    static class MyRecordReader implements RecordReader<IntWritable, Text> {
       int index;
       int past;
       int length;
@@ -162,21 +164,21 @@
         this.length = length;
       }
 
-      public boolean next(Writable key, Writable value) throws IOException {
+      public boolean next(IntWritable key, Text value) throws IOException {
         if (index < past) {
-          ((IntWritable) key).set(index);
-          ((Text) value).set(data[index]);
+          key.set(index);
+          value.set(data[index]);
           index += 1;
           return true;
         }
         return false;
       }
       
-      public WritableComparable createKey() {
+      public IntWritable createKey() {
         return new IntWritable();
       }
       
-      public Writable createValue() {
+      public Text createValue() {
         return new Text();
       }
 
@@ -200,18 +202,23 @@
                            new MySplit(4, 2)};
     }
 
-    public RecordReader getRecordReader(InputSplit split,
-                                        JobConf job, 
-                                        Reporter reporter) throws IOException {
+    public RecordReader<IntWritable, Text> getRecordReader(InputSplit split,
+                                                           JobConf job, 
+                                                           Reporter reporter)
+                                                           throws IOException {
       MySplit sp = (MySplit) split;
       return new MyRecordReader(sp.first, sp.length);
     }
     
   }
   
-  static class MyMapper extends MapReduceBase implements Mapper {
+  static class MyMapper extends MapReduceBase
+    implements Mapper<WritableComparable, Writable,
+                      WritableComparable, Writable> {
+    
     public void map(WritableComparable key, Writable value, 
-                    OutputCollector out, Reporter reporter) throws IOException {
+                    OutputCollector<WritableComparable, Writable> out,
+                    Reporter reporter) throws IOException {
       System.out.println("map: " + key + ", " + value);
       out.collect((WritableComparable) value, key);
       InputSplit split = reporter.getInputSplit();
@@ -222,10 +229,12 @@
     }
   }
 
-  static class MyReducer extends MapReduceBase implements Reducer {
-    public void reduce(WritableComparable key, Iterator values, 
-                       OutputCollector output, Reporter reporter
-                       ) throws IOException {
+  static class MyReducer extends MapReduceBase
+    implements Reducer<WritableComparable, Writable,
+                      WritableComparable, Writable> {
+    public void reduce(WritableComparable key, Iterator<Writable> values, 
+                       OutputCollector<WritableComparable, Writable> output,
+                       Reporter reporter) throws IOException {
       try {
         InputSplit split = reporter.getInputSplit();
         throw new IOException("Got an input split of " + split);