You are viewing a plain-text version of this content. (The link to the canonical version was a hyperlink in the original page and is not preserved in this rendering.)
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/05/21 04:12:29 UTC

svn commit: r171186 - in /incubator/nutch/branches/mapred/src/java/org/apache/nutch: io/SequenceFile.java ipc/Server.java mapred/CombiningCollector.java mapred/JobConf.java mapred/MapRunnable.java mapred/MapRunner.java mapred/MapTask.java mapred/SequenceFileInputFormat.java mapred/TextInputFormat.java mapred/TextOutputFormat.java

Author: cutting
Date: Fri May 20 19:12:28 2005
New Revision: 171186

URL: http://svn.apache.org/viewcvs?rev=171186&view=rev
Log:
Added support for asynchronous, multi-threaded mappers (e.g., a fetcher).

Added:
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java
Modified:
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Fri May 20 19:12:28 2005
@@ -113,7 +113,7 @@
 
 
     /** Close the file. */
-    public void close() throws IOException {
+    public synchronized void close() throws IOException {
       if (out != null) {
         out.close();
         out = null;
@@ -121,7 +121,8 @@
     }
 
     /** Append a key/value pair. */
-    public void append(Writable key, Writable val) throws IOException {
+    public synchronized void append(Writable key, Writable val)
+      throws IOException {
       if (key.getClass() != keyClass)
         throw new IOException("wrong key class: "+key+" is not "+keyClass);
       if (val.getClass() != valClass)
@@ -139,8 +140,8 @@
     }
 
     /** Append a key/value pair. */
-    public void append(byte[] data, int start, int length, int keyLength)
-      throws IOException {
+    public synchronized void append(byte[] data, int start, int length,
+                                    int keyLength) throws IOException {
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed");
 
@@ -156,7 +157,7 @@
     }
 
     /** Returns the current length of the output file. */
-    public long getLength() throws IOException {
+    public synchronized long getLength() throws IOException {
       return out.getPos();
     }
 

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java Fri May 20 19:12:28 2005
@@ -25,6 +25,7 @@
 
 import java.net.Socket;
 import java.net.ServerSocket;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 
 import java.util.LinkedList;
@@ -146,7 +147,9 @@
           }
         }
       } catch (EOFException eof) {
-          // This is what happens when the other side shuts things down
+          // This is what happens on linux when the other side shuts down
+      } catch (SocketException eof) {
+          // This is what happens on Win32 when the other side shuts down
       } catch (Exception e) {
         LOG.log(Level.INFO, getName() + " caught: " + e, e);
       } finally {

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java Fri May 20 19:12:28 2005
@@ -51,7 +51,7 @@
     }
   }
 
-  public void collect(WritableComparable key, Writable value)
+  public synchronized void collect(WritableComparable key, Writable value)
     throws IOException {
 
     // buffer new value in map
@@ -71,7 +71,7 @@
     }
   }
 
-  public void flush() throws IOException {
+  public synchronized void flush() throws IOException {
     Iterator pairs = keyToValues.entrySet().iterator();
     while (pairs.hasNext()) {
       Map.Entry pair = (Map.Entry)pairs.next();

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java Fri May 20 19:12:28 2005
@@ -168,6 +168,14 @@
     setClass("mapred.mapper.class", theClass, Mapper.class);
   }
 
+  public Class getMapRunnerClass() {
+    return getClass("mapred.map.runner.class",
+                    MapRunner.class, MapRunnable.class);
+  }
+  public void setMapRunnerClass(Class theClass) {
+    setClass("mapred.map.runner.class", theClass, MapRunnable.class);
+  }
+
   public Class getPartitionerClass() {
     return getClass("mapred.partitioner.class",
                     HashPartitioner.class, Partitioner.class);

Added: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java?rev=171186&view=auto
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java (added)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java Fri May 20 19:12:28 2005
@@ -0,0 +1,33 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.mapred;
+
+import java.io.IOException;
+
+import org.apache.nutch.io.Writable;
+import org.apache.nutch.io.WritableComparable;
+
+/** Expert: Permits greater control of map processing. For example,
+ * implementations might perform multi-threaded, asynchronous mappings. */
+public interface MapRunnable extends Configurable {
+  /** Called to execute mapping.  Mapping is complete when this returns.
+   * @param input the {@link RecordReader} with input key/value pairs.
+   * @param output the {@link OutputCollector} for mapped key/value pairs.
+   */
+  void run(RecordReader input, OutputCollector output)
+    throws IOException;
+}

Added: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java?rev=171186&view=auto
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java (added)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java Fri May 20 19:12:28 2005
@@ -0,0 +1,58 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.mapred;
+
+import java.io.IOException;
+
+import org.apache.nutch.io.Writable;
+import org.apache.nutch.io.WritableComparable;
+
+/** Default {@link MapRunnable} implementation.*/
+public class MapRunner implements MapRunnable {
+  private Mapper mapper;
+  private Class inputKeyClass;
+  private Class inputValueClass;
+
+  public void configure(JobConf job) {
+    mapper = (Mapper)job.newInstance(job.getMapperClass());
+    inputKeyClass = job.getInputKeyClass();
+    inputValueClass = job.getInputValueClass();
+  }
+
+  public void run(RecordReader input, OutputCollector output)
+    throws IOException {
+    while (true) {
+      // allocate new key & value instances
+      WritableComparable key = null;
+      Writable value = null;
+      try {
+        key = (WritableComparable)inputKeyClass.newInstance();
+        value = (Writable)inputValueClass.newInstance();
+      } catch (Exception e) {
+        throw new IOException(e.toString());
+      }
+
+      // read next key & value
+      if (!input.next(key, value))
+        return;
+
+      // map pair to output
+      mapper.map(key, value, output);
+    }
+  }
+
+}

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java Fri May 20 19:12:28 2005
@@ -53,7 +53,7 @@
     split.readFields(in);
   }
 
-  public void run(final JobConf job, TaskUmbilicalProtocol umbilical)
+  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
 
     // open output files
@@ -68,12 +68,12 @@
                                   job.getOutputValueClass());
       }
 
-      Mapper mapper = (Mapper)job.newInstance(job.getMapperClass());
       final Partitioner partitioner =
         (Partitioner)job.newInstance(job.getPartitionerClass());
 
       OutputCollector partCollector = new OutputCollector() { // make collector
-          public void collect(WritableComparable key, Writable value)
+          public synchronized void collect(WritableComparable key,
+                                           Writable value)
             throws IOException {
             outs[partitioner.getPartition(key, partitions)].append(key, value);
           }
@@ -86,41 +86,34 @@
         collector = new CombiningCollector(job, partCollector);
       }
 
-      float end = (float)split.getLength();
-      float lastProgress = 0.0f;
+      final RecordReader rawIn =                  // open input
+        job.getInputFormat().getRecordReader(NutchFileSystem.get(),split,job);
 
-      Class inputKeyClass = job.getInputKeyClass();
-      Class inputValueClass = job.getInputValueClass();
-      WritableComparable key = null;
-      Writable value = null;
+      RecordReader in = new RecordReader() {      // wrap in progress reporter
+          private float end = (float)split.getLength();
+          private float lastProgress = 0.0f;
 
-      RecordReader in =                           // open the input
-        job.getInputFormat().getRecordReader(NutchFileSystem.get(),split,job);
+          public synchronized boolean next(Writable key, Writable value)
+            throws IOException {
 
-      try {
+            float progress =                        // compute progress
+              (float)Math.min((rawIn.getPos()-split.getStart())/end, 1.0f);
+            if ((progress - lastProgress) > 0.01f)  { // 100 progress reports
+              umbilical.progress(getTaskId(), new FloatWritable(progress));
+              lastProgress = progress;
+            }
 
-        // always allocate new keys and values when combining
-        if (combining || key == null) {
-          try {
-            key = (WritableComparable)inputKeyClass.newInstance();
-            value = (Writable)inputValueClass.newInstance();
-          } catch (Exception e) {
-            throw new IOException(e.toString());
+            return rawIn.next(key, value);
           }
-        }
-
-        while (in.next(key, value)) {             // map input to collector
+          public long getPos() throws IOException { return rawIn.getPos(); }
+          public void close() throws IOException { rawIn.close(); }
+        };
 
-          mapper.map(key, value, collector);
+      MapRunnable runner =
+        (MapRunnable)job.newInstance(job.getMapRunnerClass());
 
-          float progress =                        // compute progress
-            (float)Math.min((in.getPos()-split.getStart())/end, 1.0f);
-      
-          if ((progress - lastProgress) > 0.01f)  { // 100 progress reports
-            umbilical.progress(getTaskId(), new FloatWritable(progress));
-            lastProgress = progress;
-          }
-        }
+      try {
+        runner.run(in, collector);                // run the map
 
         if (combining) {                          // flush combiner
           ((CombiningCollector)collector).flush();

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java Fri May 20 19:12:28 2005
@@ -45,7 +45,8 @@
     in.sync(split.getStart());                    // sync to start
 
     return new RecordReader() {
-        public boolean next(Writable key, Writable value) throws IOException {
+        public synchronized boolean next(Writable key, Writable value)
+          throws IOException {
           long pos = in.getPosition();
           boolean more = in.next(key, value);
           if (pos >= end && in.syncSeen()) {
@@ -55,9 +56,11 @@
           }
         }
         
-        public long getPos() throws IOException { return in.getPosition(); }
+        public synchronized long getPos() throws IOException {
+          return in.getPosition();
+        }
 
-        public void close() throws IOException { in.close(); }
+        public synchronized void close() throws IOException { in.close(); }
 
       };
   }

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java Fri May 20 19:12:28 2005
@@ -56,7 +56,8 @@
 
     return new RecordReader() {
         /** Read a line. */
-        public boolean next(Writable key, Writable value) throws IOException {
+        public synchronized boolean next(Writable key, Writable value)
+          throws IOException {
           long pos = in.getPos();
           if (pos >= end)
             return false;
@@ -66,9 +67,11 @@
           return true;
         }
         
-        public long getPos() throws IOException { return in.getPos(); }
+        public  synchronized long getPos() throws IOException {
+          return in.getPos();
+        }
 
-        public void close() throws IOException { in.close(); }
+        public synchronized void close() throws IOException { in.close(); }
 
       };
   }

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java?rev=171186&r1=171185&r2=171186&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java Fri May 20 19:12:28 2005
@@ -38,14 +38,14 @@
       new NFSDataOutputStream(fs.create(file, true));
 
     return new RecordWriter() {
-        public void write(WritableComparable key, Writable value)
+        public synchronized void write(WritableComparable key, Writable value)
           throws IOException {
           out.writeBytes(key.toString());         // BUG: assume 8-bit chars
           out.writeByte('\t');
           out.writeBytes(value.toString());       // BUG: assume 8-bit chars
           out.writeByte('\n');
         }
-        public void close() throws IOException {
+        public synchronized void close() throws IOException {
           out.close();
         }
       };