You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/28 16:25:49 UTC

svn commit: r642259 - /hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java

Author: ddas
Date: Fri Mar 28 08:25:45 2008
New Revision: 642259

URL: http://svn.apache.org/viewvc?rev=642259&view=rev
Log:
HADOOP-2997. Adding the testcase missed in the earlier commit of this issue.

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java?rev=642259&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJavaSerialization.java Fri Mar 28 08:25:45 2008
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Verifies that plain Java serialization can be used for intermediate and
+ * final MapReduce key/value types: the job registers
+ * {@code org.apache.hadoop.io.serializer.JavaSerialization} in
+ * {@code io.serializations}, uses {@link Long}/{@link String} (rather than
+ * Writables) as map/reduce output types, and sorts keys with
+ * {@link JavaSerializationComparator}.
+ */
+public class TestJavaSerialization extends ClusterMapReduceTestCase {
+  
+  /**
+   * Maps Writable inputs ({@link LongWritable}, {@link Text}) to the
+   * corresponding plain Java types ({@link Long}, {@link String}), forcing
+   * the framework to serialize map output via Java serialization.
+   */
+  static class TypeConverterMapper extends MapReduceBase implements
+      Mapper<LongWritable, Text, Long, String> {
+
+    public void map(LongWritable key, Text value,
+        OutputCollector<Long, String> output, Reporter reporter)
+        throws IOException {
+      // Unwrap the Writables; key.get() autoboxes to Long.
+      output.collect(key.get(), value.toString());
+    }
+
+  }
+  
+  /**
+   * Minimal text output format that renders each record via
+   * {@code toString()} as "key&lt;TAB&gt;value&lt;NEWLINE&gt;", so arbitrary
+   * (non-Writable) key/value classes can be written.
+   */
+  static class StringOutputFormat<K, V> extends OutputFormatBase<K, V> {
+    
+    /** Writes records as tab-separated, newline-terminated UTF-8 text. */
+    static class LineRecordWriter<K, V> implements RecordWriter<K, V> {
+      
+      private DataOutputStream out;
+      
+      public LineRecordWriter(DataOutputStream out) {
+        this.out = out;
+      }
+
+      public void close(Reporter reporter) throws IOException {
+        out.close();
+      }
+
+      public void write(K key, V value) throws IOException {
+        // Emit "key\tvalue\n" using the objects' toString() representations.
+        print(key);
+        print("\t");
+        print(value);
+        print("\n");
+      }
+      
+      /** Writes {@code o.toString()} to the stream as UTF-8 bytes. */
+      private void print(Object o) throws IOException {
+        out.write(o.toString().getBytes("UTF-8"));
+      }
+      
+    }
+
+    @Override
+    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
+        String name, Progressable progress) throws IOException {
+
+      // Create the part file inside the job's configured output directory;
+      // the passed-in FileSystem is ignored in favor of the path's own.
+      Path dir = job.getOutputPath();
+      FileSystem fs = dir.getFileSystem(job);
+      FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
+      return new LineRecordWriter<K, V>(fileOut);
+    }
+    
+  }
+  
+  /**
+   * End-to-end check: writes a 4-line input file, runs a job whose map
+   * output and final output use Java-serialized Long/String types, then
+   * asserts the output contains exactly 4 lines, each containing "hello".
+   */
+  public void testMapReduceJob() throws Exception {
+    // NOTE(review): writer uses the platform default charset — fine for
+    // this ASCII fixture, but an explicit charset would be safer.
+    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    wr.write("hello1\n");
+    wr.write("hello2\n");
+    wr.write("hello3\n");
+    wr.write("hello4\n");
+    wr.close();
+
+    JobConf conf = createJobConf();
+    conf.setJobName("JavaSerialization");
+    
+    // Register JavaSerialization ahead of WritableSerialization so plain
+    // Java types (Long, String) can flow through the shuffle.
+    conf.set("io.serializations",
+    "org.apache.hadoop.io.serializer.JavaSerialization," +
+    "org.apache.hadoop.io.serializer.WritableSerialization");
+
+    conf.setInputFormat(TextInputFormat.class);
+
+    // Map output uses non-Writable types — this is the point of the test.
+    conf.setMapOutputKeyClass(Long.class);
+    conf.setMapOutputValueClass(String.class);
+
+    conf.setOutputFormat(StringOutputFormat.class);
+    conf.setOutputKeyClass(Long.class);
+    conf.setOutputValueClass(String.class);
+    // Keys are not WritableComparable, so sorting needs this comparator.
+    conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
+
+    conf.setMapperClass(TypeConverterMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+
+    conf.setInputPath(getInputDir());
+
+    conf.setOutputPath(getOutputDir());
+
+    JobClient.runJob(conf);
+
+    // Collect real output files, filtering out job logs.
+    Path[] outputFiles = FileUtil.stat2Paths(
+                           getFileSystem().listStatus(getOutputDir(),
+                           new OutputLogFilter()));
+    // NOTE(review): if the job produces no output files, every assertion
+    // below is silently skipped and the test passes vacuously — an
+    // assertEquals on outputFiles.length would be stricter.
+    if (outputFiles.length > 0) {
+      // NOTE(review): reader uses the platform default charset although the
+      // record writer emitted UTF-8; harmless for this ASCII data.
+      InputStream is = getFileSystem().open(outputFiles[0]);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+      String line = reader.readLine();
+      int counter = 0;
+      while (line != null) {
+        counter++;
+        assertTrue(line.contains("hello"));
+        line = reader.readLine();
+      }
+      reader.close();
+      // One output line per input line.
+      assertEquals(4, counter);
+    }
+  }
+
+}