You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/02/09 18:20:47 UTC

svn commit: r376355 - in /lucene/hadoop/trunk/src: examples/org/apache/hadoop/examples/ java/ java/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapred/lib/ test/org/apache/hadoop/fs/ test/org/apache/hadoop/mapred/

Author: cutting
Date: Thu Feb  9 09:20:44 2006
New Revision: 376355

URL: http://svn.apache.org/viewcvs?rev=376355&view=rev
Log:
Fixed HADOOP-20: permit mappers and reducers to clean up.  Add a close() method to the Mapper and Reducer interfaces by having them extend a Closeable interface.  Update all implementations to define close().  Patch by Michel Tourn.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Closeable.java
Modified:
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java
    lucene/hadoop/trunk/src/java/overview.html
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java Thu Feb  9 09:20:44 2006
@@ -68,6 +68,9 @@
     public void configure(JobConf job) {
     }
     
+    public void close() {
+    }
+
   }
   
   /**
@@ -86,6 +89,9 @@
     }
     
     public void configure(JobConf job) {
+    }
+    
+    public void close() {
     }
     
   }

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Closeable.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Closeable.java?rev=376355&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Closeable.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Closeable.java Thu Feb  9 09:20:44 2006
@@ -0,0 +1,24 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/** That which can be closed. */
+public interface Closeable {
+  /** Called after the last call to any other method on this object to free
+   * and/or flush resources.  Typical implementations do nothing. */
+  void close();
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java Thu Feb  9 09:20:44 2006
@@ -77,5 +77,10 @@
     keyToValues.clear();
     count = 0;
   }
+  
+  public synchronized void close() 
+  { 
+	  combiner.close(); 
+  } 
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java Thu Feb  9 09:20:44 2006
@@ -38,18 +38,22 @@
   public void run(RecordReader input, OutputCollector output,
                   Reporter reporter)
     throws IOException {
-    while (true) {
-      // allocate new key & value instances
-      WritableComparable key =
-        (WritableComparable)job.newInstance(inputKeyClass);
-      Writable value = (Writable)job.newInstance(inputValueClass);
+    try {
+      while (true) {
+        // allocate new key & value instances
+        WritableComparable key =
+          (WritableComparable)job.newInstance(inputKeyClass);
+        Writable value = (Writable)job.newInstance(inputValueClass);
 
-      // read next key & value
-      if (!input.next(key, value))
-        return;
+        // read next key & value
+        if (!input.next(key, value))
+          return;
 
-      // map pair to output
-      mapper.map(key, value, output, reporter);
+        // map pair to output
+        mapper.map(key, value, output, reporter);
+      }
+    } finally {
+        mapper.close();
     }
   }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Feb  9 09:20:44 2006
@@ -133,6 +133,9 @@
         }
 
       } finally {
+        if (combining) { 
+          ((CombiningCollector)collector).close(); 
+        } 
         in.close();                               // close input
       }
     } finally {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java Thu Feb  9 09:20:44 2006
@@ -25,7 +25,7 @@
  * intermediate values associated with a given output key are subsequently
  * grouped by the map/reduce system, and passed to a {@link Reducer} to
  * determine the final output.. */
-public interface Mapper extends JobConfigurable {
+public interface Mapper extends JobConfigurable, Closeable {
   /** Maps a single input key/value pair into intermediate key/value pairs.
    * Output pairs need not be of the same types as input pairs.  A given input
    * pair may map to zero or many output pairs.  Output pairs are collected

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Feb  9 09:20:44 2006
@@ -285,6 +285,7 @@
       }
 
     } finally {
+    	reducer.close();
       in.close();
       lfs.delete(new File(sortedFile));           // remove sorted
       out.close(reporter);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java Thu Feb  9 09:20:44 2006
@@ -25,7 +25,7 @@
 
 /** Reduces a set of intermediate values which share a key to a smaller set of
  * values.  Input values are the grouped output of a {@link Mapper}. */
-public interface Reducer extends JobConfigurable {
+public interface Reducer extends JobConfigurable, Closeable {
   /** Combines values for a given key.  Output values must be of the same type
    * as input values.  Input keys must not be altered.  Typically all values
    * are combined into zero or one value.  Output pairs are collected with
@@ -38,4 +38,5 @@
   void reduce(WritableComparable key, Iterator values,
               OutputCollector output, Reporter reporter)
     throws IOException;
+
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java Thu Feb  9 09:20:44 2006
@@ -38,5 +38,5 @@
     throws IOException {
     output.collect(key, val);
   }
-
+	public void close() {}
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java Thu Feb  9 09:20:44 2006
@@ -41,5 +41,7 @@
       output.collect(key, (Writable)values.next());
     }
   }
-
+	
+	public void close() {}
+	
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java Thu Feb  9 09:20:44 2006
@@ -38,4 +38,7 @@
     throws IOException {
     output.collect((WritableComparable)value, key);
   }
+  
+  public void close() {}
+  
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java Thu Feb  9 09:20:44 2006
@@ -45,4 +45,7 @@
     // output sum
     output.collect(key, new LongWritable(sum));
   }
+  
+  public void close() {}
+  
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java Thu Feb  9 09:20:44 2006
@@ -53,4 +53,7 @@
       output.collect(new UTF8(matcher.group(group)), new LongWritable(1));
     }
   }
+  
+  public void close() {}
+  
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java Thu Feb  9 09:20:44 2006
@@ -48,6 +48,9 @@
     while (st.hasMoreTokens()) {
       // output <token,1> pairs
       output.collect(new UTF8(st.nextToken()), new LongWritable(1));
-    }
+    }  
   }
+  
+  public void close() {}
+  
 }

Modified: lucene/hadoop/trunk/src/java/overview.html
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/overview.html?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/overview.html (original)
+++ lucene/hadoop/trunk/src/java/overview.html Thu Feb  9 09:20:44 2006
@@ -120,7 +120,7 @@
 </configuration></xmp>
 
 <p>(We also set the DFS replication level to 1 in order to
-reduce the number of warnings.)</p>
+reduce warnings when running on a single node.)</p>
 
 <p>Now check that the command <br><tt>ssh localhost</tt><br> does not
 require a password.  If it does, execute the following commands:</p>

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java Thu Feb  9 09:20:44 2006
@@ -155,6 +155,10 @@
 
       reporter.setStatus("wrote " + name);
     }
+    
+    public void close() {
+    }
+    
   }
 
   public static void writeTest(FileSystem fs, boolean fastCheck)
@@ -247,6 +251,10 @@
 
       reporter.setStatus("read " + name);
     }
+    
+    public void close() {
+    }
+    
   }
 
   public static void readTest(FileSystem fs, boolean fastCheck)
@@ -339,6 +347,10 @@
         in.close();
       }
     }
+    
+    public void close() {
+    }
+    
   }
 
   public static void seekTest(FileSystem fs, boolean fastCheck)

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java Thu Feb  9 09:20:44 2006
@@ -69,6 +69,9 @@
                 out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
             }
         }
+				public void close() {
+				}
+
     }
     static class RandomGenReducer implements Reducer {
         public void configure(JobConf job) {
@@ -81,6 +84,8 @@
                 out.collect(new UTF8("" + val), new UTF8(""));
             }
         }
+				public void close() {
+				}
     }
     static class RandomCheckMapper implements Mapper {
         public void configure(JobConf job) {
@@ -92,6 +97,8 @@
 
             out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
         }
+				public void close() {
+				}
     }
     static class RandomCheckReducer implements Reducer {
         public void configure(JobConf job) {
@@ -106,6 +113,8 @@
             }
             out.collect(new IntWritable(keyint), new IntWritable(count));
         }
+			public void close() {
+			}
     }
 
     int range;