You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ss...@apache.org on 2013/09/17 07:43:05 UTC

svn commit: r1523901 - /mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java

Author: ssc
Date: Tue Sep 17 05:43:04 2013
New Revision: 1523901

URL: http://svn.apache.org/r1523901
Log:
MAHOUT-1335 MultithreadedSharingMapper fails on Hadoop 2

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java?rev=1523901&r1=1523900&r2=1523901&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java Tue Sep 17 05:43:04 2013
@@ -17,7 +17,10 @@
 
 package org.apache.mahout.cf.taste.hadoop.als;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -34,17 +37,21 @@ import java.io.IOException;
  */
 public class MultithreadedSharingMapper<K1, V1, K2, V2> extends MultithreadedMapper<K1, V1, K2, V2> {
 
-  private static final String MAPPER_CLASS = "mapred.map.multithreadedrunner.class";
-
   @Override
   public void run(Context ctx) throws IOException, InterruptedException {
+    Class<Mapper<K1, V1, K2, V2>> mapperClass =
+        MultithreadedSharingMapper.getMapperClass((JobContext) ctx);
+    Preconditions.checkNotNull(mapperClass, "Could not find Multithreaded Mapper class.");
 
     Configuration conf = ctx.getConfiguration();
-
-    Class<? extends SharingMapper<K1,V1,K2,V2, ?>> mapperClass =
-        (Class<SharingMapper<K1,V1,K2,V2, ?>>) conf.getClass(MAPPER_CLASS, SharingMapper.class);
     // instantiate the mapper
-    SharingMapper<K1,V1,K2,V2, ?> mapper = ReflectionUtils.newInstance(mapperClass, conf);
+    Mapper<K1, V1, K2, V2> mapper1 = ReflectionUtils.newInstance(mapperClass, conf);
+    SharingMapper<K1, V1, K2, V2, ?> mapper = null;
+    if (mapper1 instanceof SharingMapper) {
+      mapper = (SharingMapper<K1, V1, K2, V2, ?>) mapper1;
+    }
+    Preconditions.checkNotNull(mapper, "Could not instantiate SharingMapper. Class was: %s",
+                               mapper1.getClass().getName());
 
     // single threaded call to setup the sharing mapper
     mapper.setupSharedInstance(ctx);