You are viewing a plain text version of this content; the canonical link was not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/06/17 20:54:04 UTC

svn commit: r668793 - in /hadoop/core/branches/branch-0.17: CHANGES.txt src/java/org/apache/hadoop/io/serializer/JavaSerialization.java src/test/org/apache/hadoop/mapred/TestJavaSerialization.java

Author: omalley
Date: Tue Jun 17 11:54:04 2008
New Revision: 668793

URL: http://svn.apache.org/viewvc?rev=668793&view=rev
Log:
HADOOP-3565. Merge -r 668788:668789 from trunk to branch 0.17.

Modified:
    hadoop/core/branches/branch-0.17/CHANGES.txt
    hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java
    hadoop/core/branches/branch-0.17/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java

Modified: hadoop/core/branches/branch-0.17/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/CHANGES.txt?rev=668793&r1=668792&r2=668793&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.17/CHANGES.txt Tue Jun 17 11:54:04 2008
@@ -18,6 +18,14 @@
     distributions.  (Adam Heath via cutting)
 
 
+Release 0.17.1 - Unreleased
+
+  INCOMPATIBLE CHANGES
+
+    HADOOP-3565. Fix the Java serialization, which is not enabled by
+    default, to clear the state of the serializer between objects.
+    (tomwhite via omalley)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java?rev=668793&r1=668792&r2=668793&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java (original)
+++ hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java Tue Jun 17 11:54:04 2008
@@ -76,6 +76,7 @@
     }
 
     public void serialize(Serializable object) throws IOException {
+      oos.reset(); // clear (class) back-references
       oos.writeObject(object);
     }
 

Modified: hadoop/core/branches/branch-0.17/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java?rev=668793&r1=668792&r2=668793&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java (original)
+++ hadoop/core/branches/branch-0.17/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java Tue Jun 17 11:54:04 2008
@@ -18,83 +18,58 @@
 package org.apache.hadoop.mapred;
 
 import java.io.BufferedReader;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
+import java.util.Iterator;
+import java.util.StringTokenizer;
 
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.serializer.JavaSerializationComparator;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.util.Progressable;
 
 public class TestJavaSerialization extends ClusterMapReduceTestCase {
   
-  static class TypeConverterMapper extends MapReduceBase implements
-      Mapper<LongWritable, Text, Long, String> {
+  static class WordCountMapper extends MapReduceBase implements
+      Mapper<LongWritable, Text, String, Long> {
 
     public void map(LongWritable key, Text value,
-        OutputCollector<Long, String> output, Reporter reporter)
+        OutputCollector<String, Long> output, Reporter reporter)
         throws IOException {
-      output.collect(key.get(), value.toString());
+      StringTokenizer st = new StringTokenizer(value.toString());
+      while (st.hasMoreTokens()) {
+        output.collect(st.nextToken(), 1L);
+      }
     }
 
   }
   
-  static class StringOutputFormat<K, V> extends FileOutputFormat<K, V> {
+  static class SumReducer<K> extends MapReduceBase implements
+      Reducer<K, Long, K, Long> {
     
-    static class LineRecordWriter<K, V> implements RecordWriter<K, V> {
-      
-      private DataOutputStream out;
-      
-      public LineRecordWriter(DataOutputStream out) {
-        this.out = out;
-      }
-
-      public void close(Reporter reporter) throws IOException {
-        out.close();
-      }
-
-      public void write(K key, V value) throws IOException {
-        print(key);
-        print("\t");
-        print(value);
-        print("\n");
-      }
-      
-      private void print(Object o) throws IOException {
-        out.write(o.toString().getBytes("UTF-8"));
+    public void reduce(K key, Iterator<Long> values,
+        OutputCollector<K, Long> output, Reporter reporter)
+      throws IOException {
+
+      long sum = 0;
+      while (values.hasNext()) {
+        sum += values.next();
       }
-      
-    }
-
-    @Override
-    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
-        String name, Progressable progress) throws IOException {
-
-      Path dir = getWorkOutputPath(job);
-      FileSystem fs = dir.getFileSystem(job);
-      FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
-      return new LineRecordWriter<K, V>(fileOut);
+      output.collect(key, sum);
     }
     
   }
   
   public void testMapReduceJob() throws Exception {
-    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    OutputStream os = getFileSystem().create(new Path(getInputDir(),
+        "text.txt"));
     Writer wr = new OutputStreamWriter(os);
-    wr.write("hello1\n");
-    wr.write("hello2\n");
-    wr.write("hello3\n");
-    wr.write("hello4\n");
+    wr.write("b a\n");
     wr.close();
 
     JobConf conf = createJobConf();
@@ -106,16 +81,12 @@
 
     conf.setInputFormat(TextInputFormat.class);
 
-    conf.setMapOutputKeyClass(Long.class);
-    conf.setMapOutputValueClass(String.class);
-
-    conf.setOutputFormat(StringOutputFormat.class);
-    conf.setOutputKeyClass(Long.class);
-    conf.setOutputValueClass(String.class);
+    conf.setOutputKeyClass(String.class);
+    conf.setOutputValueClass(Long.class);
     conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
 
-    conf.setMapperClass(TypeConverterMapper.class);
-    conf.setReducerClass(IdentityReducer.class);
+    conf.setMapperClass(WordCountMapper.class);
+    conf.setReducerClass(SumReducer.class);
 
     FileInputFormat.setInputPaths(conf, getInputDir());
 
@@ -126,19 +97,13 @@
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
                            new OutputLogFilter()));
-    if (outputFiles.length > 0) {
-      InputStream is = getFileSystem().open(outputFiles[0]);
-      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-      String line = reader.readLine();
-      int counter = 0;
-      while (line != null) {
-        counter++;
-        assertTrue(line.contains("hello"));
-        line = reader.readLine();
-      }
-      reader.close();
-      assertEquals(4, counter);
-    }
+    assertEquals(1, outputFiles.length);
+    InputStream is = getFileSystem().open(outputFiles[0]);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+    assertEquals("a\t1", reader.readLine());
+    assertEquals("b\t1", reader.readLine());
+    assertNull(reader.readLine());
+    reader.close();
   }
 
 }