You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ss...@apache.org on 2011/09/09 09:45:17 UTC
svn commit: r1167027 [1/2] - in /mahout/trunk: ./
core/src/main/java/org/apache/mahout/cf/taste/hadoop/
core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/
core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/
core/src/main/java/org/...
Author: ssc
Date: Fri Sep 9 07:45:16 2011
New Revision: 1167027
URL: http://svn.apache.org/viewvc?rev=1167027&view=rev
Log:
MAHOUT-767 Improve RowSimilarityJob performance
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java
- copied, changed from r1164967, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
- copied, changed from r1151818, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java
- copied, changed from r1151818, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/ClassUtils.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/CityBlockSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/CooccurrenceCountSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/CosineSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/CountbasedMeasure.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/EuclideanDistanceSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/LoglikelihoodSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/PearsonCorrelationSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/TanimotoCoefficientSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasure.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasures.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducerTest.java
- copied, changed from r1164967, mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducerTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasuresTest.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/Cooccurrence.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/SimilarityMatrixEntryKey.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/SimilarityType.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrence.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrenceArray.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedRowPair.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapperTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/PredictionJobTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducerTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJobTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/PageRankJobTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/vector/
mahout/trunk/integration/src/test/java/org/apache/mahout/utils/eval/ParallelFactorizationEvaluatorTest.java
mahout/trunk/math/src/main/java/org/apache/mahout/math/AbstractVector.java
mahout/trunk/pom.xml
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java Fri Sep 9 07:45:16 2011
@@ -17,8 +17,12 @@
package org.apache.mahout.cf.taste.hadoop;
+import com.google.common.io.Closeables;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
@@ -28,6 +32,7 @@ import org.apache.mahout.math.VarIntWrit
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.map.OpenIntLongHashMap;
+import java.io.IOException;
import java.util.regex.Pattern;
/**
@@ -38,8 +43,7 @@ public final class TasteHadoopUtils {
/** Standard delimiter of textual preference data */
private static final Pattern PREFERENCE_TOKEN_DELIMITER = Pattern.compile("[\t,]");
- private TasteHadoopUtils() {
- }
+ private TasteHadoopUtils() {}
/**
* Splits a preference data line into string tokens
@@ -73,4 +77,24 @@ public final class TasteHadoopUtils {
return indexItemIDMap;
}
+ public static void writeInt(int value, Path path, Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ FSDataOutputStream out = fs.create(path);
+ try {
+ out.writeInt(value);
+ } finally {
+ Closeables.closeQuietly(out);
+ }
+ }
+
+ public static int readInt(Path path, Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ FSDataInputStream in = fs.open(path);
+ try {
+ return in.readInt();
+ } finally {
+ Closeables.closeQuietly(in);
+ }
+ }
+
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java Fri Sep 9 07:45:16 2011
@@ -30,13 +30,15 @@ import java.util.regex.Pattern;
public abstract class ToEntityPrefsMapper extends
Mapper<LongWritable,Text, VarLongWritable,VarLongWritable> {
- public static final String TRANSPOSE_USER_ITEM = "transposeUserItem";
+ public static final String TRANSPOSE_USER_ITEM = ToEntityPrefsMapper.class + "transposeUserItem";
+ public static final String RATING_SHIFT = ToEntityPrefsMapper.class + "shiftRatings";
private static final Pattern DELIMITER = Pattern.compile("[\t,]");
private boolean booleanData;
private boolean transpose;
private final boolean itemKey;
+ private float ratingShift;
ToEntityPrefsMapper(boolean itemKey) {
this.itemKey = itemKey;
@@ -47,6 +49,7 @@ public abstract class ToEntityPrefsMappe
Configuration jobConf = context.getConfiguration();
booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
transpose = jobConf.getBoolean(TRANSPOSE_USER_ITEM, false);
+ ratingShift = Float.parseFloat(jobConf.get(RATING_SHIFT, String.valueOf(0f)));
}
@Override
@@ -67,7 +70,7 @@ public abstract class ToEntityPrefsMappe
if (booleanData) {
context.write(new VarLongWritable(userID), new VarLongWritable(itemID));
} else {
- float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) : 1.0f;
+ float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) + ratingShift : 1.0f;
context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Fri Sep 9 07:45:16 2011
@@ -19,7 +19,6 @@ package org.apache.mahout.cf.taste.hadoo
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
@@ -29,22 +28,17 @@ import org.apache.hadoop.mapreduce.lib.i
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
-import org.apache.mahout.cf.taste.hadoop.MaybePruneRowsMapper;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
-import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.ToItemVectorsReducer;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
-import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.DistributedRowMatrix;
-import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob;
-import org.apache.mahout.math.hadoop.similarity.SimilarityType;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasures;
-import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
@@ -71,7 +65,7 @@ import java.util.regex.Pattern;
* <li>-Dmapred.input.dir=(path): Directory containing one or more text files with the preference data</li>
* <li>-Dmapred.output.dir=(path): output path where recommender output should go</li>
* <li>--similarityClassname (classname): Name of distributed similarity class to instantiate or a predefined similarity
- * from {@link SimilarityType}</li>
+ * from {@link org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure}</li>
* <li>--usersFile (path): only compute recommendations for user IDs contained in this file (optional)</li>
* <li>--itemsFile (path): only include item IDs from this file in the recommendations (optional)</li>
* <li>--filterFile (path): file containing comma-separated userID,itemID pairs. Used to exclude the item from the
@@ -94,11 +88,11 @@ public final class RecommenderJob extend
public static final String BOOLEAN_DATA = "booleanData";
private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;
- private static final int DEFAULT_MAX_COOCCURRENCES_PER_ITEM = 100;
+ private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
@Override
- public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+ public int run(String[] args) throws Exception {
addInputOption();
addOutputOption();
@@ -116,19 +110,17 @@ public final class RecommenderJob extend
+ "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
addOption("maxSimilaritiesPerItem", "m", "Maximum number of similarities considered per item ",
String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ITEM));
- addOption("maxCooccurrencesPerItem", "mo", "try to cap the number of cooccurrences per item to this "
- + "number (default: " + DEFAULT_MAX_COOCCURRENCES_PER_ITEM + ')',
- String.valueOf(DEFAULT_MAX_COOCCURRENCES_PER_ITEM));
- addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate, alternatively use "
- + "one of the predefined similarities (" + SimilarityType.listEnumNames() + ')',
- String.valueOf(SimilarityType.SIMILARITY_COOCCURRENCE));
+ addOption("maxPrefsPerUserInItemSimilarity", "mppuiis", "max number of preferences to consider per user in the " +
+ "item similarity computation phase, users with more preferences will be sampled down (default: " +
+ DEFAULT_MAX_PREFS_PER_USER + ")", String.valueOf(DEFAULT_MAX_PREFS_PER_USER));
+ addOption("similarityClassname", "s", "Name of distributed similarity measures class to instantiate, " +
+ "alternatively use one of the predefined similarities (" + VectorSimilarityMeasures.list() + ')');
Map<String,String> parsedArgs = parseArguments(args);
if (parsedArgs == null) {
return -1;
}
- Path inputPath = getInputPath();
Path outputPath = getOutputPath();
int numRecommendations = Integer.parseInt(parsedArgs.get("--numRecommendations"));
String usersFile = parsedArgs.get("--usersFile");
@@ -137,14 +129,11 @@ public final class RecommenderJob extend
boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
int maxPrefsPerUser = Integer.parseInt(parsedArgs.get("--maxPrefsPerUser"));
int minPrefsPerUser = Integer.parseInt(parsedArgs.get("--minPrefsPerUser"));
+ int maxPrefsPerUserInItemSimilarity = Integer.parseInt(parsedArgs.get("--maxPrefsPerUserInItemSimilarity"));
int maxSimilaritiesPerItem = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItem"));
- int maxCooccurrencesPerItem = Integer.parseInt(parsedArgs.get("--maxCooccurrencesPerItem"));
String similarityClassname = parsedArgs.get("--similarityClassname");
- Path userVectorPath = getTempPath("userVectors");
- Path itemIDIndexPath = getTempPath("itemIDIndex");
- Path countUsersPath = getTempPath("countUsers");
- Path itemUserMatrixPath = getTempPath("itemUserMatrix");
+ Path prepPath = getTempPath("preparePreferenceMatrix");
Path similarityMatrixPath = getTempPath("similarityMatrix");
Path prePartialMultiplyPath1 = getTempPath("prePartialMultiply1");
Path prePartialMultiplyPath2 = getTempPath("prePartialMultiply2");
@@ -153,63 +142,38 @@ public final class RecommenderJob extend
AtomicInteger currentPhase = new AtomicInteger();
- if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- Job itemIDIndex = prepareJob(
- inputPath, itemIDIndexPath, TextInputFormat.class,
- ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
- ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
- SequenceFileOutputFormat.class);
- itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
- itemIDIndex.waitForCompletion(true);
- }
-
int numberOfUsers = -1;
+
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- Job toUserVector = prepareJob(
- inputPath, userVectorPath, TextInputFormat.class,
- ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
- ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
- SequenceFileOutputFormat.class);
- toUserVector.getConfiguration().setBoolean(BOOLEAN_DATA, booleanData);
- toUserVector.getConfiguration().setInt(ToUserVectorReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser);
- toUserVector.waitForCompletion(true);
+ ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[] {
+ "--input", getInputPath().toString(),
+ "--output", prepPath.toString(),
+ "--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),
+ "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
+ "--booleanData", String.valueOf(booleanData),
+ "--tempDir", getTempPath().toString() });
- numberOfUsers = (int) toUserVector.getCounters().findCounter(ToUserVectorReducer.Counters.USERS).getValue();
+ numberOfUsers = TasteHadoopUtils.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
}
- if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- Job maybePruneAndTransponse = prepareJob(userVectorPath,
- itemUserMatrixPath,
- SequenceFileInputFormat.class,
- MaybePruneRowsMapper.class,
- IntWritable.class,
- DistributedRowMatrix.MatrixEntryWritable.class,
- ToItemVectorsReducer.class,
- IntWritable.class,
- VectorWritable.class,
- SequenceFileOutputFormat.class);
- maybePruneAndTransponse.getConfiguration().setInt(MaybePruneRowsMapper.MAX_COOCCURRENCES,
- maxCooccurrencesPerItem);
- maybePruneAndTransponse.waitForCompletion(true);
- }
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+
+ /* special behavior if phase 1 is skipped */
+ if (numberOfUsers == -1) {
+ numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
+ PathType.LIST, null, getConf());
+ }
+
/* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like
* new DistributedRowMatrix(...).rowSimilarity(...) */
- try {
- if (numberOfUsers == -1){
- numberOfUsers = (int) HadoopUtil.countRecords(userVectorPath, PathType.LIST, null, getConf());
- }
- ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
- "-Dmapred.input.dir=" + itemUserMatrixPath,
- "-Dmapred.output.dir=" + similarityMatrixPath,
- "--numberOfColumns", String.valueOf(numberOfUsers),
- "--similarityClassname", similarityClassname,
- "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem + 1),
- "--tempDir", getTempPath().toString() });
- } catch (Exception e) {
- throw new IllegalStateException("item-item-similarity computation failed", e);
- }
+ ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
+ "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
+ "--output", similarityMatrixPath.toString(),
+ "--numberOfColumns", String.valueOf(numberOfUsers),
+ "--similarityClassname", similarityClassname,
+ "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem + 1),
+ "--tempDir", getTempPath().toString() });
}
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
@@ -220,10 +184,9 @@ public final class RecommenderJob extend
SequenceFileOutputFormat.class);
prePartialMultiply1.waitForCompletion(true);
- Job prePartialMultiply2 = prepareJob(
- userVectorPath, prePartialMultiplyPath2, SequenceFileInputFormat.class,
- UserVectorSplitterMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
- Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
+ Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
+ prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,
+ VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
SequenceFileOutputFormat.class);
if (usersFile != null) {
prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
@@ -271,7 +234,8 @@ public final class RecommenderJob extend
setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath);
}
setIOSort(aggregateAndRecommend);
- aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath.toString());
+ aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,
+ new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);
aggregateAndRecommend.waitForCompletion(true);
Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java (from r1164967, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java&r1=1164967&r2=1167027&rev=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java Fri Sep 9 07:45:16 2011
@@ -44,10 +44,10 @@ import org.apache.mahout.math.VectorWrit
* {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}.
* </p>
*/
-public final class ToUserVectorReducer extends
+public final class ToUserVectorsReducer extends
Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {
- public static final String MIN_PREFERENCES_PER_USER = ToUserVectorReducer.class.getName() +
+ public static final String MIN_PREFERENCES_PER_USER = ToUserVectorsReducer.class.getName() +
".minPreferencesPerUser";
private int minPreferences;
Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java?rev=1167027&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java Fri Sep 9 07:45:16 2011
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.preparation;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
+import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
+import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexMapper;
+import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexReducer;
+import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
+import org.apache.mahout.cf.taste.hadoop.item.ToUserVectorsReducer;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.VectorWritable;
+
+import java.util.Map;
+
+/**
+ * Prepares textual preference data for the item-based recommender by running three MapReduce jobs whose
+ * results are written below the output path:
+ * <ol>
+ * <li>{@link #ITEMID_INDEX}: the item ID index built by {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}</li>
+ * <li>{@link #USER_VECTORS}: the user vectors built by {@link ToUserVectorsReducer}; the number of users is
+ * stored separately in {@link #NUM_USERS}</li>
+ * <li>{@link #RATING_MATRIX}: the user vectors transformed by {@link ToItemVectorsMapper} and
+ * {@link ToItemVectorsReducer}, optionally sampling down users with more than {@code --maxPrefsPerUser}
+ * preferences</li>
+ * </ol>
+ */
+public class PreparePreferenceMatrixJob extends AbstractJob {
+
+ /** name of the file below the output path that holds the number of users */
+ public static final String NUM_USERS = "numUsers.bin";
+ /** name of the directory below the output path that holds the item ID index */
+ public static final String ITEMID_INDEX = "itemIDIndex";
+ /** name of the directory below the output path that holds the user vectors */
+ public static final String USER_VECTORS = "userVectors";
+ /** name of the directory below the output path that holds the item vectors derived from the user vectors */
+ public static final String RATING_MATRIX = "ratingMatrix";
+
+ private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new PreparePreferenceMatrixJob(), args);
+ }
+
+ /**
+ * Parses the arguments and runs the preparation jobs one after another, stopping at the first failure.
+ *
+ * @return 0 on success, -1 if the arguments could not be parsed or one of the jobs failed
+ */
+ @Override
+ public int run(String[] args) throws Exception {
+
+ addInputOption();
+ addOutputOption();
+ addOption("maxPrefsPerUser", "mppu", "max number of preferences to consider per user, " +
+ "users with more preferences will be sampled down");
+ addOption("minPrefsPerUser", "mp", "ignore users with less preferences than this "
+ + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
+ addOption("booleanData", "b", "Treat input as without pref values", Boolean.FALSE.toString());
+ addOption("ratingShift", "rs", "shift ratings by this value", String.valueOf(0f));
+
+ Map<String,String> parsedArgs = parseArguments(args);
+ if (parsedArgs == null) {
+ return -1;
+ }
+
+ int minPrefsPerUser = Integer.parseInt(parsedArgs.get("--minPrefsPerUser"));
+ boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
+ float ratingShift = Float.parseFloat(parsedArgs.get("--ratingShift"));
+
+ /* The result of every job is checked: a failed job must not be reported as success, and the number of
+ * users must not be taken from the counters of a job that did not complete. */
+ Job itemIDIndex = prepareJob(getInputPath(), getOutputPath(ITEMID_INDEX), TextInputFormat.class,
+ ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class, ItemIDIndexReducer.class,
+ VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class);
+ itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
+ if (!itemIDIndex.waitForCompletion(true)) {
+ return -1;
+ }
+
+ Job toUserVectors = prepareJob(getInputPath(), getOutputPath(USER_VECTORS), TextInputFormat.class,
+ ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
+ ToUserVectorsReducer.class, VarLongWritable.class, VectorWritable.class, SequenceFileOutputFormat.class);
+ toUserVectors.getConfiguration().setBoolean(RecommenderJob.BOOLEAN_DATA, booleanData);
+ toUserVectors.getConfiguration().setInt(ToUserVectorsReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser);
+ toUserVectors.getConfiguration().set(ToEntityPrefsMapper.RATING_SHIFT, String.valueOf(ratingShift));
+ if (!toUserVectors.waitForCompletion(true)) {
+ return -1;
+ }
+
+ int numberOfUsers = (int) toUserVectors.getCounters().findCounter(ToUserVectorsReducer.Counters.USERS).getValue();
+ TasteHadoopUtils.writeInt(numberOfUsers, getOutputPath(NUM_USERS), getConf());
+
+ Job toItemVectors = prepareJob(getOutputPath(USER_VECTORS), getOutputPath(RATING_MATRIX),
+ ToItemVectorsMapper.class, IntWritable.class, VectorWritable.class, ToItemVectorsReducer.class,
+ IntWritable.class, VectorWritable.class);
+ toItemVectors.setCombinerClass(ToItemVectorsReducer.class);
+
+ /* configure sampling regarding the uservectors */
+ if (parsedArgs.containsKey("--maxPrefsPerUser")) {
+ int samplingSize = Integer.parseInt(parsedArgs.get("--maxPrefsPerUser"));
+ toItemVectors.getConfiguration().setInt(ToItemVectorsMapper.SAMPLE_SIZE, samplingSize);
+ }
+
+ return toItemVectors.waitForCompletion(true) ? 0 : -1;
+ }
+}
Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java (from r1151818, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java&r1=1151818&r2=1167027&rev=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java Fri Sep 9 07:45:16 2011
@@ -15,107 +15,58 @@
* limitations under the License.
*/
-package org.apache.mahout.cf.taste.hadoop;
+package org.apache.mahout.cf.taste.hadoop.preparation;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.mahout.cf.taste.common.MinK;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.DistributedRowMatrix;
-import org.apache.mahout.math.map.OpenIntIntHashMap;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
import java.io.IOException;
-import java.util.Comparator;
import java.util.Iterator;
+public class ToItemVectorsMapper
+ extends Mapper<VarLongWritable,VectorWritable,IntWritable,VectorWritable> {
-/**
- * tries to limit the number of elements per col to a fixed size and transposes the input afterwards
- */
-public class MaybePruneRowsMapper
- extends Mapper<VarLongWritable,VectorWritable,IntWritable,DistributedRowMatrix.MatrixEntryWritable> {
-
- public static final String MAX_COOCCURRENCES = MaybePruneRowsMapper.class.getName() + ".maxCooccurrences";
-
- private int maxCooccurrences;
- private final OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
+ public static final String SAMPLE_SIZE = ToItemVectorsMapper.class + ".sampleSize";
enum Elements {
- USED, NEGLECTED
+ USER_RATINGS_USED, USER_RATINGS_NEGLECTED
}
+ private int sampleSize;
+
@Override
protected void setup(Context ctx) throws IOException, InterruptedException {
- super.setup(ctx);
- maxCooccurrences = ctx.getConfiguration().getInt(MAX_COOCCURRENCES, -1);
- if (maxCooccurrences < 1) {
- throw new IllegalStateException("Maximum number of cooccurrences was not correctly set!");
- }
+ sampleSize = ctx.getConfiguration().getInt(SAMPLE_SIZE, Integer.MAX_VALUE);
}
@Override
protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, Context ctx)
- throws IOException, InterruptedException {
- Vector vector = vectorWritable.get();
- countSeen(vector);
-
- int numElementsBeforePruning = vector.getNumNondefaultElements();
- vector = maybePruneVector(vector);
- int numElementsAfterPruning = vector.getNumNondefaultElements();
-
- ctx.getCounter(Elements.USED).increment(numElementsAfterPruning);
- ctx.getCounter(Elements.NEGLECTED).increment(numElementsBeforePruning - numElementsAfterPruning);
-
- DistributedRowMatrix.MatrixEntryWritable entry = new DistributedRowMatrix.MatrixEntryWritable();
- int colIndex = TasteHadoopUtils.idToIndex(rowIndex.get());
- entry.setCol(colIndex);
- Iterator<Vector.Element> iterator = vector.iterateNonZero();
+ throws IOException, InterruptedException {
+ Vector userRatings = vectorWritable.get();
+
+ int numElementsBeforeSampling = userRatings.getNumNondefaultElements();
+ userRatings = Vectors.maybeSample(userRatings, sampleSize);
+ int numElementsAfterSampling = userRatings.getNumNondefaultElements();
+
+ int column = TasteHadoopUtils.idToIndex(rowIndex.get());
+ VectorWritable itemVector = new VectorWritable(new RandomAccessSparseVector(Integer.MAX_VALUE, 1));
+ itemVector.setWritesLaxPrecision(true);
+
+ Iterator<Vector.Element> iterator = userRatings.iterateNonZero();
while (iterator.hasNext()) {
Vector.Element elem = iterator.next();
- entry.setRow(elem.index());
- entry.setVal(elem.get());
- ctx.write(new IntWritable(elem.index()), entry);
+ itemVector.get().setQuick(column, elem.get());
+ ctx.write(new IntWritable(elem.index()), itemVector);
}
- }
- private void countSeen(Vector vector) {
- Iterator<Vector.Element> it = vector.iterateNonZero();
- while (it.hasNext()) {
- int index = it.next().index();
- indexCounts.adjustOrPutValue(index, 1, 1);
- }
+ ctx.getCounter(Elements.USER_RATINGS_USED).increment(numElementsAfterSampling);
+ ctx.getCounter(Elements.USER_RATINGS_NEGLECTED).increment(numElementsBeforeSampling - numElementsAfterSampling);
}
- private Vector maybePruneVector(Vector vector) {
- if (vector.getNumNondefaultElements() <= maxCooccurrences) {
- return vector;
- }
-
- MinK<Integer> smallCounts = new MinK<Integer>(maxCooccurrences, new Comparator<Integer>() {
- @Override
- public int compare(Integer one, Integer two) {
- return one.compareTo(two);
- }
- });
-
- Iterator<Vector.Element> it = vector.iterateNonZero();
- while (it.hasNext()) {
- int count = indexCounts.get(it.next().index());
- smallCounts.offer(count);
- }
-
- int greatestSmallCount = smallCounts.greatestSmall();
- if (greatestSmallCount > 0) {
- Iterator<Vector.Element> it2 = vector.iterateNonZero();
- while (it2.hasNext()) {
- Vector.Element e = it2.next();
- if (indexCounts.get(e.index()) > greatestSmallCount) {
- e.set(0.0);
- }
- }
- }
- return vector;
- }
}
Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java (from r1151818, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java&r1=1151818&r2=1167027&rev=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java Fri Sep 9 07:45:16 2011
@@ -15,30 +15,21 @@
* limitations under the License.
*/
-package org.apache.mahout.cf.taste.hadoop.similarity.item;
+package org.apache.mahout.cf.taste.hadoop.preparation;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.DistributedRowMatrix;
import java.io.IOException;
-public class ToItemVectorsReducer
- extends Reducer<IntWritable, DistributedRowMatrix.MatrixEntryWritable,IntWritable, VectorWritable> {
-
- @Override
- protected void reduce(IntWritable rowIndex, Iterable<DistributedRowMatrix.MatrixEntryWritable> values, Context ctx)
- throws IOException, InterruptedException {
+public class ToItemVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
- Vector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
- for (DistributedRowMatrix.MatrixEntryWritable entry : values) {
- vector.setQuick(entry.getCol(), entry.getVal());
- }
- VectorWritable vectorWritable = new VectorWritable(vector);
+ @Override
+ protected void reduce(IntWritable row, Iterable<VectorWritable> vectors, Context ctx)
+ throws IOException, InterruptedException {
+ VectorWritable vectorWritable = VectorWritable.merge(vectors.iterator());
vectorWritable.setWritesLaxPrecision(true);
- ctx.write(rowIndex, vectorWritable);
+ ctx.write(row, vectorWritable);
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java Fri Sep 9 07:45:16 2011
@@ -24,28 +24,16 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
-import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
-import org.apache.mahout.cf.taste.hadoop.MaybePruneRowsMapper;
-import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
-import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexMapper;
-import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexReducer;
-import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
-import org.apache.mahout.cf.taste.hadoop.item.ToUserVectorReducer;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob;
import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.math.VarIntWritable;
-import org.apache.mahout.math.VarLongWritable;
-import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.DistributedRowMatrix;
-import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob;
-import org.apache.mahout.math.hadoop.similarity.SimilarityType;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasures;
/**
* <p>Distributed precomputation of the item-item-similarities for Itembased Collaborative Filtering</p>
@@ -67,8 +55,8 @@ import org.apache.mahout.math.hadoop.sim
* <ol>
* <li>-Dmapred.input.dir=(path): Directory containing one or more text files with the preference data</li>
* <li>-Dmapred.output.dir=(path): output path where similarity data should be written</li>
- * <li>--similarityClassname (classname): Name of distributed similarity class to instantiate or a predefined similarity
- * from {@link SimilarityType}</li>
+ * <li>--similarityClassname (classname): Name of distributed similarity measure class to instantiate or a predefined similarity
+ * from {@link org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure}</li>
* <li>--maxSimilaritiesPerItem (integer): Maximum number of similarities considered per item (100)</li>
* <li>--maxCooccurrencesPerItem (integer): Maximum number of cooccurrences considered per item (100)</li>
* <li>--booleanData (boolean): Treat input data as having no pref values (false)</li>
@@ -84,7 +72,7 @@ public final class ItemSimilarityJob ext
static final String MAX_SIMILARITIES_PER_ITEM = ItemSimilarityJob.class.getName() + ".maxSimilarItemsPerItem";
private static final int DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM = 100;
- private static final int DEFAULT_MAX_COOCCURRENCES_PER_ITEM = 100;
+ private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
public static void main(String[] args) throws Exception {
@@ -96,17 +84,17 @@ public final class ItemSimilarityJob ext
addInputOption();
addOutputOption();
- addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate, alternatively use "
- + "one of the predefined similarities (" + SimilarityType.listEnumNames() + ')');
+ addOption("similarityClassname", "s", "Name of distributed similarity measures class to instantiate, " +
+ "alternatively use one of the predefined similarities (" + VectorSimilarityMeasures.list() + ')');
addOption("maxSimilaritiesPerItem", "m", "try to cap the number of similar items per item to this number "
+ "(default: " + DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM + ')',
String.valueOf(DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM));
- addOption("maxCooccurrencesPerItem", "mo", "try to cap the number of cooccurrences per item to this number "
- + "(default: " + DEFAULT_MAX_COOCCURRENCES_PER_ITEM + ')',
- String.valueOf(DEFAULT_MAX_COOCCURRENCES_PER_ITEM));
+ addOption("maxPrefsPerUser", "mppu", "max number of preferences to consider per user, " +
+ "users with more preferences will be sampled down (default: " + DEFAULT_MAX_PREFS_PER_USER + ")",
+ String.valueOf(DEFAULT_MAX_PREFS_PER_USER));
addOption("minPrefsPerUser", "mp", "ignore users with less preferences than this "
+ "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
- addOption("booleanData", "b", "Treat input as without pref values", Boolean.FALSE.toString());
+ addOption("booleanData", "b", "Treat input as without pref values", String.valueOf(Boolean.FALSE));
Map<String,String> parsedArgs = parseArguments(args);
if (parsedArgs == null) {
@@ -115,79 +103,38 @@ public final class ItemSimilarityJob ext
String similarityClassName = parsedArgs.get("--similarityClassname");
int maxSimilarItemsPerItem = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItem"));
- int maxCooccurrencesPerItem = Integer.parseInt(parsedArgs.get("--maxCooccurrencesPerItem"));
+ int maxPrefsPerUser = Integer.parseInt(parsedArgs.get("--maxPrefsPerUser"));
int minPrefsPerUser = Integer.parseInt(parsedArgs.get("--minPrefsPerUser"));
boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
- Path inputPath = getInputPath();
- Path outputPath = getOutputPath();
-
- Path itemIDIndexPath = getTempPath("itemIDIndex");
- Path userVectorPath = getTempPath("userVectors");
- Path itemUserMatrixPath = getTempPath("itemUserMatrix");
Path similarityMatrixPath = getTempPath("similarityMatrix");
+ Path prepPath = getTempPath("prepareRatingMatrix");
AtomicInteger currentPhase = new AtomicInteger();
- if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- Job itemIDIndex = prepareJob(
- inputPath, itemIDIndexPath, TextInputFormat.class,
- ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
- ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
- SequenceFileOutputFormat.class);
- itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
- itemIDIndex.waitForCompletion(true);
- }
-
- int numberOfUsers = 0;
-
- if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- Job toUserVector = prepareJob(inputPath,
- userVectorPath,
- TextInputFormat.class,
- ToItemPrefsMapper.class,
- VarLongWritable.class,
- booleanData ? VarLongWritable.class : EntityPrefWritable.class,
- ToUserVectorReducer.class,
- VarLongWritable.class,
- VectorWritable.class,
- SequenceFileOutputFormat.class);
- toUserVector.getConfiguration().setBoolean(RecommenderJob.BOOLEAN_DATA, booleanData);
- toUserVector.getConfiguration().setInt(ToUserVectorReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser);
- toUserVector.waitForCompletion(true);
+ ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
+ "--input", getInputPath().toString(),
+ "--output", prepPath.toString(),
+ "--maxPrefsPerUser", String.valueOf(maxPrefsPerUser),
+ "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
+ "--booleanData", String.valueOf(booleanData),
+ "--tempDir", getTempPath().toString()});
- numberOfUsers = (int) toUserVector.getCounters().findCounter(ToUserVectorReducer.Counters.USERS).getValue();
- }
-
- if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- Job maybePruneAndTransponse = prepareJob(userVectorPath,
- itemUserMatrixPath,
- SequenceFileInputFormat.class,
- MaybePruneRowsMapper.class,
- IntWritable.class,
- DistributedRowMatrix.MatrixEntryWritable.class,
- ToItemVectorsReducer.class,
- IntWritable.class,
- VectorWritable.class,
- SequenceFileOutputFormat.class);
- maybePruneAndTransponse.getConfiguration().setInt(MaybePruneRowsMapper.MAX_COOCCURRENCES,
- maxCooccurrencesPerItem);
- maybePruneAndTransponse.waitForCompletion(true);
- }
+ int numberOfUsers = TasteHadoopUtils.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
/* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like
* new DistributedRowMatrix(...).rowSimilarity(...) */
ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
- "-Dmapred.input.dir=" + itemUserMatrixPath,
- "-Dmapred.output.dir=" + similarityMatrixPath,
- "--numberOfColumns", String.valueOf(numberOfUsers),
- "--similarityClassname", similarityClassName,
- "--maxSimilaritiesPerRow", String.valueOf(maxSimilarItemsPerItem + 1),
- "--tempDir", getTempPath().toString() });
+ "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
+ "--output", similarityMatrixPath.toString(),
+ "--numberOfColumns", String.valueOf(numberOfUsers),
+ "--similarityClassname", similarityClassName,
+ "--maxSimilaritiesPerRow", String.valueOf(maxSimilarItemsPerItem + 1),
+ "--tempDir", getTempPath().toString() });
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job mostSimilarItems = prepareJob(similarityMatrixPath,
- outputPath,
+ getOutputPath(),
SequenceFileInputFormat.class,
MostSimilarItemPairsMapper.class,
EntityEntityWritable.class,
@@ -197,7 +144,8 @@ public final class ItemSimilarityJob ext
DoubleWritable.class,
TextOutputFormat.class);
Configuration mostSimilarItemsConf = mostSimilarItems.getConfiguration();
- mostSimilarItemsConf.set(ITEM_ID_INDEX_PATH_STR, itemIDIndexPath.toString());
+ mostSimilarItemsConf.set(ITEM_ID_INDEX_PATH_STR,
+ new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
mostSimilarItemsConf.setInt(MAX_SIMILARITIES_PER_ITEM, maxSimilarItemsPerItem);
mostSimilarItems.setCombinerClass(MostSimilarItemPairsReducer.class);
mostSimilarItems.waitForCompletion(true);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Fri Sep 9 07:45:16 2011
@@ -44,6 +44,8 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.slf4j.Logger;
@@ -123,6 +125,11 @@ public abstract class AbstractJob extend
return outputPath;
}
+ protected Path getOutputPath(String path) {
+ return new Path(outputPath, path);
+ }
+
+
protected Path getTempPath() {
return tempPath;
}
@@ -399,6 +406,48 @@ public abstract class AbstractJob extend
Class<? extends Mapper> mapper,
Class<? extends Writable> mapperKey,
Class<? extends Writable> mapperValue,
+ Class<? extends OutputFormat> outputFormat) throws IOException {
+
+ Job job = new Job(new Configuration(getConf()));
+ Configuration jobConf = job.getConfiguration();
+
+ if (mapper.equals(Mapper.class)) {
+ throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
+ }
+ job.setJarByClass(mapper);
+
+ job.setInputFormatClass(inputFormat);
+ jobConf.set("mapred.input.dir", inputPath.toString());
+
+ job.setMapperClass(mapper);
+ job.setMapOutputKeyClass(mapperKey);
+ job.setMapOutputValueClass(mapperValue);
+
+ jobConf.setBoolean("mapred.compress.map.output", true);
+ job.setNumReduceTasks(0);
+
+
+ job.setJobName(getCustomJobName(job, mapper, Reducer.class));
+
+ job.setOutputFormatClass(outputFormat);
+ jobConf.set("mapred.output.dir", outputPath.toString());
+
+ return job;
+ }
+
+ protected Job prepareJob(Path inputPath, Path outputPath, Class<? extends Mapper> mapper,
+ Class<? extends Writable> mapperKey, Class<? extends Writable> mapperValue, Class<? extends Reducer> reducer,
+ Class<? extends Writable> reducerKey, Class<? extends Writable> reducerValue) throws IOException {
+ return prepareJob(inputPath, outputPath, SequenceFileInputFormat.class, mapper, mapperKey, mapperValue, reducer,
+ reducerKey, reducerValue, SequenceFileOutputFormat.class);
+ }
+
+ protected Job prepareJob(Path inputPath,
+ Path outputPath,
+ Class<? extends InputFormat> inputFormat,
+ Class<? extends Mapper> mapper,
+ Class<? extends Writable> mapperKey,
+ Class<? extends Writable> mapperValue,
Class<? extends Reducer> reducer,
Class<? extends Writable> reducerKey,
Class<? extends Writable> reducerValue,
Added: mahout/trunk/core/src/main/java/org/apache/mahout/common/ClassUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/ClassUtils.java?rev=1167027&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/ClassUtils.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/ClassUtils.java Fri Sep 9 07:45:16 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+public class ClassUtils {
+
+ private ClassUtils() {}
+
+ public static <T> T instantiateAs(String classname, Class<T> asSubclassOfClass) {
+ try {
+ return Class.forName(classname).asSubclass(asSubclassOfClass).newInstance();
+ } catch (ClassNotFoundException cnfe) {
+ throw new IllegalStateException(cnfe);
+ } catch (InstantiationException ie) {
+ throw new IllegalStateException(ie);
+ } catch (IllegalAccessException iae) {
+ throw new IllegalStateException(iae);
+ }
+ }
+
+ public static <T> T instantiateAs(Class<? extends T> clazz, Class<T> asSubclassOfClass) {
+ try {
+ return clazz.asSubclass(asSubclassOfClass).newInstance();
+ } catch (InstantiationException ie) {
+ throw new IllegalStateException(ie);
+ } catch (IllegalAccessException iae) {
+ throw new IllegalStateException(iae);
+ }
+ }
+}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java Fri Sep 9 07:45:16 2011
@@ -106,6 +106,9 @@ public class PageRankJob extends Abstrac
addOption("teleportationProbability", "tp", "probability to teleport to a random vertex", String.valueOf(0.8));
Map<String, String> parsedArgs = parseArguments(args);
+ if (parsedArgs == null) {
+ return -1;
+ }
Path vertexIndex = new Path(parsedArgs.get("--vertexIndex"));
Path edges = new Path(parsedArgs.get("--edges"));
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java Fri Sep 9 07:45:16 2011
@@ -21,6 +21,9 @@ import org.apache.hadoop.conf.Configured
import org.apache.hadoop.io.Writable;
import com.google.common.base.Preconditions;
+import org.apache.mahout.math.map.OpenDoubleIntHashMap;
+import org.apache.mahout.math.map.OpenIntDoubleHashMap;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
import java.io.DataInput;
import java.io.DataOutput;
@@ -125,7 +128,7 @@ public final class VectorWritable extend
public static void writeVector(DataOutput out, Vector vector) throws IOException {
writeVector(out, vector, false);
}
-
+
public static void writeVector(DataOutput out, Vector vector, boolean laxPrecision) throws IOException {
boolean dense = vector.isDense();
boolean sequential = vector.isSequentialAccess();
@@ -186,6 +189,21 @@ public final class VectorWritable extend
return v.get();
}
+ public static VectorWritable merge(Iterator<VectorWritable> vectors) {
+ Vector accumulator = vectors.next().get();
+ while (vectors.hasNext()) {
+ VectorWritable v = vectors.next();
+ if (v != null) {
+ Iterator<Vector.Element> nonZeroElements = v.get().iterateNonZero();
+ while (nonZeroElements.hasNext()) {
+ Vector.Element nonZeroElement = nonZeroElements.next();
+ accumulator.setQuick(nonZeroElement.index(), nonZeroElement.get());
+ }
+ }
+ }
+ return new VectorWritable(accumulator);
+ }
+
@Override
public boolean equals(Object o) {
if (o instanceof VectorWritable) {
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java Fri Sep 9 07:45:16 2011
@@ -127,28 +127,13 @@ public class TransposeJob extends Abstra
}
}
- static Vector merge(Iterator<VectorWritable> vectors) {
- Vector accumulator = vectors.next().get();
- while (vectors.hasNext()) {
- VectorWritable v = vectors.next();
- if (v != null) {
- Iterator<Vector.Element> nonZeroElements = v.get().iterateNonZero();
- while (nonZeroElements.hasNext()) {
- Vector.Element nonZeroElement = nonZeroElements.next();
- accumulator.setQuick(nonZeroElement.index(), nonZeroElement.get());
- }
- }
- }
- return accumulator;
- }
-
public static class MergeVectorsCombiner extends MapReduceBase
implements Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
@Override
public void reduce(WritableComparable<?> key, Iterator<VectorWritable> vectors,
OutputCollector<WritableComparable<?>, VectorWritable> out, Reporter reporter) throws IOException {
- out.collect(key, new VectorWritable(merge(vectors)));
+ out.collect(key, VectorWritable.merge(vectors));
}
}
@@ -158,7 +143,8 @@ public class TransposeJob extends Abstra
@Override
public void reduce(WritableComparable<?> key, Iterator<VectorWritable> vectors,
OutputCollector<WritableComparable<?>, VectorWritable> out, Reporter reporter) throws IOException {
- out.collect(key, new VectorWritable(new SequentialAccessSparseVector(merge(vectors))));
+ Vector merged = VectorWritable.merge(vectors).get();
+ out.collect(key, new VectorWritable(new SequentialAccessSparseVector(merged)));
}
}
}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java?rev=1167027&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java Fri Sep 9 07:45:16 2011
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.cooccurrence;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.common.TopK;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.mapreduce.VectorSumReducer;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasures;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure;
+import org.apache.mahout.math.map.OpenIntDoubleHashMap;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Computes pairwise similarities between the rows of a matrix and keeps the highest-ranked similarities per row.
+ *
+ * <p>The job consists of three MapReduce phases, each executed only if {@code shouldRunNextPhase} allows it:</p>
+ * <ol>
+ *   <li>{@link VectorNormMapper} / {@link MergeVectorsReducer}: normalize every row, transpose the matrix into
+ *   column vectors and write per-row norms (plus, when a threshold is set, per-row non-zero counts and maximum
+ *   values) to side files;</li>
+ *   <li>{@link CooccurrencesMapper} / {@link SimilarityReducer}: for every column, aggregate all pairs of rows
+ *   that have an entry in it, sum these partial results per row pair and turn them into similarity values;</li>
+ *   <li>{@link UnsymmetrifyMapper} / {@link MergeToTopKSimilaritiesReducer}: mirror the one-sided similarity
+ *   entries to both rows of each pair and keep at most {@code maxSimilaritiesPerRow} entries per row.</li>
+ * </ol>
+ */
+public class RowSimilarityJob extends AbstractJob {
+
+  // Configuration keys. Class.getName() is used because Class.toString() would prepend "class " to every key.
+  static final String SIMILARITY_CLASSNAME = RowSimilarityJob.class.getName() + ".distributedSimilarityClassname";
+  static final String NUMBER_OF_COLUMNS = RowSimilarityJob.class.getName() + ".numberOfColumns";
+  static final String MAX_SIMILARITIES_PER_ROW = RowSimilarityJob.class.getName() + ".maxSimilaritiesPerRow";
+  static final String EXCLUDE_SELF_SIMILARITY = RowSimilarityJob.class.getName() + ".excludeSelfSimilarity";
+  static final String THRESHOLD = RowSimilarityJob.class.getName() + ".threshold";
+
+  static final String NORMS_PATH = RowSimilarityJob.class.getName() + ".normsPath";
+  static final String MAXVALUES_PATH = RowSimilarityJob.class.getName() + ".maxWeightsPath";
+  static final String NUM_NON_ZERO_ENTRIES_PATH = RowSimilarityJob.class.getName() + ".nonZeroEntriesPath";
+
+  private static final int DEFAULT_MAX_SIMILARITIES_PER_ROW = 100;
+
+  /**
+   * Sentinel meaning "no threshold given". Every non-NaN similarity value is {@code >=} negative infinity, so
+   * nothing is dropped. {@code Double.MIN_VALUE} is the smallest <em>positive</em> double and would silently
+   * discard all zero and negative similarities.
+   */
+  private static final double NO_THRESHOLD = Double.NEGATIVE_INFINITY;
+
+  // Keys under which VectorNormMapper emits its per-row side vectors; presumably never real column indices.
+  private static final int NORM_VECTOR_MARKER = Integer.MIN_VALUE;
+  private static final int MAXVALUE_VECTOR_MARKER = Integer.MIN_VALUE + 1;
+  private static final int NUM_NON_ZERO_ENTRIES_VECTOR_MARKER = Integer.MIN_VALUE + 2;
+
+  static enum Counters { ROWS, COOCCURRENCES, PRUNED_COOCCURRENCES }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new RowSimilarityJob(), args);
+  }
+
+  /**
+   * Parses the command line and runs the three phases of the job.
+   *
+   * @return 0 on success, -1 if argument parsing stopped early or any of the phases failed
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption("numberOfColumns", "r", "Number of columns in the input matrix");
+    addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate, alternatively use "
+        + "one of the predefined similarities (" + VectorSimilarityMeasures.list() + ')');
+    addOption("maxSimilaritiesPerRow", "m", "Number of maximum similarities per row (default: "
+        + DEFAULT_MAX_SIMILARITIES_PER_ROW + ')', String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ROW));
+    // when true, the flag removes self-similarities, so the help text must describe exclusion
+    addOption("excludeSelfSimilarity", "ess", "exclude the similarity of rows to themselves?", String.valueOf(false));
+    addOption("threshold", "tr", "drop row pairs with a similarity value below this");
+
+    Map<String,String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    int numberOfColumns = Integer.parseInt(parsedArgs.get("--numberOfColumns"));
+    String similarityClassnameArg = parsedArgs.get("--similarityClassname");
+    String similarityClassname;
+    try {
+      // accept the name of a predefined measure, otherwise treat the argument as a class name
+      similarityClassname = VectorSimilarityMeasures.valueOf(similarityClassnameArg).getClassname();
+    } catch (IllegalArgumentException iae) {
+      similarityClassname = similarityClassnameArg;
+    }
+
+    int maxSimilaritiesPerRow = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerRow"));
+    boolean excludeSelfSimilarity = Boolean.parseBoolean(parsedArgs.get("--excludeSelfSimilarity"));
+    double threshold = parsedArgs.containsKey("--threshold")
+        ? Double.parseDouble(parsedArgs.get("--threshold"))
+        : NO_THRESHOLD;
+
+    Path weightsPath = getTempPath("weights");
+    Path normsPath = getTempPath("norms.bin");
+    Path numNonZeroEntriesPath = getTempPath("numNonZeroEntries.bin");
+    Path maxValuesPath = getTempPath("maxValues.bin");
+    Path pairwiseSimilarityPath = getTempPath("pairwiseSimilarity");
+
+    AtomicInteger currentPhase = new AtomicInteger();
+
+    // phase 1: normalize and transpose the input, write norms / non-zero counts / max values to side files
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,
+          VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
+      normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);
+      Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();
+      normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));
+      normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());
+      normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
+      normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
+      normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
+      // later phases consume this phase's output: stop instead of running them on missing data
+      if (!normsAndTranspose.waitForCompletion(true)) {
+        return -1;
+      }
+    }
+
+    // phase 2: compute the (one-sided) pairwise similarities
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,
+          IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);
+      pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);
+      Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
+      pairwiseConf.set(THRESHOLD, String.valueOf(threshold));
+      pairwiseConf.set(NORMS_PATH, normsPath.toString());
+      pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
+      pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());
+      pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);
+      pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
+      pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);
+      if (!pairwiseSimilarity.waitForCompletion(true)) {
+        return -1;
+      }
+    }
+
+    // phase 3: mirror the similarities to both rows and keep the top entries per row
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,
+          IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,
+          VectorWritable.class);
+      asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
+      asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
+      if (!asMatrix.waitForCompletion(true)) {
+        return -1;
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Phase 1 mapper. Normalizes each row with the configured similarity measure and emits one single-entry
+   * partial column vector per non-zero entry, keyed by column index. Per-row norms are collected, as are
+   * non-zero counts and maximum values when a threshold is set. These side vectors are emitted under marker
+   * keys in {@link #cleanup}.
+   */
+  public static class VectorNormMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private VectorSimilarityMeasure similarity;
+    // side information for all rows seen by this mapper, indexed by row
+    private Vector norms;
+    private Vector nonZeroEntries;
+    private Vector maxValues;
+    private double threshold;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      similarity = ClassUtils.instantiateAs(ctx.getConfiguration().get(SIMILARITY_CLASSNAME),
+          VectorSimilarityMeasure.class);
+      norms = new RandomAccessSparseVector(Integer.MAX_VALUE);
+      nonZeroEntries = new RandomAccessSparseVector(Integer.MAX_VALUE);
+      maxValues = new RandomAccessSparseVector(Integer.MAX_VALUE);
+      threshold = Double.parseDouble(ctx.getConfiguration().get(THRESHOLD));
+    }
+
+    @Override
+    protected void map(IntWritable row, VectorWritable vectorWritable, Context ctx)
+        throws IOException, InterruptedException {
+
+      Vector rowVector = similarity.normalize(vectorWritable.get());
+
+      int numNonZeroEntries = 0;
+      // start below every possible entry: Double.MIN_VALUE is positive and would overstate the
+      // maximum of rows whose entries are all negative
+      double maxValue = Double.NEGATIVE_INFINITY;
+
+      Iterator<Vector.Element> nonZeroElements = rowVector.iterateNonZero();
+      while (nonZeroElements.hasNext()) {
+        Vector.Element element = nonZeroElements.next();
+        // transpose: this entry becomes part of the vector of its column
+        RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);
+        partialColumnVector.setQuick(row.get(), element.get());
+        ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));
+
+        numNonZeroEntries++;
+        if (maxValue < element.get()) {
+          maxValue = element.get();
+        }
+      }
+
+      // non-zero counts and maxima are only consulted for pruning, which happens only with a threshold
+      if (threshold != NO_THRESHOLD) {
+        nonZeroEntries.setQuick(row.get(), numNonZeroEntries);
+        // an empty row has no maximum; it appears in no column vector, so nothing is recorded for it
+        if (numNonZeroEntries > 0) {
+          maxValues.setQuick(row.get(), maxValue);
+        }
+      }
+      norms.setQuick(row.get(), similarity.norm(rowVector));
+
+      ctx.getCounter(Counters.ROWS).increment(1);
+    }
+
+    @Override
+    protected void cleanup(Context ctx) throws IOException, InterruptedException {
+      super.cleanup(ctx);
+      // emit the accumulated side vectors under reserved keys; MergeVectorsReducer merges the
+      // contributions of all mappers and writes them to their side files
+      ctx.write(new IntWritable(NORM_VECTOR_MARKER), new VectorWritable(norms));
+      ctx.write(new IntWritable(NUM_NON_ZERO_ENTRIES_VECTOR_MARKER), new VectorWritable(nonZeroEntries));
+      ctx.write(new IntWritable(MAXVALUE_VECTOR_MARKER), new VectorWritable(maxValues));
+    }
+  }
+
+  /** Phase 1 combiner: merges the partial vectors that share a key via {@code Vectors.merge}. */
+  public static class MergeVectorsCombiner extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+    @Override
+    protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
+        throws IOException, InterruptedException {
+      ctx.write(row, new VectorWritable(Vectors.merge(partialVectors)));
+    }
+  }
+
+  /**
+   * Phase 1 reducer: merges the partial vectors per key. It writes the side vectors (marker keys) to their
+   * configured paths and emits the transposed column vectors for all other keys.
+   */
+  public static class MergeVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private Path normsPath;
+    private Path numNonZeroEntriesPath;
+    private Path maxValuesPath;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      normsPath = new Path(ctx.getConfiguration().get(NORMS_PATH));
+      numNonZeroEntriesPath = new Path(ctx.getConfiguration().get(NUM_NON_ZERO_ENTRIES_PATH));
+      maxValuesPath = new Path(ctx.getConfiguration().get(MAXVALUES_PATH));
+    }
+
+    @Override
+    protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
+        throws IOException, InterruptedException {
+      Vector partialVector = Vectors.merge(partialVectors);
+
+      if (row.get() == NORM_VECTOR_MARKER) {
+        Vectors.write(partialVector, normsPath, ctx.getConfiguration());
+      } else if (row.get() == MAXVALUE_VECTOR_MARKER) {
+        Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());
+      } else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {
+        Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);
+      } else {
+        ctx.write(row, new VectorWritable(partialVector));
+      }
+    }
+  }
+
+  /**
+   * Phase 2 mapper. Receives the vector of one column, i.e. all rows with an entry in that column. For every
+   * such row A, it emits a vector holding the similarity measure's aggregate of A's and B's entries for every
+   * row B whose index is not smaller than A's. Each unordered row pair, and each row with itself, is therefore
+   * emitted once per shared column. With a threshold set, pairs rejected by {@code consider()} are pruned.
+   */
+  public static class CooccurrencesMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private static final Comparator<Vector.Element> BY_INDEX = new Comparator<Vector.Element>() {
+      @Override
+      public int compare(Vector.Element one, Vector.Element two) {
+        return Ints.compare(one.index(), two.index());
+      }
+    };
+
+    private VectorSimilarityMeasure similarity;
+    private OpenIntIntHashMap numNonZeroEntries;
+    private Vector maxValues;
+    private double threshold;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      similarity = ClassUtils.instantiateAs(ctx.getConfiguration().get(SIMILARITY_CLASSNAME),
+          VectorSimilarityMeasure.class);
+      numNonZeroEntries = Vectors.readAsIntMap(new Path(ctx.getConfiguration().get(NUM_NON_ZERO_ENTRIES_PATH)),
+          ctx.getConfiguration());
+      maxValues = Vectors.read(new Path(ctx.getConfiguration().get(MAXVALUES_PATH)), ctx.getConfiguration());
+      threshold = Double.parseDouble(ctx.getConfiguration().get(THRESHOLD));
+    }
+
+    /** Returns the similarity measure's verdict on whether rows A and B may still reach the threshold. */
+    private boolean consider(Vector.Element occurrenceA, Vector.Element occurrenceB) {
+      int numNonZeroEntriesA = numNonZeroEntries.get(occurrenceA.index());
+      int numNonZeroEntriesB = numNonZeroEntries.get(occurrenceB.index());
+
+      double maxValueA = maxValues.get(occurrenceA.index());
+
+      return similarity.consider(numNonZeroEntriesA, numNonZeroEntriesB, maxValueA, threshold);
+    }
+
+    @Override
+    protected void map(IntWritable column, VectorWritable occurrenceVector, Context ctx)
+        throws IOException, InterruptedException {
+      Vector.Element[] occurrences = Vectors.toArray(occurrenceVector);
+      // sorting by row index makes the inner loop visit only pairs (A, B) with index(B) >= index(A)
+      Arrays.sort(occurrences, BY_INDEX);
+
+      int cooccurrences = 0;
+      int prunedCooccurrences = 0;
+      for (int n = 0; n < occurrences.length; n++) {
+        Vector.Element occurrenceA = occurrences[n];
+        Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);
+        for (int m = n; m < occurrences.length; m++) {
+          Vector.Element occurrenceB = occurrences[m];
+          if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {
+            dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));
+            cooccurrences++;
+          } else {
+            prunedCooccurrences++;
+          }
+        }
+        ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));
+      }
+      ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);
+      ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);
+    }
+  }
+
+  /**
+   * Phase 2 reducer. Sums the partial aggregates of one row with all other rows and turns each sum into a
+   * similarity value using the norms written in phase 1. Values below the threshold are dropped and, if
+   * requested, the row's similarity to itself is set to zero.
+   */
+  public static class SimilarityReducer
+      extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private VectorSimilarityMeasure similarity;
+    private int numberOfColumns;
+    private boolean excludeSelfSimilarity;
+    private Vector norms;
+    private double threshold;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      similarity = ClassUtils.instantiateAs(ctx.getConfiguration().get(SIMILARITY_CLASSNAME),
+          VectorSimilarityMeasure.class);
+      numberOfColumns = ctx.getConfiguration().getInt(NUMBER_OF_COLUMNS, -1);
+      Preconditions.checkArgument(numberOfColumns > 0, "Incorrect number of columns!");
+      excludeSelfSimilarity = ctx.getConfiguration().getBoolean(EXCLUDE_SELF_SIMILARITY, false);
+      norms = Vectors.read(new Path(ctx.getConfiguration().get(NORMS_PATH)), ctx.getConfiguration());
+      threshold = Double.parseDouble(ctx.getConfiguration().get(THRESHOLD));
+    }
+
+    @Override
+    protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)
+        throws IOException, InterruptedException {
+      Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();
+      // the first partial vector serves as the accumulator for all others
+      Vector dots = partialDotsIterator.next().get();
+      while (partialDotsIterator.hasNext()) {
+        Vector toAdd = partialDotsIterator.next().get();
+        Iterator<Vector.Element> nonZeroElements = toAdd.iterateNonZero();
+        while (nonZeroElements.hasNext()) {
+          Vector.Element nonZeroElement = nonZeroElements.next();
+          dots.setQuick(nonZeroElement.index(), dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());
+        }
+      }
+
+      Vector similarities = dots.like();
+      double normA = norms.getQuick(row.get());
+      Iterator<Vector.Element> dotsWith = dots.iterateNonZero();
+      while (dotsWith.hasNext()) {
+        Vector.Element b = dotsWith.next();
+        double similarityValue = similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns);
+        // with NO_THRESHOLD (negative infinity) every non-NaN value, including negative ones, is kept
+        if (similarityValue >= threshold) {
+          similarities.set(b.index(), similarityValue);
+        }
+      }
+      if (excludeSelfSimilarity) {
+        similarities.setQuick(row.get(), 0);
+      }
+      ctx.write(row, new VectorWritable(similarities));
+    }
+  }
+
+  /**
+   * Phase 3 mapper. Phase 2 only produced each row's similarities to rows with an equal or higher index. This
+   * mapper mirrors every entry to the other row of its pair and also emits the row's own top entries, so the
+   * reducer sees both halves of every row.
+   */
+  public static class UnsymmetrifyMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private int maxSimilaritiesPerRow;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
+      Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
+    }
+
+    @Override
+    protected void map(IntWritable row, VectorWritable similaritiesWritable, Context ctx)
+        throws IOException, InterruptedException {
+      Vector similarities = similaritiesWritable.get();
+      TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(maxSimilaritiesPerRow, Vectors.BY_VALUE);
+      Iterator<Vector.Element> nonZeroElements = similarities.iterateNonZero();
+      while (nonZeroElements.hasNext()) {
+        Vector.Element nonZeroElement = nonZeroElements.next();
+        // queue a copy; presumably the iterator may reuse its element instance
+        topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));
+        Vector transposedPartial = similarities.like();
+        transposedPartial.setQuick(row.get(), nonZeroElement.get());
+        ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));
+      }
+      Vector topKSimilarities = similarities.like();
+      for (Vector.Element topKSimilarity : topKQueue.retrieve()) {
+        topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
+      }
+      ctx.write(row, new VectorWritable(topKSimilarities));
+    }
+  }
+
+  /**
+   * Phase 3 reducer, also used as the combiner. Merges the partial similarity vectors of a row and keeps at
+   * most {@code maxSimilaritiesPerRow} entries, as selected by {@code Vectors.topKElements}.
+   */
+  public static class MergeToTopKSimilaritiesReducer
+      extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private int maxSimilaritiesPerRow;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
+      Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
+    }
+
+    @Override
+    protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)
+        throws IOException, InterruptedException {
+      Vector allSimilarities = Vectors.merge(partials);
+      Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);
+      ctx.write(row, new VectorWritable(topKSimilarities));
+    }
+  }
+
+}