You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/05/21 04:12:29 UTC
svn commit: r171186 - in
/incubator/nutch/branches/mapred/src/java/org/apache/nutch:
io/SequenceFile.java ipc/Server.java mapred/CombiningCollector.java
mapred/JobConf.java mapred/MapRunnable.java mapred/MapRunner.java
mapred/MapTask.java mapred/SequenceFileInputFormat.java
mapred/TextInputFormat.java mapred/TextOutputFormat.java
Author: cutting
Date: Fri May 20 19:12:28 2005
New Revision: 171186
URL: http://svn.apache.org/viewcvs?rev=171186&view=rev
Log:
Added support for asynchronous, multi-threaded mappers (e.g., a fetcher).
Added:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java
Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Fri May 20 19:12:28 2005
@@ -113,7 +113,7 @@
/** Close the file. */
- public void close() throws IOException {
+ public synchronized void close() throws IOException {
if (out != null) {
out.close();
out = null;
@@ -121,7 +121,8 @@
}
/** Append a key/value pair. */
- public void append(Writable key, Writable val) throws IOException {
+ public synchronized void append(Writable key, Writable val)
+ throws IOException {
if (key.getClass() != keyClass)
throw new IOException("wrong key class: "+key+" is not "+keyClass);
if (val.getClass() != valClass)
@@ -139,8 +140,8 @@
}
/** Append a key/value pair. */
- public void append(byte[] data, int start, int length, int keyLength)
- throws IOException {
+ public synchronized void append(byte[] data, int start, int length,
+ int keyLength) throws IOException {
if (keyLength == 0)
throw new IOException("zero length keys not allowed");
@@ -156,7 +157,7 @@
}
/** Returns the current length of the output file. */
- public long getLength() throws IOException {
+ public synchronized long getLength() throws IOException {
return out.getPos();
}
Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java Fri May 20 19:12:28 2005
@@ -25,6 +25,7 @@
import java.net.Socket;
import java.net.ServerSocket;
+import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.LinkedList;
@@ -146,7 +147,9 @@
}
}
} catch (EOFException eof) {
- // This is what happens when the other side shuts things down
+ // This is what happens on linux when the other side shuts down
+ } catch (SocketException eof) {
+ // This is what happens on Win32 when the other side shuts down
} catch (Exception e) {
LOG.log(Level.INFO, getName() + " caught: " + e, e);
} finally {
Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java Fri May 20 19:12:28 2005
@@ -51,7 +51,7 @@
}
}
- public void collect(WritableComparable key, Writable value)
+ public synchronized void collect(WritableComparable key, Writable value)
throws IOException {
// buffer new value in map
@@ -71,7 +71,7 @@
}
}
- public void flush() throws IOException {
+ public synchronized void flush() throws IOException {
Iterator pairs = keyToValues.entrySet().iterator();
while (pairs.hasNext()) {
Map.Entry pair = (Map.Entry)pairs.next();
Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java Fri May 20 19:12:28 2005
@@ -168,6 +168,14 @@
setClass("mapred.mapper.class", theClass, Mapper.class);
}
+ public Class getMapRunnerClass() {
+ return getClass("mapred.map.runner.class",
+ MapRunner.class, MapRunnable.class);
+ }
+ public void setMapRunnerClass(Class theClass) {
+ setClass("mapred.map.runner.class", theClass, MapRunnable.class);
+ }
+
public Class getPartitionerClass() {
return getClass("mapred.partitioner.class",
HashPartitioner.class, Partitioner.class);
Added: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java?rev=171186&view=auto
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java (added)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java Fri May 20 19:12:28 2005
@@ -0,0 +1,33 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.mapred;
+
+import java.io.IOException;
+
+import org.apache.nutch.io.Writable;
+import org.apache.nutch.io.WritableComparable;
+
+/** Expert: Permits greater control of map processing. For example,
+ * implementations might perform multi-threaded, asynchronous mappings. */
+public interface MapRunnable extends Configurable {
+ /** Called to execute mapping. Mapping is complete when this returns.
+ * @param input the {@link RecordReader} with input key/value pairs.
+ * @param output the {@link OutputCollector} for mapped key/value pairs.
+ */
+ void run(RecordReader input, OutputCollector output)
+ throws IOException;
+}
Added: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java?rev=171186&view=auto
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java (added)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java Fri May 20 19:12:28 2005
@@ -0,0 +1,58 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.mapred;
+
+import java.io.IOException;
+
+import org.apache.nutch.io.Writable;
+import org.apache.nutch.io.WritableComparable;
+
+/** Default {@link MapRunnable} implementation.*/
+public class MapRunner implements MapRunnable {
+ private Mapper mapper;
+ private Class inputKeyClass;
+ private Class inputValueClass;
+
+ public void configure(JobConf job) {
+ mapper = (Mapper)job.newInstance(job.getMapperClass());
+ inputKeyClass = job.getInputKeyClass();
+ inputValueClass = job.getInputValueClass();
+ }
+
+ public void run(RecordReader input, OutputCollector output)
+ throws IOException {
+ while (true) {
+ // allocate new key & value instances
+ WritableComparable key = null;
+ Writable value = null;
+ try {
+ key = (WritableComparable)inputKeyClass.newInstance();
+ value = (Writable)inputValueClass.newInstance();
+ } catch (Exception e) {
+ throw new IOException(e.toString());
+ }
+
+ // read next key & value
+ if (!input.next(key, value))
+ return;
+
+ // map pair to output
+ mapper.map(key, value, output);
+ }
+ }
+
+}
Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java Fri May 20 19:12:28 2005
@@ -53,7 +53,7 @@
split.readFields(in);
}
- public void run(final JobConf job, TaskUmbilicalProtocol umbilical)
+ public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException {
// open output files
@@ -68,12 +68,12 @@
job.getOutputValueClass());
}
- Mapper mapper = (Mapper)job.newInstance(job.getMapperClass());
final Partitioner partitioner =
(Partitioner)job.newInstance(job.getPartitionerClass());
OutputCollector partCollector = new OutputCollector() { // make collector
- public void collect(WritableComparable key, Writable value)
+ public synchronized void collect(WritableComparable key,
+ Writable value)
throws IOException {
outs[partitioner.getPartition(key, partitions)].append(key, value);
}
@@ -86,41 +86,34 @@
collector = new CombiningCollector(job, partCollector);
}
- float end = (float)split.getLength();
- float lastProgress = 0.0f;
+ final RecordReader rawIn = // open input
+ job.getInputFormat().getRecordReader(NutchFileSystem.get(),split,job);
- Class inputKeyClass = job.getInputKeyClass();
- Class inputValueClass = job.getInputValueClass();
- WritableComparable key = null;
- Writable value = null;
+ RecordReader in = new RecordReader() { // wrap in progress reporter
+ private float end = (float)split.getLength();
+ private float lastProgress = 0.0f;
- RecordReader in = // open the input
- job.getInputFormat().getRecordReader(NutchFileSystem.get(),split,job);
+ public synchronized boolean next(Writable key, Writable value)
+ throws IOException {
- try {
+ float progress = // compute progress
+ (float)Math.min((rawIn.getPos()-split.getStart())/end, 1.0f);
+ if ((progress - lastProgress) > 0.01f) { // 100 progress reports
+ umbilical.progress(getTaskId(), new FloatWritable(progress));
+ lastProgress = progress;
+ }
- // always allocate new keys and values when combining
- if (combining || key == null) {
- try {
- key = (WritableComparable)inputKeyClass.newInstance();
- value = (Writable)inputValueClass.newInstance();
- } catch (Exception e) {
- throw new IOException(e.toString());
+ return rawIn.next(key, value);
}
- }
-
- while (in.next(key, value)) { // map input to collector
+ public long getPos() throws IOException { return rawIn.getPos(); }
+ public void close() throws IOException { rawIn.close(); }
+ };
- mapper.map(key, value, collector);
+ MapRunnable runner =
+ (MapRunnable)job.newInstance(job.getMapRunnerClass());
- float progress = // compute progress
- (float)Math.min((in.getPos()-split.getStart())/end, 1.0f);
-
- if ((progress - lastProgress) > 0.01f) { // 100 progress reports
- umbilical.progress(getTaskId(), new FloatWritable(progress));
- lastProgress = progress;
- }
- }
+ try {
+ runner.run(in, collector); // run the map
if (combining) { // flush combiner
((CombiningCollector)collector).flush();
Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java Fri May 20 19:12:28 2005
@@ -45,7 +45,8 @@
in.sync(split.getStart()); // sync to start
return new RecordReader() {
- public boolean next(Writable key, Writable value) throws IOException {
+ public synchronized boolean next(Writable key, Writable value)
+ throws IOException {
long pos = in.getPosition();
boolean more = in.next(key, value);
if (pos >= end && in.syncSeen()) {
@@ -55,9 +56,11 @@
}
}
- public long getPos() throws IOException { return in.getPosition(); }
+ public synchronized long getPos() throws IOException {
+ return in.getPosition();
+ }
- public void close() throws IOException { in.close(); }
+ public synchronized void close() throws IOException { in.close(); }
};
}
Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java Fri May 20 19:12:28 2005
@@ -56,7 +56,8 @@
return new RecordReader() {
/** Read a line. */
- public boolean next(Writable key, Writable value) throws IOException {
+ public synchronized boolean next(Writable key, Writable value)
+ throws IOException {
long pos = in.getPos();
if (pos >= end)
return false;
@@ -66,9 +67,11 @@
return true;
}
- public long getPos() throws IOException { return in.getPos(); }
+ public synchronized long getPos() throws IOException {
+ return in.getPos();
+ }
- public void close() throws IOException { in.close(); }
+ public synchronized void close() throws IOException { in.close(); }
};
}
Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java Fri May 20 19:12:28 2005
@@ -38,14 +38,14 @@
new NFSDataOutputStream(fs.create(file, true));
return new RecordWriter() {
- public void write(WritableComparable key, Writable value)
+ public synchronized void write(WritableComparable key, Writable value)
throws IOException {
out.writeBytes(key.toString()); // BUG: assume 8-bit chars
out.writeByte('\t');
out.writeBytes(value.toString()); // BUG: assume 8-bit chars
out.writeByte('\n');
}
- public void close() throws IOException {
+ public synchronized void close() throws IOException {
out.close();
}
};