You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/02/09 18:20:47 UTC
svn commit: r376355 - in /lucene/hadoop/trunk/src:
examples/org/apache/hadoop/examples/ java/ java/org/apache/hadoop/mapred/
java/org/apache/hadoop/mapred/lib/ test/org/apache/hadoop/fs/
test/org/apache/hadoop/mapred/
Author: cutting
Date: Thu Feb 9 09:20:44 2006
New Revision: 376355
URL: http://svn.apache.org/viewcvs?rev=376355&view=rev
Log:
Fixed HADOOP-20: permit mappers and reducers to clean up. Add a close() method to the Mapper and Reducer interfaces by having them extend a Closeable interface. Update all implementations to define close(). Patch by Michel Tourn.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Closeable.java
Modified:
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java
lucene/hadoop/trunk/src/java/overview.html
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java Thu Feb 9 09:20:44 2006
@@ -68,6 +68,9 @@
public void configure(JobConf job) {
}
+ public void close() {
+ }
+
}
/**
@@ -86,6 +89,9 @@
}
public void configure(JobConf job) {
+ }
+
+ public void close() {
}
}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Closeable.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Closeable.java?rev=376355&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Closeable.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Closeable.java Thu Feb 9 09:20:44 2006
@@ -0,0 +1,24 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/** That which can be closed. */
+public interface Closeable {
+ /** Called after the last call to any other method on this object to free
+ * and/or flush resources. Typical implementations do nothing. */
+ void close();
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java Thu Feb 9 09:20:44 2006
@@ -77,5 +77,10 @@
keyToValues.clear();
count = 0;
}
+
+ public synchronized void close()
+ {
+ combiner.close();
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java Thu Feb 9 09:20:44 2006
@@ -38,18 +38,22 @@
public void run(RecordReader input, OutputCollector output,
Reporter reporter)
throws IOException {
- while (true) {
- // allocate new key & value instances
- WritableComparable key =
- (WritableComparable)job.newInstance(inputKeyClass);
- Writable value = (Writable)job.newInstance(inputValueClass);
+ try {
+ while (true) {
+ // allocate new key & value instances
+ WritableComparable key =
+ (WritableComparable)job.newInstance(inputKeyClass);
+ Writable value = (Writable)job.newInstance(inputValueClass);
- // read next key & value
- if (!input.next(key, value))
- return;
+ // read next key & value
+ if (!input.next(key, value))
+ return;
- // map pair to output
- mapper.map(key, value, output, reporter);
+ // map pair to output
+ mapper.map(key, value, output, reporter);
+ }
+ } finally {
+ mapper.close();
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Feb 9 09:20:44 2006
@@ -133,6 +133,9 @@
}
} finally {
+ if (combining) {
+ ((CombiningCollector)collector).close();
+ }
in.close(); // close input
}
} finally {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java Thu Feb 9 09:20:44 2006
@@ -25,7 +25,7 @@
* intermediate values associated with a given output key are subsequently
* grouped by the map/reduce system, and passed to a {@link Reducer} to
* determine the final output.. */
-public interface Mapper extends JobConfigurable {
+public interface Mapper extends JobConfigurable, Closeable {
/** Maps a single input key/value pair into intermediate key/value pairs.
* Output pairs need not be of the same types as input pairs. A given input
* pair may map to zero or many output pairs. Output pairs are collected
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Feb 9 09:20:44 2006
@@ -285,6 +285,7 @@
}
} finally {
+ reducer.close();
in.close();
lfs.delete(new File(sortedFile)); // remove sorted
out.close(reporter);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java Thu Feb 9 09:20:44 2006
@@ -25,7 +25,7 @@
/** Reduces a set of intermediate values which share a key to a smaller set of
* values. Input values are the grouped output of a {@link Mapper}. */
-public interface Reducer extends JobConfigurable {
+public interface Reducer extends JobConfigurable, Closeable {
/** Combines values for a given key. Output values must be of the same type
* as input values. Input keys must not be altered. Typically all values
* are combined into zero or one value. Output pairs are collected with
@@ -38,4 +38,5 @@
void reduce(WritableComparable key, Iterator values,
OutputCollector output, Reporter reporter)
throws IOException;
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java Thu Feb 9 09:20:44 2006
@@ -38,5 +38,5 @@
throws IOException {
output.collect(key, val);
}
-
+ public void close() {}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java Thu Feb 9 09:20:44 2006
@@ -41,5 +41,7 @@
output.collect(key, (Writable)values.next());
}
}
-
+
+ public void close() {}
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java Thu Feb 9 09:20:44 2006
@@ -38,4 +38,7 @@
throws IOException {
output.collect((WritableComparable)value, key);
}
+
+ public void close() {}
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java Thu Feb 9 09:20:44 2006
@@ -45,4 +45,7 @@
// output sum
output.collect(key, new LongWritable(sum));
}
+
+ public void close() {}
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java Thu Feb 9 09:20:44 2006
@@ -53,4 +53,7 @@
output.collect(new UTF8(matcher.group(group)), new LongWritable(1));
}
}
+
+ public void close() {}
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java Thu Feb 9 09:20:44 2006
@@ -48,6 +48,9 @@
while (st.hasMoreTokens()) {
// output <token,1> pairs
output.collect(new UTF8(st.nextToken()), new LongWritable(1));
- }
+ }
}
+
+ public void close() {}
+
}
Modified: lucene/hadoop/trunk/src/java/overview.html
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/overview.html?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/overview.html (original)
+++ lucene/hadoop/trunk/src/java/overview.html Thu Feb 9 09:20:44 2006
@@ -120,7 +120,7 @@
</configuration></xmp>
<p>(We also set the DFS replication level to 1 in order to
-reduce the number of warnings.)</p>
+reduce warnings when running on a single node.)</p>
<p>Now check that the command <br><tt>ssh localhost</tt><br> does not
require a password. If it does, execute the following commands:</p>
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java Thu Feb 9 09:20:44 2006
@@ -155,6 +155,10 @@
reporter.setStatus("wrote " + name);
}
+
+ public void close() {
+ }
+
}
public static void writeTest(FileSystem fs, boolean fastCheck)
@@ -247,6 +251,10 @@
reporter.setStatus("read " + name);
}
+
+ public void close() {
+ }
+
}
public static void readTest(FileSystem fs, boolean fastCheck)
@@ -339,6 +347,10 @@
in.close();
}
}
+
+ public void close() {
+ }
+
}
public static void seekTest(FileSystem fs, boolean fastCheck)
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java?rev=376355&r1=376354&r2=376355&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java Thu Feb 9 09:20:44 2006
@@ -69,6 +69,9 @@
out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
}
}
+ public void close() {
+ }
+
}
static class RandomGenReducer implements Reducer {
public void configure(JobConf job) {
@@ -81,6 +84,8 @@
out.collect(new UTF8("" + val), new UTF8(""));
}
}
+ public void close() {
+ }
}
static class RandomCheckMapper implements Mapper {
public void configure(JobConf job) {
@@ -92,6 +97,8 @@
out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
}
+ public void close() {
+ }
}
static class RandomCheckReducer implements Reducer {
public void configure(JobConf job) {
@@ -106,6 +113,8 @@
}
out.collect(new IntWritable(keyint), new IntWritable(count));
}
+ public void close() {
+ }
}
int range;