You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ss...@apache.org on 2013/09/17 07:43:05 UTC
svn commit: r1523901 - /mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java
Author: ssc
Date: Tue Sep 17 05:43:04 2013
New Revision: 1523901
URL: http://svn.apache.org/r1523901
Log:
MAHOUT-1335 MultithreadedSharingMapper fails on Hadoop 2
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java?rev=1523901&r1=1523900&r2=1523901&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java Tue Sep 17 05:43:04 2013
@@ -17,7 +17,10 @@
package org.apache.mahout.cf.taste.hadoop.als;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.util.ReflectionUtils;
@@ -34,17 +37,21 @@ import java.io.IOException;
*/
public class MultithreadedSharingMapper<K1, V1, K2, V2> extends MultithreadedMapper<K1, V1, K2, V2> {
- private static final String MAPPER_CLASS = "mapred.map.multithreadedrunner.class";
-
@Override
public void run(Context ctx) throws IOException, InterruptedException {
+ Class<Mapper<K1, V1, K2, V2>> mapperClass =
+ MultithreadedSharingMapper.getMapperClass((JobContext) ctx);
+ Preconditions.checkNotNull(mapperClass, "Could not find Multithreaded Mapper class.");
Configuration conf = ctx.getConfiguration();
-
- Class<? extends SharingMapper<K1,V1,K2,V2, ?>> mapperClass =
- (Class<SharingMapper<K1,V1,K2,V2, ?>>) conf.getClass(MAPPER_CLASS, SharingMapper.class);
// instantiate the mapper
- SharingMapper<K1,V1,K2,V2, ?> mapper = ReflectionUtils.newInstance(mapperClass, conf);
+ Mapper<K1, V1, K2, V2> mapper1 = ReflectionUtils.newInstance(mapperClass, conf);
+ SharingMapper<K1, V1, K2, V2, ?> mapper = null;
+ if (mapper1 instanceof SharingMapper) {
+ mapper = (SharingMapper<K1, V1, K2, V2, ?>) mapper1;
+ }
+ Preconditions.checkNotNull(mapper, "Could not instantiate SharingMapper. Class was: %s",
+ mapper1.getClass().getName());
// single threaded call to setup the sharing mapper
mapper.setupSharedInstance(ctx);