You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/08/16 20:45:56 UTC
svn commit: r566798 [2/3] - in /lucene/hadoop/trunk: ./
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/examples/org/apache/hadoop/examples/
src/examples/org/apache/hadoop/examp...
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Thu Aug 16 11:45:49 2007
@@ -50,15 +50,21 @@
* <b>mapred.map.multithreadedrunner.threads</b> property).
* <p>
*/
-public class MultithreadedMapRunner implements MapRunnable {
+public class MultithreadedMapRunner<K1 extends WritableComparable,
+ V1 extends Writable,
+ K2 extends WritableComparable,
+ V2 extends Writable>
+ implements MapRunnable<K1, V1, K2, V2> {
+
private static final Log LOG =
LogFactory.getLog(MultithreadedMapRunner.class.getName());
private JobConf job;
- private Mapper mapper;
+ private Mapper<K1, V1, K2, V2> mapper;
private ExecutorService executorService;
private volatile IOException ioException;
+ @SuppressWarnings("unchecked")
public void configure(JobConf job) {
int numberOfThreads =
job.getInt("mapred.map.multithreadedrunner.threads", 10);
@@ -76,14 +82,14 @@
executorService = Executors.newFixedThreadPool(numberOfThreads);
}
- public void run(RecordReader input, OutputCollector output,
+ public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
Reporter reporter)
throws IOException {
try {
// allocate key & value instances these objects will not be reused
// because execution of Mapper.map is not serialized.
- WritableComparable key = input.createKey();
- Writable value = input.createValue();
+ K1 key = input.createKey();
+ V1 value = input.createValue();
while (input.next(key, value)) {
@@ -166,9 +172,9 @@
* Runnable to execute a single Mapper.map call from a forked thread.
*/
private class MapperInvokeRunable implements Runnable {
- private WritableComparable key;
- private Writable value;
- private OutputCollector output;
+ private K1 key;
+ private V1 value;
+ private OutputCollector<K2, V2> output;
private Reporter reporter;
/**
@@ -180,8 +186,9 @@
* @param output
* @param reporter
*/
- public MapperInvokeRunable(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter) {
+ public MapperInvokeRunable(K1 key, V1 value,
+ OutputCollector<K2, V2> output,
+ Reporter reporter) {
this.key = key;
this.value = value;
this.output = output;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java Thu Aug 16 11:45:49 2007
@@ -30,11 +30,14 @@
/**
* Consume all outputs and put them in /dev/null.
*/
-public class NullOutputFormat implements OutputFormat {
- public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
+public class NullOutputFormat<K extends WritableComparable,
+ V extends Writable>
+ implements OutputFormat<K, V> {
+
+ public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress) {
- return new RecordWriter(){
- public void write(WritableComparable key, Writable value) { }
+ return new RecordWriter<K, V>(){
+ public void write(K key, V value) { }
public void close(Reporter reporter) { }
};
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java Thu Aug 16 11:45:49 2007
@@ -19,25 +19,23 @@
package org.apache.hadoop.mapred.lib;
import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.MapReduceBase;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-
-import java.util.regex.Pattern;
-import java.util.regex.Matcher;
/** A {@link Mapper} that extracts text matching a regular expression. */
-public class RegexMapper extends MapReduceBase implements Mapper {
+public class RegexMapper<K extends WritableComparable>
+ extends MapReduceBase
+ implements Mapper<K, Text, Text, LongWritable> {
private Pattern pattern;
private int group;
@@ -47,10 +45,11 @@
group = job.getInt("mapred.mapper.regex.group", 0);
}
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter)
+ public void map(K key, Text value,
+ OutputCollector<Text, LongWritable> output,
+ Reporter reporter)
throws IOException {
- String text = ((Text)value).toString();
+ String text = value.toString();
Matcher matcher = pattern.matcher(text);
while (matcher.find()) {
output.collect(new Text(matcher.group(group)), new LongWritable(1));
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java Thu Aug 16 11:45:49 2007
@@ -21,26 +21,27 @@
import java.io.IOException;
import java.util.StringTokenizer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.MapReduceBase;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
/** A {@link Mapper} that maps text values into <token,freq> pairs. Uses
* {@link StringTokenizer} to break text into tokens. */
-public class TokenCountMapper extends MapReduceBase implements Mapper {
+public class TokenCountMapper<K extends WritableComparable>
+ extends MapReduceBase
+ implements Mapper<K, Text, Text, LongWritable> {
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter)
+ public void map(K key, Text value,
+ OutputCollector<Text, LongWritable> output,
+ Reporter reporter)
throws IOException {
// get input text
- String text = ((Text)value).toString(); // value is line of text
+ String text = value.toString(); // value is line of text
// tokenize the value
StringTokenizer st = new StringTokenizer(text);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UserDefinedValueAggregatorDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UserDefinedValueAggregatorDescriptor.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UserDefinedValueAggregatorDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UserDefinedValueAggregatorDescriptor.java Thu Aug 16 11:45:49 2007
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Map.Entry;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
/**
@@ -87,8 +88,9 @@
* aggregation type which is used to guide the way to aggregate the
* value in the reduce/combiner phrase of an Aggregate based job.
*/
- public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
- ArrayList<Entry> retv = new ArrayList<Entry>();
+ public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
+ Object val) {
+ ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
if (this.theAggregatorDescriptor != null) {
retv = this.theAggregatorDescriptor.generateKeyValPairs(key, val);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java Thu Aug 16 11:45:49 2007
@@ -49,25 +49,25 @@
public String inputFile = null;
- private static class MyEntry implements Entry {
- Object key;
+ private static class MyEntry implements Entry<Text, Text> {
+ Text key;
- Object val;
+ Text val;
- public Object getKey() {
+ public Text getKey() {
return key;
}
- public Object getValue() {
+ public Text getValue() {
return val;
}
- public Object setValue(Object val) {
+ public Text setValue(Text val) {
this.val = val;
return val;
}
- public MyEntry(Object key, Object val) {
+ public MyEntry(Text key, Text val) {
this.key = key;
this.val = val;
}
@@ -81,7 +81,7 @@
* @return an Entry whose key is the aggregation id prefixed with
* the aggregation type.
*/
- public static Entry generateEntry(String type, String id, Object val) {
+ public static Entry<Text, Text> generateEntry(String type, String id, Text val) {
Text key = new Text(type + TYPE_SEPARATOR + id);
return new MyEntry(key, val);
}
@@ -129,11 +129,12 @@
* aggregation type which is used to guide the way to aggregate the
* value in the reduce/combiner phrase of an Aggregate based job.
*/
- public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
- ArrayList<Entry> retv = new ArrayList<Entry>();
+ public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
+ Object val) {
+ ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
String countType = LONG_VALUE_SUM;
String id = "record_count";
- Entry e = generateEntry(countType, id, ONE);
+ Entry<Text, Text> e = generateEntry(countType, id, ONE);
if (e != null) {
retv.add(e);
}
@@ -153,6 +154,7 @@
*/
public void configure(JobConf job) {
this.inputFile = job.get("map.input.file");
- maxNumItems = job.getLong("aggregate.max.num.unique.values", Long.MAX_VALUE);
+ maxNumItems = job.getLong("aggregate.max.num.unique.values",
+ Long.MAX_VALUE);
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorCombiner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorCombiner.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorCombiner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorCombiner.java Thu Aug 16 11:45:49 2007
@@ -31,7 +31,9 @@
/**
* This class implements the generic combiner of Aggregate.
*/
-public class ValueAggregatorCombiner extends ValueAggregatorJobBase {
+public class ValueAggregatorCombiner<K1 extends WritableComparable,
+ V1 extends Writable>
+ extends ValueAggregatorJobBase<K1, V1> {
/**
* Combiner does not need to configure.
@@ -46,8 +48,8 @@
* @param values the values to combine
* @param output to collect combined values
*/
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<Text> values,
+ OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String keyStr = key.toString();
int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
String type = keyStr.substring(0, pos);
@@ -80,7 +82,7 @@
* Do nothing. Should not be called.
*
*/
- public void map(WritableComparable arg0, Writable arg1, OutputCollector arg2,
+ public void map(K1 arg0, V1 arg1, OutputCollector<Text, Text> arg2,
Reporter arg3) throws IOException {
throw new IOException ("should not be called\n");
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorDescriptor.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorDescriptor.java Thu Aug 16 11:45:49 2007
@@ -54,7 +54,8 @@
* aggregation type which is used to guide the way to aggregate the
* value in the reduce/combiner phrase of an Aggregate based job.
*/
- public ArrayList<Entry> generateKeyValPairs(Object key, Object val);
+ public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
+ Object val);
/**
* Configure the object
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJobBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJobBase.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJobBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJobBase.java Thu Aug 16 11:45:49 2007
@@ -21,6 +21,9 @@
import java.io.IOException;
import java.util.ArrayList;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
@@ -29,7 +32,9 @@
* This abstract class implements some common functionalities of the
* the generic mapper, reducer and combiner classes of Aggregate.
*/
-public abstract class ValueAggregatorJobBase implements Mapper, Reducer {
+public abstract class ValueAggregatorJobBase<K1 extends WritableComparable,
+ V1 extends Writable>
+ implements Mapper<K1, V1, Text, Text>, Reducer<Text, Text, Text, Text> {
protected ArrayList<ValueAggregatorDescriptor> aggregatorDescriptorList = null;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorMapper.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorMapper.java Thu Aug 16 11:45:49 2007
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.Map.Entry;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.OutputCollector;
@@ -30,23 +31,25 @@
/**
* This class implements the generic mapper of Aggregate.
*/
-public class ValueAggregatorMapper extends ValueAggregatorJobBase {
+public class ValueAggregatorMapper<K1 extends WritableComparable,
+ V1 extends Writable>
+ extends ValueAggregatorJobBase<K1, V1> {
/**
* the map function. It iterates through the value aggregator descriptor
* list to generate aggregation id/value pairs and emit them.
*/
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter) throws IOException {
+ public void map(K1 key, V1 value,
+ OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
Iterator iter = this.aggregatorDescriptorList.iterator();
while (iter.hasNext()) {
ValueAggregatorDescriptor ad = (ValueAggregatorDescriptor) iter.next();
- Iterator<Entry> ens = ad.generateKeyValPairs(key, value).iterator();
+ Iterator<Entry<Text, Text>> ens =
+ ad.generateKeyValPairs(key, value).iterator();
while (ens.hasNext()) {
- Entry en = ens.next();
- output.collect((WritableComparable) en.getKey(), (Writable) en
- .getValue());
+ Entry<Text, Text> en = ens.next();
+ output.collect(en.getKey(), en.getValue());
}
}
}
@@ -54,8 +57,9 @@
/**
* Do nothing. Should not be called.
*/
- public void reduce(WritableComparable arg0, Iterator arg1,
- OutputCollector arg2, Reporter arg3) throws IOException {
- throw new IOException ("should not be called\n");
+ public void reduce(Text arg0, Iterator<Text> arg1,
+ OutputCollector<Text, Text> arg2,
+ Reporter arg3) throws IOException {
+ throw new IOException("should not be called\n");
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorReducer.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorReducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorReducer.java Thu Aug 16 11:45:49 2007
@@ -32,7 +32,9 @@
*
*
*/
-public class ValueAggregatorReducer extends ValueAggregatorJobBase {
+public class ValueAggregatorReducer<K1 extends WritableComparable,
+ V1 extends Writable>
+ extends ValueAggregatorJobBase<K1, V1> {
/**
* @param key
@@ -43,8 +45,8 @@
* may be further customiized.
* @value the values to be aggregated
*/
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<Text> values,
+ OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String keyStr = key.toString();
int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
String type = keyStr.substring(0, pos);
@@ -65,7 +67,7 @@
/**
* Do nothing. Should not be called
*/
- public void map(WritableComparable arg0, Writable arg1, OutputCollector arg2,
+ public void map(K1 arg0, V1 arg1, OutputCollector<Text, Text> arg2,
Reporter arg3) throws IOException {
throw new IOException ("should not be called\n");
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java Thu Aug 16 11:45:49 2007
@@ -44,13 +44,14 @@
* This class is responsible for launching and communicating with the child
* process.
*/
-class Application {
+class Application<K1 extends WritableComparable, V1 extends Writable,
+ K2 extends WritableComparable, V2 extends Writable> {
private static final Log LOG = LogFactory.getLog(Application.class.getName());
private ServerSocket serverSocket;
private Process process;
private Socket clientSocket;
- private OutputHandler handler;
- private BinaryProtocol downlink;
+ private OutputHandler<K2, V2> handler;
+ private BinaryProtocol<K1, V1, K2, V2> downlink;
/**
* Start the child process to handle the task for us.
@@ -62,7 +63,8 @@
* @throws IOException
* @throws InterruptedException
*/
- Application(JobConf conf, OutputCollector output, Reporter reporter,
+ @SuppressWarnings("unchecked")
+ Application(JobConf conf, OutputCollector<K2, V2> output, Reporter reporter,
Class outputKeyClass, Class outputValueClass
) throws IOException, InterruptedException {
serverSocket = new ServerSocket(0);
@@ -81,12 +83,12 @@
cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
process = runClient(cmd, env);
clientSocket = serverSocket.accept();
- handler = new OutputHandler(output, reporter);
- WritableComparable outputKey = (WritableComparable)
+ handler = new OutputHandler<K2, V2>(output, reporter);
+ K2 outputKey = (K2)
ReflectionUtils.newInstance(outputKeyClass, conf);
- Writable outputValue = (Writable)
+ V2 outputValue = (V2)
ReflectionUtils.newInstance(outputValueClass, conf);
- downlink = new BinaryProtocol(clientSocket, handler,
+ downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler,
outputKey, outputValue, conf);
downlink.start();
downlink.setJobConf(conf);
@@ -97,7 +99,7 @@
* application.
* @return the downlink proxy
*/
- DownwardProtocol getDownlink() {
+ DownwardProtocol<K1, V1> getDownlink() {
return downlink;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Thu Aug 16 11:45:49 2007
@@ -39,7 +39,10 @@
/**
* This protocol is a binary implementation of the Pipes protocol.
*/
-class BinaryProtocol implements DownwardProtocol {
+class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
+ K2 extends WritableComparable, V2 extends Writable>
+ implements DownwardProtocol<K1, V1> {
+
public static final int CURRENT_PROTOCOL_VERSION = 0;
private DataOutputStream stream;
private DataOutputBuffer buffer = new DataOutputBuffer();
@@ -72,15 +75,18 @@
}
}
- private static class UplinkReaderThread extends Thread {
+ private static class UplinkReaderThread<K2 extends WritableComparable,
+ V2 extends Writable>
+ extends Thread {
+
private DataInputStream inStream;
- private UpwardProtocol handler;
- private WritableComparable key;
- private Writable value;
+ private UpwardProtocol<K2, V2> handler;
+ private K2 key;
+ private V2 value;
- public UplinkReaderThread(InputStream stream, UpwardProtocol handler,
- WritableComparable key, Writable value
- ) throws IOException{
+ public UplinkReaderThread(InputStream stream,
+ UpwardProtocol<K2, V2> handler,
+ K2 key, V2 value) throws IOException{
inStream = new DataInputStream(stream);
this.handler = handler;
this.key = key;
@@ -192,9 +198,9 @@
* @throws IOException
*/
public BinaryProtocol(Socket sock,
- UpwardProtocol handler,
- WritableComparable key,
- Writable value,
+ UpwardProtocol<K2, V2> handler,
+ K2 key,
+ V2 value,
JobConf config) throws IOException {
OutputStream raw = sock.getOutputStream();
// If we are debugging, save a copy of the downlink commands to a file
@@ -202,7 +208,8 @@
raw = new TeeOutputStream("downlink.data", raw);
}
stream = new DataOutputStream(raw);
- uplink = new UplinkReaderThread(sock.getInputStream(), handler, key, value);
+ uplink = new UplinkReaderThread<K2, V2>(sock.getInputStream(),
+ handler, key, value);
uplink.setName("pipe-uplink-handler");
uplink.start();
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java Thu Aug 16 11:45:49 2007
@@ -30,7 +30,7 @@
* All of these calls are asynchronous and return before the message has been
* processed.
*/
-interface DownwardProtocol {
+interface DownwardProtocol<K extends WritableComparable, V extends Writable> {
/**
* Start communication
* @throws IOException
@@ -68,7 +68,7 @@
* @param value The record's value
* @throws IOException
*/
- void mapItem(WritableComparable key, Writable value) throws IOException;
+ void mapItem(K key, V value) throws IOException;
/**
* Run a reduce task in the child
@@ -83,14 +83,14 @@
* @param key the new key
* @throws IOException
*/
- void reduceKey(WritableComparable key) throws IOException;
+ void reduceKey(K key) throws IOException;
/**
* The reduce should be given a new value
* @param value the new value
* @throws IOException
*/
- void reduceValue(Writable value) throws IOException;
+ void reduceValue(V value) throws IOException;
/**
* The task has no more input coming, but it should finish processing it's
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java Thu Aug 16 11:45:49 2007
@@ -28,9 +28,12 @@
/**
* Handles the upward (C++ to Java) messages from the application.
*/
-class OutputHandler implements UpwardProtocol {
+class OutputHandler<K extends WritableComparable,
+ V extends Writable>
+ implements UpwardProtocol<K, V> {
+
private Reporter reporter;
- private OutputCollector collector;
+ private OutputCollector<K, V> collector;
private float progressValue = 0.0f;
private boolean done = false;
private Throwable exception = null;
@@ -40,7 +43,7 @@
* @param collector the "real" collector that takes the output
* @param reporter the reporter for reporting progress
*/
- public OutputHandler(OutputCollector collector, Reporter reporter) {
+ public OutputHandler(OutputCollector<K, V> collector, Reporter reporter) {
this.reporter = reporter;
this.collector = collector;
}
@@ -48,16 +51,15 @@
/**
* The task output a normal record.
*/
- public void output(WritableComparable key,
- Writable value) throws IOException {
+ public void output(K key, V value) throws IOException {
collector.collect(key, value);
}
/**
* The task output a record with a partition number attached.
*/
- public void partitionedOutput(int reduce, WritableComparable key,
- Writable value) throws IOException {
+ public void partitionedOutput(int reduce, K key,
+ V value) throws IOException {
PipesPartitioner.setNextPartition(reduce);
collector.collect(key, value);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java Thu Aug 16 11:45:49 2007
@@ -31,7 +31,9 @@
/**
* An adaptor to run a C++ mapper.
*/
-class PipesMapRunner extends MapRunner {
+class PipesMapRunner<K1 extends WritableComparable, V1 extends Writable,
+ K2 extends WritableComparable, V2 extends Writable>
+ extends MapRunner<K1, V1, K2, V2> {
private JobConf job;
/**
@@ -48,26 +50,25 @@
* @param output the object to collect the outputs of the map
* @param reporter the object to update with status
*/
- public void run(RecordReader input, OutputCollector output,
- Reporter reporter
- ) throws IOException {
- Application application = null;
+ public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
+ Reporter reporter) throws IOException {
+ Application<K1, V1, K2, V2> application = null;
try {
- application = new Application(job, output, reporter,
+ application = new Application<K1, V1, K2, V2>(job, output, reporter,
job.getMapOutputKeyClass(),
job.getMapOutputValueClass());
} catch (InterruptedException ie) {
throw new RuntimeException("interrupted", ie);
}
- DownwardProtocol downlink = application.getDownlink();
+ DownwardProtocol<K1, V1> downlink = application.getDownlink();
boolean isJavaInput = Submitter.getIsJavaRecordReader(job);
downlink.runMap(reporter.getInputSplit(),
job.getNumReduceTasks(), isJavaInput);
try {
if (isJavaInput) {
// allocate key & value instances that are re-used for all entries
- WritableComparable key = input.createKey();
- Writable value = input.createValue();
+ K1 key = input.createKey();
+ V1 value = input.createValue();
downlink.setInputTypes(key.getClass().getName(),
value.getClass().getName());
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java Thu Aug 16 11:45:49 2007
@@ -28,10 +28,14 @@
* This partitioner is one that can either be set manually per a record or it
* can fall back onto a Java partitioner that was set by the user.
*/
-class PipesPartitioner implements Partitioner {
+class PipesPartitioner<K extends WritableComparable,
+ V extends Writable>
+ implements Partitioner<K, V> {
+
private static ThreadLocal<Integer> cache = new ThreadLocal<Integer>();
- private Partitioner part = null;
+ private Partitioner<K, V> part = null;
+ @SuppressWarnings("unchecked")
public void configure(JobConf conf) {
part = (Partitioner)
ReflectionUtils.newInstance(Submitter.getJavaPartitioner(conf), conf);
@@ -52,7 +56,7 @@
* @param value the value to partition
* @param numPartitions the number of reduces
*/
- public int getPartition(WritableComparable key, Writable value,
+ public int getPartition(K key, V value,
int numPartitions) {
Integer result = cache.get();
if (result == null) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java Thu Aug 16 11:45:49 2007
@@ -33,11 +33,13 @@
/**
* This class is used to talk to a C++ reduce task.
*/
-class PipesReducer implements Reducer {
+class PipesReducer<K2 extends WritableComparable, V2 extends Writable,
+ K3 extends WritableComparable, V3 extends Writable>
+ implements Reducer<K2, V2, K3, V3> {
private static final Log LOG= LogFactory.getLog(PipesReducer.class.getName());
private JobConf job;
- private Application application = null;
- private DownwardProtocol downlink = null;
+ private Application<K2, V2, K3, V3> application = null;
+ private DownwardProtocol<K2, V2> downlink = null;
private boolean isOk = true;
public void configure(JobConf job) {
@@ -48,23 +50,23 @@
* Process all of the keys and values. Start up the application if we haven't
* started it yet.
*/
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter
+ public void reduce(K2 key, Iterator<V2> values,
+ OutputCollector<K3, V3> output, Reporter reporter
) throws IOException {
isOk = false;
startApplication(output, reporter);
downlink.reduceKey(key);
while (values.hasNext()) {
- downlink.reduceValue((Writable) values.next());
+ downlink.reduceValue(values.next());
}
isOk = true;
}
- private void startApplication(OutputCollector output, Reporter reporter) throws IOException {
+ private void startApplication(OutputCollector<K3, V3> output, Reporter reporter) throws IOException {
if (application == null) {
try {
LOG.info("starting application");
- application = new Application(job, output, reporter,
+ application = new Application<K2, V2, K3, V3>(job, output, reporter,
job.getOutputKeyClass(),
job.getOutputValueClass());
downlink = application.getDownlink();
@@ -82,9 +84,9 @@
public void close() throws IOException {
// if we haven't started the application, we have nothing to do
if (isOk) {
- OutputCollector nullCollector = new OutputCollector() {
- public void collect(WritableComparable key,
- Writable value) throws IOException {
+ OutputCollector<K3, V3> nullCollector = new OutputCollector<K3, V3>() {
+ public void collect(K3 key,
+ V3 value) throws IOException {
// NULL
}
};
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java Thu Aug 16 11:45:49 2007
@@ -26,14 +26,14 @@
* The interface for the messages that can come up from the child. All of these
* calls are asynchronous and return before the message has been processed.
*/
-interface UpwardProtocol {
+interface UpwardProtocol<K extends WritableComparable, V extends Writable> {
/**
* Output a record from the child.
* @param key the record's key
* @param value the record's value
* @throws IOException
*/
- void output(WritableComparable key, Writable value) throws IOException;
+ void output(K key, V value) throws IOException;
/**
* Map functions where the application has defined a partition function
@@ -43,8 +43,8 @@
* @param value the record's value
* @throws IOException
*/
- void partitionedOutput(int reduce, WritableComparable key,
- Writable value) throws IOException;
+ void partitionedOutput(int reduce, K key,
+ V value) throws IOException;
/**
* Update the task's status message
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java Thu Aug 16 11:45:49 2007
@@ -18,30 +18,32 @@
package org.apache.hadoop.tools;
-import java.io.*;
-
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.fs.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.CopyFiles;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.LongSumReducer;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-
-import java.util.regex.Pattern;
-import java.util.regex.Matcher;
+import org.apache.hadoop.util.CopyFiles;
/**
* Logalyzer: A utility tool for archiving and analyzing hadoop logs.
@@ -62,7 +64,9 @@
private static Configuration fsConfig = new Configuration();
/** A {@link Mapper} that extracts text matching a regular expression. */
- public static class LogRegexMapper extends MapReduceBase implements Mapper {
+ public static class LogRegexMapper<K extends WritableComparable>
+ extends MapReduceBase
+ implements Mapper<K, Text, Text, LongWritable> {
private Pattern pattern;
@@ -70,13 +74,14 @@
pattern = Pattern.compile(job.get("mapred.mapper.regex"));
}
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter)
+ public void map(K key, Text value,
+ OutputCollector<Text, LongWritable> output,
+ Reporter reporter)
throws IOException {
- String text = ((Text)value).toString();
+ String text = value.toString();
Matcher matcher = pattern.matcher(text);
while (matcher.find()) {
- output.collect((Text)value, new LongWritable(1));
+ output.collect(value, new LongWritable(1));
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Thu Aug 16 11:45:49 2007
@@ -182,7 +182,7 @@
* FSCopyFilesMapper: The mapper for copying files from the DFS.
*/
public static class FSCopyFilesMapper extends CopyFilesMapper
- implements Mapper
+ implements Mapper<Text, Writable, WritableComparable, Text>
{
private int sizeBuf = 4096;
private FileSystem srcFileSys = null;
@@ -397,11 +397,11 @@
* @param out not-used.
* @param reporter
*/
- public void map(WritableComparable key,
+ public void map(Text key,
Writable value,
- OutputCollector out,
+ OutputCollector<WritableComparable, Text> out,
Reporter reporter) throws IOException {
- String src = ((Text) key).toString();
+ String src = key.toString();
try {
copy(src, reporter);
} catch (IOException except) {
Modified: lucene/hadoop/trunk/src/test/checkstyle.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/checkstyle.xml?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/checkstyle.xml (original)
+++ lucene/hadoop/trunk/src/test/checkstyle.xml Thu Aug 16 11:45:49 2007
@@ -52,6 +52,7 @@
<!-- See http://checkstyle.sf.net/config_javadoc.html -->
<module name="JavadocType">
<property name="scope" value="public"/>
+ <property name="allowMissingParamTags" value="true"/>
</module>
<module name="JavadocStyle"/>
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java Thu Aug 16 11:45:49 2007
@@ -44,7 +44,9 @@
* </ul>
*
*/
-public class AccumulatingReducer extends MapReduceBase implements Reducer {
+public class AccumulatingReducer extends MapReduceBase
+ implements Reducer<UTF8, UTF8, UTF8, UTF8> {
+
protected String hostName;
public AccumulatingReducer () {
@@ -57,12 +59,12 @@
TaskTracker.LOG.info("Starting AccumulatingReducer on " + hostName);
}
- public void reduce(WritableComparable key,
- Iterator values,
- OutputCollector output,
+ public void reduce(UTF8 key,
+ Iterator<UTF8> values,
+ OutputCollector<UTF8, UTF8> output,
Reporter reporter
) throws IOException {
- String field = ((UTF8) key).toString();
+ String field = key.toString();
reporter.setStatus("starting " + field + " ::host = " + hostName);
@@ -70,7 +72,7 @@
if (field.startsWith("s:")) {
String sSum = "";
while (values.hasNext())
- sSum += ((UTF8) values.next()).toString() + ";";
+ sSum += values.next().toString() + ";";
output.collect(key, new UTF8(sSum));
reporter.setStatus("finished " + field + " ::host = " + hostName);
return;
@@ -79,7 +81,7 @@
if (field.startsWith("f:")) {
float fSum = 0;
while (values.hasNext())
- fSum += Float.parseFloat(((UTF8) values.next()).toString());
+ fSum += Float.parseFloat(values.next().toString());
output.collect(key, new UTF8(String.valueOf(fSum)));
reporter.setStatus("finished " + field + " ::host = " + hostName);
return;
@@ -88,7 +90,7 @@
if (field.startsWith("l:")) {
long lSum = 0;
while (values.hasNext()) {
- lSum += Long.parseLong(((UTF8) values.next()).toString());
+ lSum += Long.parseLong(values.next().toString());
}
output.collect(key, new UTF8(String.valueOf(lSum)));
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java Thu Aug 16 11:45:49 2007
@@ -159,7 +159,7 @@
super(fsConfig);
}
- void collectStats(OutputCollector output,
+ void collectStats(OutputCollector<UTF8, UTF8> output,
String name,
long execTime,
Object objSize) throws IOException {
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java Thu Aug 16 11:45:49 2007
@@ -163,7 +163,7 @@
return new Long(actualSize);
}
- void collectStats(OutputCollector output,
+ void collectStats(OutputCollector<UTF8, UTF8> output,
String name,
long execTime,
Object corruptedBlock) throws IOException {
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java Thu Aug 16 11:45:49 2007
@@ -39,7 +39,9 @@
* statistics data to be collected by subsequent reducers.
*
*/
-public abstract class IOMapperBase extends Configured implements Mapper {
+public abstract class IOMapperBase extends Configured
+ implements Mapper<UTF8, LongWritable, UTF8, UTF8> {
+
protected byte[] buffer;
protected int bufferSize;
protected FileSystem fs;
@@ -91,7 +93,7 @@
* @param doIOReturnValue value returned by {@link #doIO(Reporter,String,long)}
* @throws IOException
*/
- abstract void collectStats(OutputCollector output,
+ abstract void collectStats(OutputCollector<UTF8, UTF8> output,
String name,
long execTime,
Object doIOReturnValue) throws IOException;
@@ -109,12 +111,12 @@
* {@link #collectStats(OutputCollector,String,long,Object)}
* is called to prepare stat data for a subsequent reducer.
*/
- public void map(WritableComparable key,
- Writable value,
- OutputCollector output,
+ public void map(UTF8 key,
+ LongWritable value,
+ OutputCollector<UTF8, UTF8> output,
Reporter reporter) throws IOException {
- String name = ((UTF8)key).toString();
- long longValue = ((LongWritable)value).get();
+ String name = key.toString();
+ long longValue = value.get();
reporter.setStatus("starting " + name + " ::host = " + hostName);
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java Thu Aug 16 11:45:49 2007
@@ -154,7 +154,7 @@
super(fsConfig);
}
- void collectStats(OutputCollector output,
+ void collectStats(OutputCollector<UTF8, UTF8> output,
String name,
long execTime,
Object objSize) throws IOException {
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java Thu Aug 16 11:45:49 2007
@@ -106,7 +106,9 @@
LOG.info("created control file for: "+totalSize+" bytes");
}
- public static class WriteMapper extends Configured implements Mapper {
+ public static class WriteMapper extends Configured
+ implements Mapper<UTF8, LongWritable, UTF8, LongWritable> {
+
private Random random = new Random();
private byte[] buffer = new byte[BUFFER_SIZE];
private FileSystem fs;
@@ -132,11 +134,13 @@
fastCheck = job.getBoolean("fs.test.fastCheck", false);
}
- public void map(WritableComparable key, Writable value,
- OutputCollector collector, Reporter reporter)
+ public void map(UTF8 key, LongWritable value,
+ OutputCollector<UTF8, LongWritable> collector,
+ Reporter reporter)
throws IOException {
- String name = ((UTF8)key).toString();
- long size = ((LongWritable)value).get();
+
+ String name = key.toString();
+ long size = value.get();
long seed = Long.parseLong(name);
random.setSeed(seed);
@@ -200,7 +204,9 @@
JobClient.runJob(job);
}
- public static class ReadMapper extends Configured implements Mapper {
+ public static class ReadMapper extends Configured
+ implements Mapper<UTF8, LongWritable, UTF8, LongWritable> {
+
private Random random = new Random();
private byte[] buffer = new byte[BUFFER_SIZE];
private byte[] check = new byte[BUFFER_SIZE];
@@ -224,11 +230,13 @@
fastCheck = job.getBoolean("fs.test.fastCheck", false);
}
- public void map(WritableComparable key, Writable value,
- OutputCollector collector, Reporter reporter)
+ public void map(UTF8 key, LongWritable value,
+ OutputCollector<UTF8, LongWritable> collector,
+ Reporter reporter)
throws IOException {
- String name = ((UTF8)key).toString();
- long size = ((LongWritable)value).get();
+
+ String name = key.toString();
+ long size = value.get();
long seed = Long.parseLong(name);
random.setSeed(seed);
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRBench.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRBench.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRBench.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRBench.java Thu Aug 16 11:45:49 2007
@@ -51,9 +51,12 @@
* Takes input format as text lines, runs some processing on it and
* writes out data as text again.
*/
- public static class Map extends MapReduceBase implements Mapper {
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter) throws IOException
+ public static class Map extends MapReduceBase
+ implements Mapper<WritableComparable, UTF8, UTF8, UTF8> {
+
+ public void map(WritableComparable key, UTF8 value,
+ OutputCollector<UTF8, UTF8> output,
+ Reporter reporter) throws IOException
{
String line = value.toString();
output.collect(new UTF8(process(line)), new UTF8(""));
@@ -66,9 +69,11 @@
/**
* Ignores the key and writes values to the output.
*/
- public static class Reduce extends MapReduceBase implements Reducer {
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter) throws IOException
+ public static class Reduce extends MapReduceBase
+ implements Reducer<UTF8, UTF8, UTF8, UTF8> {
+
+ public void reduce(UTF8 key, Iterator<UTF8> values,
+ OutputCollector<UTF8, UTF8> output, Reporter reporter) throws IOException
{
while(values.hasNext()) {
output.collect(key, new UTF8(values.next().toString()));
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java Thu Aug 16 11:45:49 2007
@@ -21,9 +21,9 @@
import java.io.*;
import java.util.*;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -48,7 +48,9 @@
* archives/files are set and then are checked in the map if they have been
* localized or not.
*/
- public static class MapClass extends MapReduceBase implements Mapper {
+ public static class MapClass extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, IntWritable> {
+
JobConf conf;
private final static IntWritable one = new IntWritable(1);
@@ -97,9 +99,10 @@
}
}
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter) throws IOException {
- String line = ((Text) value).toString();
+ public void map(LongWritable key, Text value,
+ OutputCollector<Text, IntWritable> output,
+ Reporter reporter) throws IOException {
+ String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
@@ -112,13 +115,15 @@
/**
* A reducer class that just emits the sum of the input values.
*/
- public static class ReduceClass extends MapReduceBase implements Reducer {
+ public static class ReduceClass extends MapReduceBase
+ implements Reducer<Text, IntWritable, Text, IntWritable> {
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<IntWritable> values,
+ OutputCollector<Text, IntWritable> output,
+ Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
- sum += ((IntWritable) values.next()).get();
+ sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java Thu Aug 16 11:45:49 2007
@@ -24,6 +24,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -39,7 +40,8 @@
* Mapper class for Pi estimation.
*/
- public static class PiMapper extends MapReduceBase implements Mapper {
+ public static class PiMapper extends MapReduceBase
+ implements Mapper<IntWritable, Writable, IntWritable, IntWritable> {
static Random r = new Random();
@@ -49,11 +51,11 @@
* @param out
* @param reporter
*/
- public void map(WritableComparable key,
+ public void map(IntWritable key,
Writable val,
- OutputCollector out,
+ OutputCollector<IntWritable, IntWritable> out,
Reporter reporter) throws IOException {
- int nSamples = ((IntWritable) key).get();
+ int nSamples = key.get();
for(int idx = 0; idx < nSamples; idx++) {
double x = r.nextDouble();
double y = r.nextDouble();
@@ -74,7 +76,9 @@
}
}
- public static class PiReducer extends MapReduceBase implements Reducer {
+ public static class PiReducer extends MapReduceBase
+ implements Reducer<IntWritable, IntWritable, WritableComparable, Writable> {
+
int numInside = 0;
int numOutside = 0;
JobConf conf;
@@ -91,18 +95,18 @@
* @param output
* @param reporter
*/
- public void reduce(WritableComparable key,
- Iterator values,
- OutputCollector output,
+ public void reduce(IntWritable key,
+ Iterator<IntWritable> values,
+ OutputCollector<WritableComparable, Writable> output,
Reporter reporter) throws IOException {
- if (((IntWritable)key).get() == 1) {
+ if (key.get() == 1) {
while (values.hasNext()) {
- int num = ((IntWritable)values.next()).get();
+ int num = values.next().get();
numInside += num;
}
} else {
while (values.hasNext()) {
- int num = ((IntWritable)values.next()).get();
+ int num = values.next().get();
numOutside += num;
}
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java Thu Aug 16 11:45:49 2007
@@ -31,6 +31,7 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.RecordStatsWritable;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.fs.*;
@@ -129,10 +130,13 @@
public int getChecksum() { return checksum; }
}
- public static class Map extends MapReduceBase implements Mapper {
+ public static class Map extends MapReduceBase
+ implements Mapper<BytesWritable, BytesWritable,
+ IntWritable, RecordStatsWritable> {
+
private IntWritable key = null;
private BytesWritable prevKey = null;
- private Partitioner partitioner = null;
+ private Partitioner<BytesWritable, BytesWritable> partitioner = null;
private int partition = -1;
private int noSortReducers = -1;
private long recordId = -1;
@@ -142,7 +146,7 @@
key = deduceInputFile(job);
if (key == sortOutput) {
- partitioner = new HashPartitioner();
+ partitioner = new HashPartitioner<BytesWritable, BytesWritable>();
// Figure the 'current' partition and no. of reduces of the 'sort'
try {
@@ -159,12 +163,12 @@
}
}
- public void map(WritableComparable key,
- Writable value,
- OutputCollector output,
+ public void map(BytesWritable key,
+ BytesWritable value,
+ OutputCollector<IntWritable, RecordStatsWritable> output,
Reporter reporter) throws IOException {
- BytesWritable bwKey = (BytesWritable)key;
- BytesWritable bwValue = (BytesWritable)value;
+ BytesWritable bwKey = key;
+ BytesWritable bwValue = value;
++recordId;
if (this.key == sortOutput) {
@@ -201,15 +205,19 @@
}
}
- public static class Reduce extends MapReduceBase implements Reducer {
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output,
+ public static class Reduce extends MapReduceBase
+ implements Reducer<IntWritable, RecordStatsWritable,
+ IntWritable, RecordStatsWritable> {
+
+ public void reduce(IntWritable key, Iterator<RecordStatsWritable> values,
+ OutputCollector<IntWritable,
+ RecordStatsWritable> output,
Reporter reporter) throws IOException {
long bytes = 0;
long records = 0;
int xor = 0;
while (values.hasNext()) {
- RecordStatsWritable stats = ((RecordStatsWritable)values.next());
+ RecordStatsWritable stats = values.next();
bytes += stats.getBytes();
records += stats.getRecords();
xor ^= stats.getChecksum();
@@ -308,7 +316,10 @@
*/
public static class RecordChecker {
- public static class Map extends MapReduceBase implements Mapper {
+ public static class Map extends MapReduceBase
+ implements Mapper<BytesWritable, BytesWritable,
+ BytesWritable, IntWritable> {
+
private IntWritable value = null;
public void configure(JobConf job) {
@@ -316,27 +327,29 @@
value = deduceInputFile(job);
}
- public void map(WritableComparable key,
- Writable value,
- OutputCollector output,
+ public void map(BytesWritable key,
+ BytesWritable value,
+ OutputCollector<BytesWritable, IntWritable> output,
Reporter reporter) throws IOException {
// newKey = (key, value)
- BytesWritable keyValue =
- new BytesWritable(pair((BytesWritable)key, (BytesWritable)value));
+ BytesWritable keyValue = new BytesWritable(pair(key, value));
// output (newKey, value)
output.collect(keyValue, this.value);
}
}
- public static class Reduce extends MapReduceBase implements Reducer {
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output,
+ public static class Reduce extends MapReduceBase
+ implements Reducer<BytesWritable, IntWritable,
+ BytesWritable, IntWritable> {
+
+ public void reduce(BytesWritable key, Iterator<IntWritable> values,
+ OutputCollector<BytesWritable, IntWritable> output,
Reporter reporter) throws IOException {
int ones = 0;
int twos = 0;
while (values.hasNext()) {
- IntWritable count = ((IntWritable) values.next());
+ IntWritable count = values.next();
if (count.equals(sortInput)) {
++ones;
} else if (count.equals(sortOutput)) {
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java Thu Aug 16 11:45:49 2007
@@ -51,16 +51,19 @@
* mediate key value pairs are ordered by {input key, value}.
* Think of the random value as a timestamp associated with the record.
*/
- static class RandomGenMapper implements Mapper {
+ static class RandomGenMapper
+ implements Mapper<IntWritable, Writable, IntWritable, IntWritable> {
+
public void configure(JobConf job) {
}
- public void map(WritableComparable key, Writable value,
- OutputCollector out, Reporter reporter) throws IOException {
+ public void map(IntWritable key, Writable value,
+ OutputCollector<IntWritable, IntWritable> out,
+ Reporter reporter) throws IOException {
int num_values = 5;
for(int i = 0; i < num_values; ++i) {
int val = rng.nextInt(num_values);
- int compositeKey = ((IntWritable)(key)).get() * 100 + val;
+ int compositeKey = key.get() * 100 + val;
out.collect(new IntWritable(compositeKey), new IntWritable(val));
}
}
@@ -72,12 +75,16 @@
/**
* Your basic identity mapper.
*/
- static class IdentityMapper implements Mapper {
+ static class IdentityMapper
+ implements Mapper<WritableComparable, Writable,
+ WritableComparable, Writable> {
+
public void configure(JobConf job) {
}
public void map(WritableComparable key, Writable value,
- OutputCollector out, Reporter reporter) throws IOException {
+ OutputCollector<WritableComparable, Writable> out,
+ Reporter reporter) throws IOException {
out.collect(key, value);
}
@@ -88,14 +95,17 @@
/**
* Checks whether keys are in ascending order.
*/
- static class AscendingKeysReducer implements Reducer {
+ static class AscendingKeysReducer
+ implements Reducer<IntWritable, Writable, IntWritable, Text> {
+
public void configure(JobConf job) {}
// keep track of the last key we've seen
private int lastKey = Integer.MIN_VALUE;
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector out, Reporter reporter) throws IOException {
- int currentKey = ((IntWritable)(key)).get();
+ public void reduce(IntWritable key, Iterator<Writable> values,
+ OutputCollector<IntWritable, Text> out,
+ Reporter reporter) throws IOException {
+ int currentKey = key.get();
// keys should be in ascending order
if (currentKey < lastKey) {
fail("Keys not in sorted ascending order");
@@ -110,13 +120,15 @@
/**
* Checks whether keys are in descending order.
*/
- static class DescendingKeysReducer implements Reducer {
+ static class DescendingKeysReducer
+ implements Reducer<IntWritable, Writable, IntWritable, Text> {
public void configure(JobConf job) {}
// keep track of the last key we've seen
private int lastKey = Integer.MAX_VALUE;
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector out, Reporter reporter) throws IOException {
+ public void reduce(IntWritable key, Iterator<Writable> values,
+ OutputCollector<IntWritable, Text> out,
+ Reporter reporter) throws IOException {
int currentKey = ((IntWritable)(key)).get();
// keys should be in descending order
if (currentKey > lastKey) {
@@ -134,19 +146,20 @@
* should have 5 values if the grouping is correct). It also checks whether
* the keys themselves are in ascending order.
*/
- static class AscendingGroupReducer implements Reducer {
+ static class AscendingGroupReducer
+ implements Reducer<IntWritable, IntWritable, IntWritable, Text> {
public void configure(JobConf job) {
}
// keep track of the last key we've seen
private int lastKey = Integer.MIN_VALUE;
- public void reduce(WritableComparable key,
- Iterator values,
- OutputCollector out,
+ public void reduce(IntWritable key,
+ Iterator<IntWritable> values,
+ OutputCollector<IntWritable, Text> out,
Reporter reporter) throws IOException {
// check key order
- int currentKey = ((IntWritable)(key)).get();
+ int currentKey = key.get();
if (currentKey < lastKey) {
fail("Keys not in sorted ascending order");
}
@@ -155,7 +168,7 @@
IntWritable previous = new IntWritable(Integer.MIN_VALUE);
int valueCount = 0;
while (values.hasNext()) {
- IntWritable current = (IntWritable) values.next();
+ IntWritable current = values.next();
// Check that the values are sorted
if (current.compareTo(previous) < 0)
@@ -177,19 +190,20 @@
* whether they are correctly grouped by key (i.e. each call to reduce
* should have 5 values if the grouping is correct).
*/
- static class DescendingGroupReducer implements Reducer {
+ static class DescendingGroupReducer
+ implements Reducer<IntWritable, IntWritable, IntWritable, Text> {
public void configure(JobConf job) {
}
// keep track of the last key we've seen
private int lastKey = Integer.MAX_VALUE;
- public void reduce(WritableComparable key,
- Iterator values,
- OutputCollector out,
+ public void reduce(IntWritable key,
+ Iterator<IntWritable> values,
+ OutputCollector<IntWritable, Text> out,
Reporter reporter) throws IOException {
// check key order
- int currentKey = ((IntWritable)(key)).get();
+ int currentKey = key.get();
if (currentKey > lastKey) {
fail("Keys not in sorted descending order");
}
@@ -198,7 +212,7 @@
IntWritable previous = new IntWritable(Integer.MAX_VALUE);
int valueCount = 0;
while (values.hasNext()) {
- IntWritable current = (IntWritable) values.next();
+ IntWritable current = values.next();
// Check that the values are sorted
if (current.compareTo(previous) > 0)
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java Thu Aug 16 11:45:49 2007
@@ -81,7 +81,7 @@
}
// try splitting the file in a variety of sizes
- TextInputFormat format = new KeyValueTextInputFormat();
+ KeyValueTextInputFormat format = new KeyValueTextInputFormat();
format.configure(job);
for (int i = 0; i < 3; i++) {
int numSplits = random.nextInt(MAX_LENGTH/20)+1;
@@ -93,14 +93,14 @@
BitSet bits = new BitSet(length);
for (int j = 0; j < splits.length; j++) {
LOG.debug("split["+j+"]= " + splits[j]);
- RecordReader reader =
+ RecordReader<Text, Text> reader =
format.getRecordReader(splits[j], job, reporter);
Class readerClass = reader.getClass();
assertEquals("reader class is KeyValueLineRecordReader.", KeyValueLineRecordReader.class, readerClass);
- Writable key = reader.createKey();
+ Text key = reader.createKey();
Class keyClass = key.getClass();
- Writable value = reader.createValue();
+ Text value = reader.createValue();
Class valueClass = value.getClass();
assertEquals("Key class is Text.", Text.class, keyClass);
assertEquals("Value class is Text.", Text.class, valueClass);
@@ -187,14 +187,14 @@
private static final Reporter voidReporter = Reporter.NULL;
- private static List<Text> readSplit(InputFormat format,
+ private static List<Text> readSplit(KeyValueTextInputFormat format,
InputSplit split,
JobConf job) throws IOException {
List<Text> result = new ArrayList<Text>();
- RecordReader reader = format.getRecordReader(split, job,
+ RecordReader<Text, Text> reader = format.getRecordReader(split, job,
voidReporter);
- Text key = (Text) reader.createKey();
- Text value = (Text) reader.createValue();
+ Text key = reader.createKey();
+ Text value = reader.createValue();
while (reader.next(key, value)) {
result.add(value);
value = (Text) reader.createValue();
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java Thu Aug 16 11:45:49 2007
@@ -38,16 +38,16 @@
* type specified in conf will be anything but.
*/
- static class TextGen implements Mapper {
+ static class TextGen
+ implements Mapper<WritableComparable, Writable, Text, Text> {
+
public void configure(JobConf job) {
}
- public void map(WritableComparable key, Writable val, OutputCollector out,
+ public void map(WritableComparable key, Writable val,
+ OutputCollector<Text, Text> out,
Reporter reporter) throws IOException {
- key = new Text("Hello");
- val = new Text("World");
-
- out.collect(key, val);
+ out.collect(new Text("Hello"), new Text("World"));
}
public void close() {
@@ -57,14 +57,15 @@
/** A do-nothing reducer class. We won't get this far, really.
*
*/
- static class TextReduce implements Reducer {
+ static class TextReduce
+ implements Reducer<Text, Text, Text, Text> {
public void configure(JobConf job) {
}
- public void reduce(WritableComparable key,
- Iterator values,
- OutputCollector out,
+ public void reduce(Text key,
+ Iterator<Text> values,
+ OutputCollector<Text, Text> out,
Reporter reporter) throws IOException {
out.collect(new Text("Test"), new Text("Me"));
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Thu Aug 16 11:45:49 2007
@@ -83,13 +83,17 @@
* of numbers in random order, but where each number appears
* as many times as we were instructed.
*/
- static class RandomGenMapper implements Mapper {
+ static class RandomGenMapper
+ implements Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
+
public void configure(JobConf job) {
}
- public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
- int randomVal = ((IntWritable) key).get();
- int randomCount = ((IntWritable) val).get();
+ public void map(IntWritable key, IntWritable val,
+ OutputCollector<IntWritable, IntWritable> out,
+ Reporter reporter) throws IOException {
+ int randomVal = key.get();
+ int randomCount = val.get();
for (int i = 0; i < randomCount; i++) {
out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
@@ -100,13 +104,17 @@
}
/**
*/
- static class RandomGenReducer implements Reducer {
+ static class RandomGenReducer
+ implements Reducer<IntWritable, IntWritable, Text, Text> {
+
public void configure(JobConf job) {
}
- public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
+ public void reduce(IntWritable key, Iterator<IntWritable> it,
+ OutputCollector<Text, Text> out,
+ Reporter reporter) throws IOException {
while (it.hasNext()) {
- int val = ((IntWritable) it.next()).get();
+ int val = it.next().get();
out.collect(new Text("" + val), new Text(""));
}
}
@@ -130,26 +138,31 @@
* Each key here is a random number, and the count is the
* number of times the number was emitted.
*/
- static class RandomCheckMapper implements Mapper {
+ static class RandomCheckMapper
+ implements Mapper<WritableComparable, Text, IntWritable, IntWritable> {
+
public void configure(JobConf job) {
}
- public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
- Text str = (Text) val;
-
- out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
+ public void map(WritableComparable key, Text val,
+ OutputCollector<IntWritable, IntWritable> out,
+ Reporter reporter) throws IOException {
+ out.collect(new IntWritable(Integer.parseInt(val.toString().trim())), new IntWritable(1));
}
public void close() {
}
}
/**
*/
- static class RandomCheckReducer implements Reducer {
+ static class RandomCheckReducer
+ implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
public void configure(JobConf job) {
}
- public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
- int keyint = ((IntWritable) key).get();
+ public void reduce(IntWritable key, Iterator<IntWritable> it,
+ OutputCollector<IntWritable, IntWritable> out,
+ Reporter reporter) throws IOException {
+ int keyint = key.get();
int count = 0;
while (it.hasNext()) {
it.next();
@@ -169,28 +182,35 @@
* Thus, the map() function is just the identity function
* and reduce() just sums. Nothing to see here!
*/
- static class MergeMapper implements Mapper {
+ static class MergeMapper
+ implements Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
+
public void configure(JobConf job) {
}
- public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
- int keyint = ((IntWritable) key).get();
- int valint = ((IntWritable) val).get();
+ public void map(IntWritable key, IntWritable val,
+ OutputCollector<IntWritable, IntWritable> out,
+ Reporter reporter) throws IOException {
+ int keyint = key.get();
+ int valint = val.get();
out.collect(new IntWritable(keyint), new IntWritable(valint));
}
public void close() {
}
}
- static class MergeReducer implements Reducer {
+ static class MergeReducer
+ implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
public void configure(JobConf job) {
}
- public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
- int keyint = ((IntWritable) key).get();
+ public void reduce(IntWritable key, Iterator<IntWritable> it,
+ OutputCollector<IntWritable, IntWritable> out,
+ Reporter reporter) throws IOException {
+ int keyint = key.get();
int total = 0;
while (it.hasNext()) {
- total += ((IntWritable) it.next()).get();
+ total += it.next().get();
}
out.collect(new IntWritable(keyint), new IntWritable(total));
}
@@ -214,15 +234,16 @@
launch();
}
- private static class MyMap implements Mapper {
+ private static class MyMap
+ implements Mapper<WritableComparable, Text, Text, Text> {
public void configure(JobConf conf) {
}
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter
- ) throws IOException {
- String str = ((Text) value).toString().toLowerCase();
+ public void map(WritableComparable key, Text value,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) throws IOException {
+ String str = value.toString().toLowerCase();
output.collect(new Text(str), value);
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Thu Aug 16 11:45:49 2007
@@ -111,7 +111,9 @@
}
- private static class MyInputFormat implements InputFormat {
+ private static class MyInputFormat
+ implements InputFormat<IntWritable, Text> {
+
static final String[] data = new String[]{
"crocodile pants",
"aunt annie",
@@ -151,7 +153,7 @@
}
}
- static class MyRecordReader implements RecordReader {
+ static class MyRecordReader implements RecordReader<IntWritable, Text> {
int index;
int past;
int length;
@@ -162,21 +164,21 @@
this.length = length;
}
- public boolean next(Writable key, Writable value) throws IOException {
+ public boolean next(IntWritable key, Text value) throws IOException {
if (index < past) {
- ((IntWritable) key).set(index);
- ((Text) value).set(data[index]);
+ key.set(index);
+ value.set(data[index]);
index += 1;
return true;
}
return false;
}
- public WritableComparable createKey() {
+ public IntWritable createKey() {
return new IntWritable();
}
- public Writable createValue() {
+ public Text createValue() {
return new Text();
}
@@ -200,18 +202,23 @@
new MySplit(4, 2)};
}
- public RecordReader getRecordReader(InputSplit split,
- JobConf job,
- Reporter reporter) throws IOException {
+ public RecordReader<IntWritable, Text> getRecordReader(InputSplit split,
+ JobConf job,
+ Reporter reporter)
+ throws IOException {
MySplit sp = (MySplit) split;
return new MyRecordReader(sp.first, sp.length);
}
}
- static class MyMapper extends MapReduceBase implements Mapper {
+ static class MyMapper extends MapReduceBase
+ implements Mapper<WritableComparable, Writable,
+ WritableComparable, Writable> {
+
public void map(WritableComparable key, Writable value,
- OutputCollector out, Reporter reporter) throws IOException {
+ OutputCollector<WritableComparable, Writable> out,
+ Reporter reporter) throws IOException {
System.out.println("map: " + key + ", " + value);
out.collect((WritableComparable) value, key);
InputSplit split = reporter.getInputSplit();
@@ -222,10 +229,12 @@
}
}
- static class MyReducer extends MapReduceBase implements Reducer {
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter
- ) throws IOException {
+ static class MyReducer extends MapReduceBase
+ implements Reducer<WritableComparable, Writable,
+ WritableComparable, Writable> {
+ public void reduce(WritableComparable key, Iterator<Writable> values,
+ OutputCollector<WritableComparable, Writable> output,
+ Reporter reporter) throws IOException {
try {
InputSplit split = reporter.getInputSplit();
throw new IOException("Got an input split of " + split);