Posted to common-commits@hadoop.apache.org by to...@apache.org on 2007/04/11 14:05:11 UTC

svn commit: r527465 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java src/test/org/apache/hadoop/mapred/TestMapOutputType.java

Author: tomwhite
Date: Wed Apr 11 05:05:10 2007
New Revision: 527465

URL: http://svn.apache.org/viewvc?view=rev&rev=527465
Log:
HADOOP-1001.  Check the type of keys and values generated by the mapper against the types specified in JobConf.  Contributed by Tahir Hashmi.
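
For context, here is a minimal sketch (not part of this commit; the class name, mapper and paths are illustrative) of the contract the new check enforces: the key and value classes a mapper passes to OutputCollector.collect() must match the map output classes declared in the JobConf (which default to the job's output classes).

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WellTypedJob {
  /** Mapper that emits Text/Text pairs, matching the JobConf below. */
  static class TextEmitter implements Mapper {
    public void configure(JobConf job) {}
    public void map(WritableComparable key, Writable val, OutputCollector out,
                    Reporter reporter) throws IOException {
      out.collect(new Text("Hello"), new Text("World"));  // Text/Text, as declared
    }
    public void close() {}
  }

  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(WellTypedJob.class);
    conf.setMapperClass(TextEmitter.class);
    // Declared map output types; with this patch, a mapper emitting anything
    // else is reported as an IOException at its first collect() call.
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(Text.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    conf.setInputPath(new Path("in"));    // illustrative paths
    conf.setOutputPath(new Path("out"));
    JobClient.runJob(conf);
  }
}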

Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=527465&r1=527464&r2=527465
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Apr 11 05:05:10 2007
@@ -155,6 +155,10 @@
     previously started to create the file to the description of 
     AlreadyBeingCreatedException.  (Konstantin Shvachko via tomwhite)
 
+48. HADOOP-1001.  Check the type of keys and values generated by the 
+    mapper against the types specified in JobConf.  
+    (Tahir Hashmi via tomwhite)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=527465&r1=527464&r2=527465
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Apr 11 05:05:10 2007
@@ -312,6 +312,18 @@
     
     public void collect(WritableComparable key,
               Writable value) throws IOException {
+      
+      if (key.getClass() != keyClass) {
+        throw new IOException("Type mismatch in key from map: expected "
+                              + keyClass.getName() + ", received "
+                              + key.getClass().getName());
+      }
+      if (value.getClass() != valClass) {
+        throw new IOException("Type mismatch in value from map: expected "
+                              + valClass.getName() + ", received "
+                              + value.getClass().getName());
+      }
+      
       synchronized (this) {
         //dump the key/value to buffer
         int keyOffset = keyValBuffer.getLength(); 
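
In effect, a job whose declared map output types disagree with what its mapper actually emits now fails fast in the map task, at the first collect() call, with a message naming the expected and received classes. A hedged sketch of that failure path (illustrative names; it mirrors the new TestMapOutputType test added below):

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;

public class MismatchedJob {
  /** Emits Text keys even though the JobConf below declares IntWritable. */
  static class TextKeyMapper implements Mapper {
    public void configure(JobConf job) {}
    public void map(WritableComparable key, Writable val, OutputCollector out,
                    Reporter reporter) throws IOException {
      // With this patch, this call throws an IOException ("Type mismatch in
      // key from map: ...") and the task attempt, and hence the job, fails.
      out.collect(new Text("Hello"), new Text("World"));
    }
    public void close() {}
  }

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(MismatchedJob.class);
    conf.setMapperClass(TextKeyMapper.class);
    conf.setMapOutputKeyClass(IntWritable.class);  // disagrees with the mapper
    conf.setMapOutputValueClass(Text.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    conf.setInputPath(new Path("in"));             // illustrative paths
    conf.setOutputPath(new Path("out"));

    RunningJob job = new JobClient(conf).submitJob(conf);
    while (!job.isComplete()) {
      Thread.sleep(1000);
    }
    System.out.println("successful? " + job.isSuccessful());  // expected: false
  }
}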

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java?view=auto&rev=527465
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java Wed Apr 11 05:05:10 2007
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.lib.*;
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+
+/** 
+ * TestMapOutputType checks whether the Map task handles a type mismatch
+ * between the mapper's output and the types specified via
+ * JobConf.setMapOutputKeyClass() and JobConf.setMapOutputValueClass().
+ */
+public class TestMapOutputType extends TestCase 
+{
+  JobConf conf = new JobConf(TestMapOutputType.class);
+  JobClient jc;
+  /** 
+   * TextGen is a Mapper that always generates a Text key-value pair,
+   * regardless of the map output types declared in the JobConf.
+   */
+   
+  static class TextGen implements Mapper {
+    public void configure(JobConf job) {
+    }
+    
+    public void map(WritableComparable key, Writable val, OutputCollector out,
+                    Reporter reporter) throws IOException {
+      key = new Text("Hello");
+      val = new Text("World");
+      
+      out.collect(key, val);
+    }
+    
+    public void close() {
+    }
+  }
+  
+  /** A do-nothing reducer class. We won't get this far, really.
+   *
+   */
+  static class TextReduce implements Reducer {
+    
+    public void configure(JobConf job) {
+    }
+
+    public void reduce(WritableComparable key,
+                       Iterator values,
+                       OutputCollector out,
+                       Reporter reporter) throws IOException {
+      out.collect(new Text("Test"), new Text("Me"));
+    }
+
+    public void close() {
+    }
+  }
+
+
+  public void configure() throws Exception {
+    Path testdir = new Path("build/test/test.mapred.spill");
+    Path inDir = new Path(testdir, "in");
+    Path outDir = new Path(testdir, "out");
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(testdir);
+    conf.setInt("io.sort.mb", 1);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setMapperClass(TextGen.class);
+    conf.setReducerClass(TextReduce.class);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class); 
+    
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    if (!fs.mkdirs(testdir)) {
+      throw new IOException("Mkdirs failed to create " + testdir.toString());
+    }
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    Path inFile = new Path(inDir, "part0");
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile, 
+        Text.class, Text.class);
+    writer.append(new Text("rec: 1"), new Text("Hello"));
+    writer.close();
+    
+    jc = new JobClient(conf);
+  }
+  
+  public void testKeyMismatch() throws Exception {
+    configure();
+    
+    // Set bad MapOutputKeyClass and MapOutputValueClass
+    conf.setMapOutputKeyClass(IntWritable.class);
+    conf.setMapOutputValueClass(IntWritable.class);
+    
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) {
+      Thread.sleep(1000);
+    }
+    
+    if (r_job.isSuccessful()) {
+      fail("Oops! The job was supposed to break due to an exception");
+    }
+  }
+  
+  public void testValueMismatch() throws Exception {
+    configure();
+  
+    // Set good MapOutputKeyClass, bad MapOutputValueClass
+    conf.setMapOutputKeyClass(Text.class);
+    conf.setMapOutputValueClass(IntWritable.class);
+    
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) {
+      Thread.sleep(1000);
+    }
+    
+    if (r_job.isSuccessful()) {
+      fail("Oops! The job was supposed to break due to an exception");
+    }
+  }
+  
+  public void testNoMismatch() throws Exception { 
+    configure();
+    
+    // Set good MapOutputKeyClass and MapOutputValueClass
+    conf.setMapOutputKeyClass(Text.class);
+    conf.setMapOutputValueClass(Text.class);
+    
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) {
+      Thread.sleep(1000);
+    }
+    
+    if (!r_job.isSuccessful()) {
+      fail("Oops! The job broke due to an unexpected error");
+    }
+  }
+}