You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/04 14:29:50 UTC
[48/53] [abbrv] [partial] mahout git commit: end of day 6-2-2018
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java
new file mode 100644
index 0000000..704c74a
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public final class PrefAndSimilarityColumnWritable implements Writable {
+
+ private float prefValue;
+ private Vector similarityColumn;
+
+ public PrefAndSimilarityColumnWritable() {
+ }
+
+ public PrefAndSimilarityColumnWritable(float prefValue, Vector similarityColumn) {
+ set(prefValue, similarityColumn);
+ }
+
+ public void set(float prefValue, Vector similarityColumn) {
+ this.prefValue = prefValue;
+ this.similarityColumn = similarityColumn;
+ }
+
+ public float getPrefValue() {
+ return prefValue;
+ }
+
+ public Vector getSimilarityColumn() {
+ return similarityColumn;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ prefValue = in.readFloat();
+ VectorWritable vw = new VectorWritable();
+ vw.readFields(in);
+ similarityColumn = vw.get();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeFloat(prefValue);
+ VectorWritable vw = new VectorWritable(similarityColumn);
+ vw.setWritesLaxPrecision(true);
+ vw.write(out);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof PrefAndSimilarityColumnWritable) {
+ PrefAndSimilarityColumnWritable other = (PrefAndSimilarityColumnWritable) obj;
+ return prefValue == other.prefValue && similarityColumn.equals(other.similarityColumn);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return RandomUtils.hashFloat(prefValue) + 31 * similarityColumn.hashCode();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
new file mode 100644
index 0000000..129db1d
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
+import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.ItemSimilarityJob;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasures;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * <p>Runs a completely distributed recommender job as a series of mapreduces.</p>
+ * <p/>
+ * <p>Preferences in the input file should look like {@code userID, itemID[, preferencevalue]}</p>
+ * <p/>
+ * <p>
+ * Preference value is optional to accommodate applications that have no notion of a preference value (that is, the user
+ * simply expresses a preference for an item, but no degree of preference).
+ * </p>
+ * <p/>
+ * <p>
+ * The preference value is assumed to be parseable as a {@code double}. The user IDs and item IDs are
+ * parsed as {@code long}s.
+ * </p>
+ * <p/>
+ * <p>Command line arguments specific to this class are:</p>
+ * <p/>
+ * <ol>
+ * <li>--input(path): Directory containing one or more text files with the preference data</li>
+ * <li>--output(path): output path where recommender output should go</li>
+ * <li>--similarityClassname (classname): Name of vector similarity class to instantiate or a predefined similarity
+ * from {@link org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure}</li>
+ * <li>--usersFile (path): only compute recommendations for user IDs contained in this file (optional)</li>
+ * <li>--itemsFile (path): only include item IDs from this file in the recommendations (optional)</li>
+ * <li>--filterFile (path): file containing comma-separated userID,itemID pairs. Used to exclude the item from the
+ * recommendations for that user (optional)</li>
+ * <li>--numRecommendations (integer): Number of recommendations to compute per user (10)</li>
+ * <li>--booleanData (boolean): Treat input data as having no pref values (false)</li>
+ * <li>--maxPrefsPerUser (integer): Maximum number of preferences considered per user in final
+ * recommendation phase (10)</li>
+ * <li>--maxSimilaritiesPerItem (integer): Maximum number of similarities considered per item (100)</li>
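+ * <li>--minPrefsPerUser (integer): ignore users with fewer preferences than this in the similarity computation (1)</li>
+ * <li>--maxPrefsInItemSimilarity (integer): maximum number of preferences to consider per user or item in
+ * the item similarity computation phase; users or items with more preferences will be sampled down (500)</li>
+ * <li>--threshold (double): discard item pairs with a similarity value below this</li>
+ * <li>--userItemFile (path): file containing comma-separated userID,itemID pairs; only these items are included
+ * in the recommendations. Cannot be used together with usersFile or itemsFile (optional)</li>
+ * <li>--outputPathForSimilarityMatrix (path): write the item similarity matrix to this path (optional)</li>
+ * <li>--randomSeed (long): seed to use for sampling (optional)</li>
+ * <li>--sequencefileOutput (flag): write the output into a SequenceFile instead of a text file</li>
+ * </ol>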
+ * <p/>
+ * <p>General command line options are documented in {@link AbstractJob}.</p>
+ * <p/>
+ * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear before all other
+ * arguments.</p>
+ */
+public final class RecommenderJob extends AbstractJob {
+
+  public static final String BOOLEAN_DATA = "booleanData";
+  public static final String DEFAULT_PREPARE_PATH = "preparePreferenceMatrix";
+
+  private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;
+  private static final int DEFAULT_MAX_PREFS = 500;
+  private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
+
+  /** Matches each {@code -Xmx} flag in a JVM options string, capturing the amount and its unit. */
+  private static final Pattern XMX_PATTERN = Pattern.compile("-Xmx([0-9]+)([kKmMgG])");
+
+  /** Heap size (MB) assumed for task JVMs when no {@code -Xmx} setting can be found. */
+  private static final long DEFAULT_ASSUMED_HEAP_MB = 512;
+
+  /** Upper bound for io.sort.mb; see https://issues.apache.org/jira/browse/MAPREDUCE-2308 */
+  private static final long MAX_IO_SORT_MB = 1024;
+
+  /**
+   * Parses the command line and runs the pipeline as a sequence of phases, each gated by
+   * {@code shouldRunNextPhase}:
+   * <ol>
+   * <li>prepare the preference data (user vectors, rating matrix, item ID index),</li>
+   * <li>compute the item similarity matrix (optionally also writing it to a user-given path),</li>
+   * <li>partially multiply the similarity matrix with the user vectors,</li>
+   * <li>optionally convert a filter file, then aggregate the partial products into recommendations.</li>
+   * </ol>
+   *
+   * @param args command line arguments
+   * @return 0 on success; -1 if argument parsing failed or any sub-job failed
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addRecommenderOptions();
+
+    Map<String, List<String>> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    Path outputPath = getOutputPath();
+    int numRecommendations = Integer.parseInt(getOption("numRecommendations"));
+    String usersFile = getOption("usersFile");
+    String itemsFile = getOption("itemsFile");
+    String filterFile = getOption("filterFile");
+    String userItemFile = getOption("userItemFile");
+    boolean booleanData = Boolean.parseBoolean(getOption("booleanData"));
+    int maxPrefsPerUser = Integer.parseInt(getOption("maxPrefsPerUser"));
+    int minPrefsPerUser = Integer.parseInt(getOption("minPrefsPerUser"));
+    int maxPrefsInItemSimilarity = Integer.parseInt(getOption("maxPrefsInItemSimilarity"));
+    int maxSimilaritiesPerItem = Integer.parseInt(getOption("maxSimilaritiesPerItem"));
+    String similarityClassname = getOption("similarityClassname");
+    double threshold = hasOption("threshold")
+        ? Double.parseDouble(getOption("threshold")) : RowSimilarityJob.NO_THRESHOLD;
+    long randomSeed = hasOption("randomSeed")
+        ? Long.parseLong(getOption("randomSeed")) : RowSimilarityJob.NO_FIXED_RANDOM_SEED;
+
+    Path prepPath = getTempPath(DEFAULT_PREPARE_PATH);
+    Path similarityMatrixPath = getTempPath("similarityMatrix");
+    Path explicitFilterPath = getTempPath("explicitFilterPath");
+    Path partialMultiplyPath = getTempPath("partialMultiply");
+
+    AtomicInteger currentPhase = new AtomicInteger();
+
+    int numberOfUsers = -1;
+
+    // Phase 1: prepare the preference data read by the later phases.
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      int exitCode = ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
+          "--input", getInputPath().toString(),
+          "--output", prepPath.toString(),
+          "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
+          "--booleanData", String.valueOf(booleanData),
+          "--tempDir", getTempPath().toString(),
+      });
+      // Stop instead of running the later phases against missing or partial data.
+      if (exitCode != 0) {
+        return -1;
+      }
+
+      numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
+    }
+
+    // Phase 2: compute the item similarity matrix.
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+
+      /* special behavior if phase 1 is skipped */
+      if (numberOfUsers == -1) {
+        numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
+            PathType.LIST, null, getConf());
+      }
+
+      //calculate the co-occurrence matrix
+      int exitCode = ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
+          "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
+          "--output", similarityMatrixPath.toString(),
+          "--numberOfColumns", String.valueOf(numberOfUsers),
+          "--similarityClassname", similarityClassname,
+          "--maxObservationsPerRow", String.valueOf(maxPrefsInItemSimilarity),
+          "--maxObservationsPerColumn", String.valueOf(maxPrefsInItemSimilarity),
+          "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
+          "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
+          "--threshold", String.valueOf(threshold),
+          "--randomSeed", String.valueOf(randomSeed),
+          "--tempDir", getTempPath().toString(),
+      });
+      if (exitCode != 0) {
+        return -1;
+      }
+
+      // write out the similarity matrix if the user specified that behavior
+      if (hasOption("outputPathForSimilarityMatrix")) {
+        Path outputPathForSimilarityMatrix = new Path(getOption("outputPathForSimilarityMatrix"));
+
+        Job outputSimilarityMatrix = prepareJob(similarityMatrixPath, outputPathForSimilarityMatrix,
+            SequenceFileInputFormat.class, ItemSimilarityJob.MostSimilarItemPairsMapper.class,
+            EntityEntityWritable.class, DoubleWritable.class, ItemSimilarityJob.MostSimilarItemPairsReducer.class,
+            EntityEntityWritable.class, DoubleWritable.class, TextOutputFormat.class);
+
+        Configuration mostSimilarItemsConf = outputSimilarityMatrix.getConfiguration();
+        mostSimilarItemsConf.set(ItemSimilarityJob.ITEM_ID_INDEX_PATH_STR,
+            new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
+        mostSimilarItemsConf.setInt(ItemSimilarityJob.MAX_SIMILARITIES_PER_ITEM, maxSimilaritiesPerItem);
+        if (!outputSimilarityMatrix.waitForCompletion(true)) {
+          return -1;
+        }
+      }
+    }
+
+    // Phase 3: start the multiplication of the co-occurrence matrix by the user vectors.
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job partialMultiply = Job.getInstance(getConf(), "partialMultiply");
+      Configuration partialMultiplyConf = partialMultiply.getConfiguration();
+
+      MultipleInputs.addInputPath(partialMultiply, similarityMatrixPath, SequenceFileInputFormat.class,
+          SimilarityMatrixRowWrapperMapper.class);
+      MultipleInputs.addInputPath(partialMultiply, new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
+          SequenceFileInputFormat.class, UserVectorSplitterMapper.class);
+      partialMultiply.setJarByClass(ToVectorAndPrefReducer.class);
+      partialMultiply.setMapOutputKeyClass(VarIntWritable.class);
+      partialMultiply.setMapOutputValueClass(VectorOrPrefWritable.class);
+      partialMultiply.setReducerClass(ToVectorAndPrefReducer.class);
+      partialMultiply.setOutputFormatClass(SequenceFileOutputFormat.class);
+      partialMultiply.setOutputKeyClass(VarIntWritable.class);
+      partialMultiply.setOutputValueClass(VectorAndPrefsWritable.class);
+      partialMultiplyConf.setBoolean("mapred.compress.map.output", true);
+      partialMultiplyConf.set("mapred.output.dir", partialMultiplyPath.toString());
+
+      if (usersFile != null) {
+        partialMultiplyConf.set(UserVectorSplitterMapper.USERS_FILE, usersFile);
+      }
+
+      if (userItemFile != null) {
+        partialMultiplyConf.set(IDReader.USER_ITEM_FILE, userItemFile);
+      }
+
+      partialMultiplyConf.setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED, maxPrefsPerUser);
+
+      boolean succeeded = partialMultiply.waitForCompletion(true);
+      if (!succeeded) {
+        return -1;
+      }
+    }
+
+    // Phase 4: optional explicit filtering, then aggregation into recommendations.
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      //filter out any users we don't care about
+      /* convert the user/item pairs to filter if a filterfile has been specified */
+      if (filterFile != null) {
+        Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,
+            ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,
+            ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
+            SequenceFileOutputFormat.class);
+        boolean filteringSucceeded = itemFiltering.waitForCompletion(true);
+        if (!filteringSucceeded) {
+          return -1;
+        }
+      }
+
+      String aggregateAndRecommendInput = partialMultiplyPath.toString();
+      if (filterFile != null) {
+        aggregateAndRecommendInput += "," + explicitFilterPath;
+      }
+
+      Class<? extends OutputFormat> outputFormat = parsedArgs.containsKey("--sequencefileOutput")
+          ? SequenceFileOutputFormat.class : TextOutputFormat.class;
+
+      //extract out the recommendations
+      Job aggregateAndRecommend = prepareJob(
+          new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
+          PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
+          AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
+          outputFormat);
+      Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
+      if (itemsFile != null) {
+        aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
+      }
+
+      if (userItemFile != null) {
+        aggregateAndRecommendConf.set(IDReader.USER_ITEM_FILE, userItemFile);
+      }
+
+      if (filterFile != null) {
+        setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath);
+      }
+      setIOSort(aggregateAndRecommend);
+      aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,
+          new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
+      aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
+      aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);
+      boolean succeeded = aggregateAndRecommend.waitForCompletion(true);
+      if (!succeeded) {
+        return -1;
+      }
+    }
+
+    return 0;
+  }
+
+  /** Registers every command line option and flag understood by {@link #run(String[])}. */
+  private void addRecommenderOptions() {
+    addInputOption();
+    addOutputOption();
+    addOption("numRecommendations", "n", "Number of recommendations per user",
+        String.valueOf(AggregateAndRecommendReducer.DEFAULT_NUM_RECOMMENDATIONS));
+    addOption("usersFile", null, "File of users to recommend for", null);
+    addOption("itemsFile", null, "File of items to recommend for", null);
+    addOption("filterFile", "f", "File containing comma-separated userID,itemID pairs. Used to exclude the item from "
+        + "the recommendations for that user (optional)", null);
+    addOption("userItemFile", "uif", "File containing comma-separated userID,itemID pairs (optional). "
+        + "Used to include only these items into recommendations. "
+        + "Cannot be used together with usersFile or itemsFile", null);
+    addOption("booleanData", "b", "Treat input as without pref values", Boolean.FALSE.toString());
+    addOption("maxPrefsPerUser", "mxp",
+        "Maximum number of preferences considered per user in final recommendation phase",
+        String.valueOf(UserVectorSplitterMapper.DEFAULT_MAX_PREFS_PER_USER_CONSIDERED));
+    addOption("minPrefsPerUser", "mp", "ignore users with less preferences than this in the similarity computation "
+        + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
+    addOption("maxSimilaritiesPerItem", "m", "Maximum number of similarities considered per item ",
+        String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ITEM));
+    addOption("maxPrefsInItemSimilarity", "mpiis", "max number of preferences to consider per user or item in the "
+        + "item similarity computation phase, users or items with more preferences will be sampled down (default: "
+        + DEFAULT_MAX_PREFS + ')', String.valueOf(DEFAULT_MAX_PREFS));
+    addOption("similarityClassname", "s", "Name of distributed similarity measures class to instantiate, "
+        + "alternatively use one of the predefined similarities (" + VectorSimilarityMeasures.list() + ')', true);
+    addOption("threshold", "tr", "discard item pairs with a similarity value below this", false);
+    addOption("outputPathForSimilarityMatrix", "opfsm", "write the item similarity matrix to this path (optional)",
+        false);
+    addOption("randomSeed", null, "use this seed for sampling", false);
+    addFlag("sequencefileOutput", null, "write the output into a SequenceFile instead of a text file");
+  }
+
+  /**
+   * Tunes sort/merge settings of the aggregation job. io.sort.mb is set to half of the task
+   * heap found in the map (or generic) child JVM options, capped at {@link #MAX_IO_SORT_MB}.
+   * If no heap setting is found, {@link #DEFAULT_ASSUMED_HEAP_MB} is assumed.
+   */
+  private static void setIOSort(JobContext job) {
+    Configuration conf = job.getConfiguration();
+    conf.setInt("io.sort.factor", 100);
+    String javaOpts = conf.get("mapred.map.child.java.opts"); // new arg name
+    if (javaOpts == null) {
+      javaOpts = conf.get("mapred.child.java.opts"); // old arg name
+    }
+    long assumedHeapMb = DEFAULT_ASSUMED_HEAP_MB;
+    if (javaOpts != null) {
+      Matcher m = XMX_PATTERN.matcher(javaOpts);
+      // When several -Xmx flags are given, the JVM uses the last one, so keep the final match.
+      while (m.find()) {
+        assumedHeapMb = toMegabytes(Long.parseLong(m.group(1)), m.group(2).charAt(0));
+      }
+    }
+    // Cap this at 1024MB now; see https://issues.apache.org/jira/browse/MAPREDUCE-2308
+    conf.setInt("io.sort.mb", (int) Math.min(assumedHeapMb / 2, MAX_IO_SORT_MB));
+    // For some reason the Merger doesn't report status for a long time; increase
+    // timeout when running these jobs
+    conf.setInt("mapred.task.timeout", 60 * 60 * 1000);
+  }
+
+  /** Converts an -Xmx amount with unit suffix k, m or g (any case) to megabytes. */
+  private static long toMegabytes(long amount, char unit) {
+    switch (Character.toLowerCase(unit)) {
+      case 'k':
+        return amount / 1024;
+      case 'g':
+        return amount * 1024;
+      default: // 'm'
+        return amount;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new RecommenderJob(), args);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java
new file mode 100644
index 0000000..8ae8215
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Maps a row of the similarity matrix to a {@link VectorOrPrefWritable}.
+ *
+ * Strictly, a column of the matrix is needed here. Because the similarity matrix is symmetric,
+ * a row can be used instead, which avoids having to transpose the matrix.
+ */
+public final class SimilarityMatrixRowWrapperMapper extends
+ Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {
+
+ private final VarIntWritable index = new VarIntWritable();
+ private final VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
+
+ @Override
+ protected void map(IntWritable key,
+ VectorWritable value,
+ Context context) throws IOException, InterruptedException {
+ Vector similarityMatrixRow = value.get();
+ /* remove self similarity */
+ similarityMatrixRow.set(key.get(), Double.NaN);
+
+ index.set(key.get());
+ vectorOrPref.set(similarityMatrixRow);
+
+ context.write(index, vectorOrPref);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java
new file mode 100644
index 0000000..e6e47fd
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * <h1>Input</h1>
+ *
+ * <p>
+ * Takes user IDs as {@link VarLongWritable} mapped to all associated item IDs and preference values, as
+ * {@link EntityPrefWritable}s.
+ * </p>
+ *
+ * <h1>Output</h1>
+ *
+ * <p>
+ * The same user ID mapped to a {@link RandomAccessSparseVector} representation of the same item IDs and
+ * preference values. Item IDs are used as vector indexes; they are hashed into ints to work as indexes with
+ * {@link TasteHadoopUtils#idToIndex(long)}. The mapping is remembered for later with a combination of
+ * {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}.
+ * </p>
+ */
+public final class ToUserVectorsReducer extends
+ Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {
+
+ public static final String MIN_PREFERENCES_PER_USER = ToUserVectorsReducer.class.getName()
+ + ".minPreferencesPerUser";
+
+ private int minPreferences;
+
+ public enum Counters { USERS }
+
+ private final VectorWritable userVectorWritable = new VectorWritable();
+
+ @Override
+ protected void setup(Context ctx) throws IOException, InterruptedException {
+ super.setup(ctx);
+ minPreferences = ctx.getConfiguration().getInt(MIN_PREFERENCES_PER_USER, 1);
+ }
+
+ @Override
+ protected void reduce(VarLongWritable userID,
+ Iterable<VarLongWritable> itemPrefs,
+ Context context) throws IOException, InterruptedException {
+ Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+ for (VarLongWritable itemPref : itemPrefs) {
+ int index = TasteHadoopUtils.idToIndex(itemPref.get());
+ float value = itemPref instanceof EntityPrefWritable ? ((EntityPrefWritable) itemPref).getPrefValue() : 1.0f;
+ userVector.set(index, value);
+ }
+
+ if (userVector.getNumNondefaultElements() >= minPreferences) {
+ userVectorWritable.set(userVector);
+ userVectorWritable.setWritesLaxPrecision(true);
+ context.getCounter(Counters.USERS).increment(1);
+ context.write(userID, userVectorWritable);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
new file mode 100644
index 0000000..9167437
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.Vector;
+
+public final class ToVectorAndPrefReducer extends
+ Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {
+
+ private final VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable();
+
+ @Override
+ protected void reduce(VarIntWritable key,
+ Iterable<VectorOrPrefWritable> values,
+ Context context) throws IOException, InterruptedException {
+
+ List<Long> userIDs = new ArrayList<>();
+ List<Float> prefValues = new ArrayList<>();
+ Vector similarityMatrixColumn = null;
+ for (VectorOrPrefWritable value : values) {
+ if (value.getVector() == null) {
+ // Then this is a user-pref value
+ userIDs.add(value.getUserID());
+ prefValues.add(value.getValue());
+ } else {
+ // Then this is the column vector
+ if (similarityMatrixColumn != null) {
+ throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());
+ }
+ similarityMatrixColumn = value.getVector();
+ }
+ }
+
+ if (similarityMatrixColumn == null) {
+ return;
+ }
+
+ vectorAndPrefs.set(similarityMatrixColumn, userIDs, prefValues);
+ context.write(key, vectorAndPrefs);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
new file mode 100644
index 0000000..2290d06
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.lucene.util.PriorityQueue;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Splits each user's preference vector into one record per preferred item, keyed by item index. Each
+ * record carries the (userID, preference value) pair.
+ *
+ * <p>When a user-ID filter is configured, users outside it are skipped. Users with more preferences
+ * than the configured maximum have their smaller (by absolute value) preferences replaced by NaN before
+ * being emitted.</p>
+ */
+public final class UserVectorSplitterMapper extends
+    Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
+
+  private static final Logger log = LoggerFactory.getLogger(UserVectorSplitterMapper.class);
+
+  static final String USERS_FILE = "usersFile";
+  static final String MAX_PREFS_PER_USER_CONSIDERED = "maxPrefsPerUserConsidered";
+  static final int DEFAULT_MAX_PREFS_PER_USER_CONSIDERED = 10;
+
+  private int maxPrefsPerUserConsidered;
+  /** IDs of the users to emit preferences for; {@code null} means every user is processed. */
+  private FastIDSet usersToRecommendFor;
+
+  private final VarIntWritable itemIndexWritable = new VarIntWritable();
+  private final VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
+
+  /**
+   * Reads the preference cap and the optional user-ID filter from the job configuration.
+   *
+   * @throws IllegalArgumentException if the configured preference cap is smaller than 1
+   */
+  @Override
+  protected void setup(Context context) throws IOException {
+    Configuration jobConf = context.getConfiguration();
+    maxPrefsPerUserConsidered = jobConf.getInt(MAX_PREFS_PER_USER_CONSIDERED, DEFAULT_MAX_PREFS_PER_USER_CONSIDERED);
+    // A non-positive cap leaves the pruning queue without capacity: its top() would be null and the
+    // unboxing in findSmallestLargeValue would fail with an NPE. Fail fast with a clear message instead.
+    if (maxPrefsPerUserConsidered < 1) {
+      throw new IllegalArgumentException(MAX_PREFS_PER_USER_CONSIDERED + " must be at least 1, but was "
+          + maxPrefsPerUserConsidered);
+    }
+
+    IDReader idReader = new IDReader(jobConf);
+    idReader.readIDs();
+    usersToRecommendFor = idReader.getUserIds();
+  }
+
+  /**
+   * Emits one (item index, (userID, preference)) record per non-zero entry of the user's vector. Entries
+   * NaN-ed by pruning are still emitted, because NaN is non-zero.
+   */
+  @Override
+  protected void map(VarLongWritable key,
+                     VectorWritable value,
+                     Context context) throws IOException, InterruptedException {
+    long userID = key.get();
+
+    // Debug rather than info: this runs once per input record and would otherwise flood the task logs.
+    log.debug("UserID = {}", userID);
+
+    if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
+      return;
+    }
+    Vector userVector = maybePruneUserVector(value.get());
+
+    for (Element e : userVector.nonZeroes()) {
+      itemIndexWritable.set(e.index());
+      vectorOrPref.set(userID, (float) e.get());
+      context.write(itemIndexWritable, vectorOrPref);
+    }
+  }
+
+  /**
+   * Caps the number of preferences considered for a user.
+   *
+   * <p>If the vector has more non-default entries than the configured maximum, every entry whose absolute
+   * value is below the maximum-th largest absolute value is overwritten in place with NaN. Ties at the
+   * cut-off are all kept.</p>
+   *
+   * @return the same vector instance that was passed in
+   */
+  private Vector maybePruneUserVector(Vector userVector) {
+    if (userVector.getNumNondefaultElements() <= maxPrefsPerUserConsidered) {
+      return userVector;
+    }
+
+    float smallestLargeValue = findSmallestLargeValue(userVector);
+
+    // "Blank out" small-sized prefs to reduce the amount of partial products
+    // generated later. They're not zeroed, but NaN-ed, so they come through
+    // and can be used to exclude these items from prefs.
+    for (Element e : userVector.nonZeroes()) {
+      float absValue = Math.abs((float) e.get());
+      if (absValue < smallestLargeValue) {
+        e.set(Float.NaN);
+      }
+    }
+
+    return userVector;
+  }
+
+  /**
+   * Returns the {@code maxPrefsPerUserConsidered}-th largest absolute preference value of the vector.
+   *
+   * <p>It uses a bounded queue ordered so that its top is the smallest value retained; only the largest
+   * values seen so far survive.</p>
+   */
+  private float findSmallestLargeValue(Vector userVector) {
+
+    PriorityQueue<Float> topPrefValues = new PriorityQueue<Float>(maxPrefsPerUserConsidered) {
+      @Override
+      protected boolean lessThan(Float f1, Float f2) {
+        return f1 < f2;
+      }
+    };
+
+    for (Element e : userVector.nonZeroes()) {
+      float absValue = Math.abs((float) e.get());
+      topPrefValues.insertWithOverflow(absValue);
+    }
+    return topPrefValues.top();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java
new file mode 100644
index 0000000..11d496f
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Varint;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Writable pairing a {@link Vector} with two parallel lists: user IDs and their preference values.
+ *
+ * <p>Wire format: the vector, written with lax precision; an unsigned var-int entry count; then, for each
+ * entry, a signed var-long user ID followed by a float preference value.</p>
+ */
+public final class VectorAndPrefsWritable implements Writable {
+
+  private Vector vector;
+  private List<Long> userIDs;
+  private List<Float> values;
+
+  public VectorAndPrefsWritable() {
+  }
+
+  public VectorAndPrefsWritable(Vector vector, List<Long> userIDs, List<Float> values) {
+    set(vector, userIDs, values);
+  }
+
+  /**
+   * Replaces all state. The lists are stored by reference, not copied. {@code userIDs} and {@code values}
+   * are expected to have matching positions.
+   */
+  public void set(Vector vector, List<Long> userIDs, List<Float> values) {
+    this.vector = vector;
+    this.userIDs = userIDs;
+    this.values = values;
+  }
+
+  /** @return the vector part */
+  public Vector getVector() {
+    return vector;
+  }
+
+  /** @return the user IDs, positionally aligned with {@link #getValues()} */
+  public List<Long> getUserIDs() {
+    return userIDs;
+  }
+
+  /** @return the preference values, positionally aligned with {@link #getUserIDs()} */
+  public List<Float> getValues() {
+    return values;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    VectorWritable vectorWritable = new VectorWritable(vector);
+    vectorWritable.setWritesLaxPrecision(true);
+    vectorWritable.write(out);
+
+    int count = userIDs.size();
+    Varint.writeUnsignedVarInt(count, out);
+    for (int entry = 0; entry < count; entry++) {
+      Varint.writeSignedVarLong(userIDs.get(entry), out);
+      out.writeFloat(values.get(entry));
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    VectorWritable vectorWritable = new VectorWritable();
+    vectorWritable.readFields(in);
+    vector = vectorWritable.get();
+
+    int count = Varint.readUnsignedVarInt(in);
+    userIDs = new ArrayList<>(count);
+    values = new ArrayList<>(count);
+    for (int entry = 0; entry < count; entry++) {
+      userIDs.add(Varint.readSignedVarLong(in));
+      values.add(in.readFloat());
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.valueOf(vector) + '\t' + userIDs + '\t' + values;
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
new file mode 100644
index 0000000..515d7ea
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Varint;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Writable holding either a vector or a single (userID, preference value) pair. A non-null vector takes
+ * precedence: when one is present, the preference fields are not serialized.
+ *
+ * <p>Wire format: a boolean flag telling whether a vector follows. If true, the vector follows, written
+ * with lax precision. Otherwise a signed var-long user ID follows, then a float value.</p>
+ */
+public final class VectorOrPrefWritable implements Writable {
+
+  private Vector vector;
+  private long userID;
+  private float value;
+
+  public VectorOrPrefWritable() {
+  }
+
+  /**
+   * Creates an instance holding a vector. It delegates to {@link #set(Vector)} so the preference fields
+   * carry the same placeholders as an instance deserialized from a vector record.
+   */
+  public VectorOrPrefWritable(Vector vector) {
+    set(vector);
+  }
+
+  /** Creates an instance holding a single (userID, preference value) pair and no vector. */
+  public VectorOrPrefWritable(long userID, float value) {
+    set(userID, value);
+  }
+
+  /** @return the vector, or {@code null} when this instance holds a preference */
+  public Vector getVector() {
+    return vector;
+  }
+
+  /** @return the user ID; meaningful only when {@link #getVector()} is {@code null} */
+  public long getUserID() {
+    return userID;
+  }
+
+  /** @return the preference value; meaningful only when {@link #getVector()} is {@code null} */
+  public float getValue() {
+    return value;
+  }
+
+  /** Switches to holding a vector; the preference fields are reset to Long.MIN_VALUE / NaN placeholders. */
+  void set(Vector vector) {
+    this.vector = vector;
+    this.userID = Long.MIN_VALUE;
+    this.value = Float.NaN;
+  }
+
+  /** Switches to holding a (userID, preference value) pair and drops any vector. */
+  public void set(long userID, float value) {
+    this.vector = null;
+    this.userID = userID;
+    this.value = value;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (vector == null) {
+      out.writeBoolean(false);
+      Varint.writeSignedVarLong(userID, out);
+      out.writeFloat(value);
+    } else {
+      out.writeBoolean(true);
+      VectorWritable vw = new VectorWritable(vector);
+      vw.setWritesLaxPrecision(true);
+      vw.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    boolean hasVector = in.readBoolean();
+    if (hasVector) {
+      VectorWritable writable = new VectorWritable();
+      writable.readFields(in);
+      set(writable.get());
+    } else {
+      long theUserID = Varint.readSignedVarLong(in);
+      float theValue = in.readFloat();
+      set(theUserID, theValue);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return vector == null ? userID + ":" + value : vector.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
new file mode 100644
index 0000000..c64ee38
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.preparation;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
+import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
+import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexMapper;
+import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexReducer;
+import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
+import org.apache.mahout.cf.taste.hadoop.item.ToUserVectorsReducer;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.VectorWritable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts textual preference data into the intermediate data used by {@code ItemSimilarityJob}.
+ * Three MapReduce jobs run in sequence:
+ * <ol>
+ *   <li>map item IDs to internal int indices (output {@value #ITEMID_INDEX});</li>
+ *   <li>group preferences into one vector per user (output {@value #USER_VECTORS}) and store the number
+ *       of users (output {@value #NUM_USERS});</li>
+ *   <li>turn the user vectors into per-item vectors, the rating matrix (output {@value #RATING_MATRIX}).</li>
+ * </ol>
+ * {@link #run(String[])} returns 0 on success and -1 on bad arguments or when any of the jobs fails.
+ */
+public class PreparePreferenceMatrixJob extends AbstractJob {
+
+  public static final String NUM_USERS = "numUsers.bin";
+  public static final String ITEMID_INDEX = "itemIDIndex";
+  public static final String USER_VECTORS = "userVectors";
+  public static final String RATING_MATRIX = "ratingMatrix";
+
+  private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new PreparePreferenceMatrixJob(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption("minPrefsPerUser", "mp", "ignore users with less preferences than this "
+        + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
+    addOption("booleanData", "b", "Treat input as without pref values", Boolean.FALSE.toString());
+    addOption("ratingShift", "rs", "shift ratings by this value", "0.0");
+
+    Map<String, List<String>> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    int minPrefsPerUser = Integer.parseInt(getOption("minPrefsPerUser"));
+    boolean booleanData = Boolean.parseBoolean(getOption("booleanData"));
+    float ratingShift = Float.parseFloat(getOption("ratingShift"));
+
+    // Short-circuiting stops the pipeline at the first failing job.
+    boolean allSucceeded = buildItemIDIndex()
+        && buildUserVectors(booleanData, minPrefsPerUser, ratingShift)
+        && buildRatingMatrix();
+    return allSucceeded ? 0 : -1;
+  }
+
+  /** Phase 1: maps every item ID to an internal int index. Returns whether the job succeeded. */
+  private boolean buildItemIDIndex() throws Exception {
+    Job itemIDIndex = prepareJob(getInputPath(), getOutputPath(ITEMID_INDEX), TextInputFormat.class,
+        ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class, ItemIDIndexReducer.class,
+        VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class);
+    itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
+    return itemIDIndex.waitForCompletion(true);
+  }
+
+  /**
+   * Phase 2: groups the preferences into one vector per user. On success it also stores the number of
+   * users, which later consumers need. Returns whether the job succeeded.
+   */
+  private boolean buildUserVectors(boolean booleanData, int minPrefsPerUser, float ratingShift)
+      throws Exception {
+    Job toUserVectors = prepareJob(getInputPath(),
+        getOutputPath(USER_VECTORS),
+        TextInputFormat.class,
+        ToItemPrefsMapper.class,
+        VarLongWritable.class,
+        booleanData ? VarLongWritable.class : EntityPrefWritable.class,
+        ToUserVectorsReducer.class,
+        VarLongWritable.class,
+        VectorWritable.class,
+        SequenceFileOutputFormat.class);
+    toUserVectors.getConfiguration().setBoolean(RecommenderJob.BOOLEAN_DATA, booleanData);
+    toUserVectors.getConfiguration().setInt(ToUserVectorsReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser);
+    toUserVectors.getConfiguration().set(ToEntityPrefsMapper.RATING_SHIFT, String.valueOf(ratingShift));
+    if (!toUserVectors.waitForCompletion(true)) {
+      return false;
+    }
+    long userCount = toUserVectors.getCounters().findCounter(ToUserVectorsReducer.Counters.USERS).getValue();
+    HadoopUtil.writeInt((int) userCount, getOutputPath(NUM_USERS), getConf());
+    return true;
+  }
+
+  /** Phase 3: turns the user vectors into item vectors, forming the rating matrix. Returns whether the job succeeded. */
+  private boolean buildRatingMatrix() throws Exception {
+    Job toItemVectors = prepareJob(getOutputPath(USER_VECTORS), getOutputPath(RATING_MATRIX),
+        ToItemVectorsMapper.class, IntWritable.class, VectorWritable.class, ToItemVectorsReducer.class,
+        IntWritable.class, VectorWritable.class);
+    toItemVectors.setCombinerClass(ToItemVectorsReducer.class);
+    return toItemVectors.waitForCompletion(true);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
new file mode 100644
index 0000000..5a4144c
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.preparation;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Splits a user's rating vector into single-entry item vectors. For every item with a non-zero rating, it
+ * emits, keyed by the item index, a vector whose only element sits at the user's index and holds that
+ * rating. A reducer such as {@link ToItemVectorsReducer} merges these fragments into complete item vectors.
+ */
+public class ToItemVectorsMapper
+    extends Mapper<VarLongWritable,VectorWritable,IntWritable,VectorWritable> {
+
+  private final IntWritable itemID = new IntWritable();
+  private final VectorWritable itemVectorWritable = new VectorWritable();
+
+  public ToItemVectorsMapper() {
+    // The flag never changes, so set it once instead of on every map() call.
+    itemVectorWritable.setWritesLaxPrecision(true);
+  }
+
+  @Override
+  protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, Context ctx)
+    throws IOException, InterruptedException {
+    Vector userRatings = vectorWritable.get();
+
+    // The user becomes the column of every emitted item vector.
+    int column = TasteHadoopUtils.idToIndex(rowIndex.get());
+
+    Vector itemVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
+    for (Vector.Element elem : userRatings.nonZeroes()) {
+      itemID.set(elem.index());
+      itemVector.setQuick(column, elem.get());
+      itemVectorWritable.set(itemVector);
+      ctx.write(itemID, itemVectorWritable);
+      // Clear the user's column, the only entry ever set, so the vector is empty again before reuse.
+      itemVector.setQuick(column, 0.0);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java
new file mode 100644
index 0000000..f74511b
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.preparation;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Merges all partial vectors received for one key into a single vector and emits it under the same key.
+ * Its input and output types are identical, which lets it double as a combiner.
+ */
+public class ToItemVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+  /** Output holder reused across reduce() calls; always written with lax precision. */
+  private final VectorWritable merged = new VectorWritable();
+
+  public ToItemVectorsReducer() {
+    merged.setWritesLaxPrecision(true);
+  }
+
+  @Override
+  protected void reduce(IntWritable row, Iterable<VectorWritable> vectors, Context ctx)
+    throws IOException, InterruptedException {
+    merged.set(VectorWritable.mergeToVector(vectors.iterator()));
+    ctx.write(row, merged);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
new file mode 100644
index 0000000..c50fa20
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity.item;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob;
+import org.apache.mahout.cf.taste.similarity.precompute.SimilarItem;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasures;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
+
+/**
+ * <p>Distributed precomputation of the item-item similarities for item-based collaborative filtering.</p>
+ *
+ * <p>Preferences in the input file should look like {@code userID,itemID[,preferencevalue]}</p>
+ *
+ * <p>
+ * Preference value is optional to accommodate applications that have no notion of a preference value (that is, the user
+ * simply expresses a preference for an item, but no degree of preference).
+ * </p>
+ *
+ * <p>
+ * The preference value is assumed to be parseable as a {@code double}. The user IDs and item IDs are
+ * parsed as {@code long}s.
+ * </p>
+ *
+ * <p>The job runs in three phases:</p>
+ * <ol>
+ * <li>{@link PreparePreferenceMatrixJob} builds the rating matrix.</li>
+ * <li>{@link RowSimilarityJob} computes the similarities between its rows.</li>
+ * <li>A final MapReduce pass keeps the most similar items per item and writes the item pairs as text.</li>
+ * </ol>
+ * <p>If one of the sub-jobs fails, the remaining phases are skipped and {@code run} returns -1.</p>
+ *
+ * <p>Command line arguments specific to this class are:</p>
+ *
+ * <ol>
+ * <li>--input (path): Directory containing one or more text files with the preference data</li>
+ * <li>--output (path): output path where similarity data should be written</li>
+ * <li>--similarityClassname (classname): Name of distributed similarity measure class to instantiate or a predefined
+ * similarity from {@link org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure}</li>
+ * <li>--maxSimilaritiesPerItem (integer): Maximum number of similarities considered per item (100)</li>
+ * <li>--maxPrefs (integer): max number of preferences to consider per user or item, users or items with more
+ * preferences will be sampled down (500)</li>
+ * <li>--minPrefsPerUser (integer): ignore users with fewer preferences than this (1)</li>
+ * <li>--booleanData (boolean): Treat input data as having no pref values (false)</li>
+ * <li>--threshold (double): discard item pairs with a similarity value below this</li>
+ * <li>--randomSeed (long): use this seed for sampling</li>
+ * </ol>
+ *
+ * <p>General command line options are documented in {@link AbstractJob}.</p>
+ *
+ * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear before all other arguments.</p>
+ */
+public final class ItemSimilarityJob extends AbstractJob {
+
+  public static final String ITEM_ID_INDEX_PATH_STR = ItemSimilarityJob.class.getName() + ".itemIDIndexPathStr";
+  public static final String MAX_SIMILARITIES_PER_ITEM = ItemSimilarityJob.class.getName() + ".maxSimilarItemsPerItem";
+
+  private static final int DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM = 100;
+  private static final int DEFAULT_MAX_PREFS = 500;
+  private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new ItemSimilarityJob(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption("similarityClassname", "s", "Name of distributed similarity measures class to instantiate, "
+        + "alternatively use one of the predefined similarities (" + VectorSimilarityMeasures.list() + ')');
+    addOption("maxSimilaritiesPerItem", "m", "try to cap the number of similar items per item to this number "
+        + "(default: " + DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM + ')',
+        String.valueOf(DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM));
+    addOption("maxPrefs", "mppu", "max number of preferences to consider per user or item, "
+        + "users or items with more preferences will be sampled down (default: " + DEFAULT_MAX_PREFS + ')',
+        String.valueOf(DEFAULT_MAX_PREFS));
+    addOption("minPrefsPerUser", "mp", "ignore users with less preferences than this "
+        + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
+    addOption("booleanData", "b", "Treat input as without pref values", String.valueOf(Boolean.FALSE));
+    addOption("threshold", "tr", "discard item pairs with a similarity value below this", false);
+    addOption("randomSeed", null, "use this seed for sampling", false);
+
+    Map<String,List<String>> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    String similarityClassName = getOption("similarityClassname");
+    int maxSimilarItemsPerItem = Integer.parseInt(getOption("maxSimilaritiesPerItem"));
+    int maxPrefs = Integer.parseInt(getOption("maxPrefs"));
+    int minPrefsPerUser = Integer.parseInt(getOption("minPrefsPerUser"));
+    boolean booleanData = Boolean.parseBoolean(getOption("booleanData"));
+
+    double threshold = hasOption("threshold")
+        ? Double.parseDouble(getOption("threshold")) : RowSimilarityJob.NO_THRESHOLD;
+    long randomSeed = hasOption("randomSeed")
+        ? Long.parseLong(getOption("randomSeed")) : RowSimilarityJob.NO_FIXED_RANDOM_SEED;
+
+    Path similarityMatrixPath = getTempPath("similarityMatrix");
+    Path prepPath = getTempPath("prepareRatingMatrix");
+
+    AtomicInteger currentPhase = new AtomicInteger();
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      int exitCode = ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[] {
+          "--input", getInputPath().toString(),
+          "--output", prepPath.toString(),
+          "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
+          "--booleanData", String.valueOf(booleanData),
+          "--tempDir", getTempPath().toString(),
+      });
+      // Stop instead of running later phases against missing or stale intermediate data.
+      if (exitCode != 0) {
+        return -1;
+      }
+    }
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      int numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
+
+      int exitCode = ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
+          "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
+          "--output", similarityMatrixPath.toString(),
+          "--numberOfColumns", String.valueOf(numberOfUsers),
+          "--similarityClassname", similarityClassName,
+          "--maxObservationsPerRow", String.valueOf(maxPrefs),
+          "--maxObservationsPerColumn", String.valueOf(maxPrefs),
+          "--maxSimilaritiesPerRow", String.valueOf(maxSimilarItemsPerItem),
+          "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
+          "--threshold", String.valueOf(threshold),
+          "--randomSeed", String.valueOf(randomSeed),
+          "--tempDir", getTempPath().toString(),
+      });
+      if (exitCode != 0) {
+        return -1;
+      }
+    }
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job mostSimilarItems = prepareJob(similarityMatrixPath, getOutputPath(), SequenceFileInputFormat.class,
+          MostSimilarItemPairsMapper.class, EntityEntityWritable.class, DoubleWritable.class,
+          MostSimilarItemPairsReducer.class, EntityEntityWritable.class, DoubleWritable.class, TextOutputFormat.class);
+      Configuration mostSimilarItemsConf = mostSimilarItems.getConfiguration();
+      mostSimilarItemsConf.set(ITEM_ID_INDEX_PATH_STR,
+          new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
+      mostSimilarItemsConf.setInt(MAX_SIMILARITIES_PER_ITEM, maxSimilarItemsPerItem);
+      boolean succeeded = mostSimilarItems.waitForCompletion(true);
+      if (!succeeded) {
+        return -1;
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * For one row of the similarity matrix, keeps at most {@code maxSimilarItemsPerItem} of the most similar
+   * items. It emits each (item, similar item) pair with the smaller item ID first, so a pair produced from
+   * both items' rows lands in the same reduce group.
+   */
+  public static class MostSimilarItemPairsMapper
+      extends Mapper<IntWritable,VectorWritable,EntityEntityWritable,DoubleWritable> {
+
+    private OpenIntLongHashMap indexItemIDMap;
+    private int maxSimilarItemsPerItem;
+
+    @Override
+    protected void setup(Context ctx) {
+      Configuration conf = ctx.getConfiguration();
+      maxSimilarItemsPerItem = conf.getInt(MAX_SIMILARITIES_PER_ITEM, -1);
+      // Maps internal item indices back to the original item IDs.
+      indexItemIDMap = TasteHadoopUtils.readIDIndexMap(conf.get(ITEM_ID_INDEX_PATH_STR), conf);
+
+      Preconditions.checkArgument(maxSimilarItemsPerItem > 0, "maxSimilarItemsPerItem must be greater then 0!");
+    }
+
+    @Override
+    protected void map(IntWritable itemIDIndexWritable, VectorWritable similarityVector, Context ctx)
+      throws IOException, InterruptedException {
+
+      int itemIDIndex = itemIDIndexWritable.get();
+
+      TopSimilarItemsQueue topKMostSimilarItems = new TopSimilarItemsQueue(maxSimilarItemsPerItem);
+
+      for (Vector.Element element : similarityVector.get().nonZeroes()) {
+        // The queue's top is its least similar entry; overwrite it in place when the candidate beats it.
+        SimilarItem top = topKMostSimilarItems.top();
+        double candidateSimilarity = element.get();
+        if (candidateSimilarity > top.getSimilarity()) {
+          top.set(indexItemIDMap.get(element.index()), candidateSimilarity);
+          topKMostSimilarItems.updateTop();
+        }
+      }
+
+      long itemID = indexItemIDMap.get(itemIDIndex);
+      for (SimilarItem similarItem : topKMostSimilarItems.getTopItems()) {
+        long otherItemID = similarItem.getItemID();
+        // Order the pair canonically so that A-B and B-A collapse into the same key.
+        EntityEntityWritable pair = itemID < otherItemID
+            ? new EntityEntityWritable(itemID, otherItemID)
+            : new EntityEntityWritable(otherItemID, itemID);
+        ctx.write(pair, new DoubleWritable(similarItem.getSimilarity()));
+      }
+    }
+  }
+
+  /**
+   * Writes the first similarity value seen for each item pair and discards the rest. The mapper can emit
+   * the same pair from both items' rows.
+   */
+  public static class MostSimilarItemPairsReducer
+      extends Reducer<EntityEntityWritable,DoubleWritable,EntityEntityWritable,DoubleWritable> {
+    @Override
+    protected void reduce(EntityEntityWritable pair, Iterable<DoubleWritable> values, Context ctx)
+      throws IOException, InterruptedException {
+      ctx.write(pair, values.iterator().next());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/TopSimilarItemsQueue.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/TopSimilarItemsQueue.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/TopSimilarItemsQueue.java
new file mode 100644
index 0000000..acb6392
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/TopSimilarItemsQueue.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity.item;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.lucene.util.PriorityQueue;
+import org.apache.mahout.cf.taste.similarity.precompute.SimilarItem;
+
+/**
+ * Bounded priority queue that retains the {@code maxSize} most similar items seen so far.
+ *
+ * <p>The queue is pre-filled with sentinel entries (see {@link #getSentinelObject()}). {@link #top()} is
+ * therefore always the least-similar retained entry, and callers may overwrite it in place and then call
+ * {@link #updateTop()}. Sentinels that were never overwritten are filtered out by {@link #getTopItems()}.</p>
+ */
+public class TopSimilarItemsQueue extends PriorityQueue<SimilarItem> {
+
+  /** Item ID marking placeholder entries; they are dropped when results are extracted. */
+  private static final long SENTINEL_ID = Long.MIN_VALUE;
+
+  private final int maxSize;
+
+  /**
+   * @param maxSize maximum number of similar items to retain
+   */
+  public TopSimilarItemsQueue(int maxSize) {
+    super(maxSize);
+    this.maxSize = maxSize;
+  }
+
+  /**
+   * Drains the queue and returns the retained items, most similar first. Sentinel placeholders are
+   * excluded. The queue is empty afterwards.
+   *
+   * @return the retained items in descending order of similarity
+   */
+  public List<SimilarItem> getTopItems() {
+    List<SimilarItem> items = new ArrayList<>(maxSize);
+    while (size() > 0) {
+      // pop() yields the least similar element first, so the list is built in ascending order
+      SimilarItem topItem = pop();
+      // filter out "sentinel" objects necessary for maintaining an efficient priority queue
+      if (topItem.getItemID() != SENTINEL_ID) {
+        items.add(topItem);
+      }
+    }
+    Collections.reverse(items);
+    return items;
+  }
+
+  /** Orders entries by similarity so that the least similar entry sits at the top of the queue. */
+  @Override
+  protected boolean lessThan(SimilarItem one, SimilarItem two) {
+    return one.getSimilarity() < two.getSimilarity();
+  }
+
+  /**
+   * Placeholder used to pre-fill the queue. It must compare below every real candidate. Double.MIN_VALUE
+   * would not do this: it is the smallest *positive* double, so zero or negative similarity values could
+   * never displace it and were silently dropped even when the queue had room.
+   */
+  @Override
+  protected SimilarItem getSentinelObject() {
+    return new SimilarItem(SENTINEL_ID, Double.NEGATIVE_INFINITY);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/impl/common/AbstractLongPrimitiveIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/impl/common/AbstractLongPrimitiveIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/impl/common/AbstractLongPrimitiveIterator.java
new file mode 100644
index 0000000..f46785c
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/impl/common/AbstractLongPrimitiveIterator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.impl.common;
+
+public abstract class AbstractLongPrimitiveIterator implements LongPrimitiveIterator {
+
+ @Override
+ public Long next() {
+ return nextLong();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/impl/common/BitSet.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/impl/common/BitSet.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/impl/common/BitSet.java
new file mode 100644
index 0000000..c46b4b6
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/impl/common/BitSet.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.impl.common;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A simplified and streamlined version of {@link java.util.BitSet}. Its capacity is fixed at construction,
+ * bits are packed into 64-bit words, and the per-bit accessors perform no range checking.
+ */
+final class BitSet implements Serializable, Cloneable {
+
+  /** Backing words; bit {@code i} lives in word {@code i >>> 6} at position {@code i & 0x3F}. */
+  private final long[] bits;
+
+  /**
+   * Creates a bit set with room for {@code numBits} bits, all initially clear.
+   *
+   * @param numBits number of bits to allocate; storage is rounded up to a whole number of 64-bit words
+   * @throws IllegalArgumentException if {@code numBits} is negative
+   */
+  BitSet(int numBits) {
+    if (numBits < 0) {
+      // Without this check, numBits >>> 6 would turn a negative count into a huge positive word count.
+      throw new IllegalArgumentException("numBits must be non-negative: " + numBits);
+    }
+    int numLongs = numBits >>> 6;
+    if ((numBits & 0x3F) != 0) {
+      numLongs++;
+    }
+    bits = new long[numLongs];
+  }
+
+  /** Wraps an existing word array without copying it; used by {@link #clone()}. */
+  private BitSet(long[] bits) {
+    this.bits = bits;
+  }
+
+  /**
+   * Returns whether bit {@code index} is set. Indices beyond the allocated words throw
+   * {@link ArrayIndexOutOfBoundsException}. Indices in the padding of the last word are not rejected.
+   */
+  boolean get(int index) {
+    // skipping range check for speed
+    return (bits[index >>> 6] & 1L << (index & 0x3F)) != 0L;
+  }
+
+  /** Sets bit {@code index}; no range check (see {@link #get(int)}). */
+  void set(int index) {
+    // skipping range check for speed
+    bits[index >>> 6] |= 1L << (index & 0x3F);
+  }
+
+  /** Clears bit {@code index}; no range check (see {@link #get(int)}). */
+  void clear(int index) {
+    // skipping range check for speed
+    bits[index >>> 6] &= ~(1L << (index & 0x3F));
+  }
+
+  /** Clears every bit. */
+  void clear() {
+    Arrays.fill(bits, 0L);
+  }
+
+  /** Returns an independent copy; the backing word array is duplicated. */
+  @Override
+  public BitSet clone() {
+    return new BitSet(bits.clone());
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(bits);
+  }
+
+  /** Two bit sets are equal when their backing word arrays, including their lengths, are equal. */
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof BitSet)) {
+      return false;
+    }
+    BitSet other = (BitSet) o;
+    return Arrays.equals(bits, other.bits);
+  }
+
+  /**
+   * Renders each word as 64 characters, lowest bit first, with each word followed by a single space.
+   */
+  @Override
+  public String toString() {
+    StringBuilder result = new StringBuilder(64 * bits.length);
+    for (long l : bits) {
+      for (int j = 0; j < 64; j++) {
+        result.append((l & 1L << j) == 0 ? '0' : '1');
+      }
+      result.append(' ');
+    }
+    return result.toString();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/impl/common/Cache.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/impl/common/Cache.java b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/impl/common/Cache.java
new file mode 100755
index 0000000..b2d9b36
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/cf/taste/impl/common/Cache.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.impl.common;
+
+import com.google.common.base.Preconditions;
+import org.apache.mahout.cf.taste.common.TasteException;
+
+import java.util.Iterator;
+
+/**
+ * <p>
+ * An efficient Map-like class which caches values for keys. Values are not "put" into a {@link Cache};
+ * instead the caller supplies the instance with an implementation of {@link Retriever} which can load the
+ * value for a given key.
+ * </p>
+ *
+ * <p>
+ * The cache does not support {@code null} keys. A {@code null} value returned by the {@link Retriever} is
+ * cached as well, using an internal placeholder, so that key is not retrieved again until its entry is
+ * removed or evicted. Callers always see {@code null} for such keys, never the placeholder.
+ * </p>
+ *
+ * <p>
+ * Every access to the underlying map is synchronized on that map. Values are loaded outside the lock,
+ * however, so two threads that miss on the same key at the same time may both invoke the {@link Retriever}.
+ * </p>
+ *
+ * <p>
+ * Thanks to Amila Jayasooriya for helping evaluate performance of the rewrite of this class, as part of a
+ * Google Summer of Code 2007 project.
+ * </p>
+ */
+public final class Cache<K,V> implements Retriever<K,V> {
+
+  /** Placeholder stored in the map for keys whose retrieved value was {@code null}. */
+  private static final Object NULL = new Object();
+
+  private final FastMap<K,V> cache;
+  private final Retriever<? super K,? extends V> retriever;
+
+  /**
+   * <p>
+   * Creates a new cache based on the given {@link Retriever}.
+   * </p>
+   *
+   * @param retriever
+   *          object which can retrieve values for keys
+   */
+  public Cache(Retriever<? super K,? extends V> retriever) {
+    this(retriever, FastMap.NO_MAX_SIZE);
+  }
+
+  /**
+   * <p>
+   * Creates a new cache based on the given {@link Retriever} and with given maximum size.
+   * </p>
+   *
+   * @param retriever
+   *          object which can retrieve values for keys
+   * @param maxEntries
+   *          maximum number of entries the cache will store before evicting some
+   * @throws IllegalArgumentException
+   *           if {@code retriever} is null or {@code maxEntries} is less than 1
+   */
+  public Cache(Retriever<? super K,? extends V> retriever, int maxEntries) {
+    Preconditions.checkArgument(retriever != null, "retriever is null");
+    Preconditions.checkArgument(maxEntries >= 1, "maxEntries must be at least 1");
+    cache = new FastMap<>(11, maxEntries);
+    this.retriever = retriever;
+  }
+
+  /**
+   * <p>
+   * Returns cached value for a key. If it does not exist, it is loaded using a {@link Retriever}.
+   * </p>
+   *
+   * @param key
+   *          cache key
+   * @return value for that key, or {@code null} if the retriever produced {@code null} for it
+   * @throws TasteException
+   *           if an exception occurs while retrieving a new cached value
+   */
+  @Override
+  public V get(K key) throws TasteException {
+    V value;
+    synchronized (cache) {
+      value = cache.get(key);
+    }
+    if (value == null) {
+      // Cache miss: load and store. The result may itself be the NULL placeholder, so it must be
+      // unwrapped below just like a cached hit. Previously it was returned to the caller as-is.
+      value = getAndCacheValue(key);
+    }
+    return value == NULL ? null : value;
+  }
+
+  /**
+   * <p>
+   * Uncaches any existing value for a given key.
+   * </p>
+   *
+   * @param key
+   *          cache key
+   */
+  public void remove(K key) {
+    synchronized (cache) {
+      cache.remove(key);
+    }
+  }
+
+  /**
+   * Clears all cache entries whose key matches the given predicate.
+   */
+  public void removeKeysMatching(MatchPredicate<K> predicate) {
+    synchronized (cache) {
+      Iterator<K> it = cache.keySet().iterator();
+      while (it.hasNext()) {
+        K key = it.next();
+        if (predicate.matches(key)) {
+          it.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * Clears all cache entries whose value matches the given predicate. Entries whose retrieved value was
+   * {@code null} are presented to the predicate as {@code null}, never as the internal placeholder.
+   */
+  public void removeValueMatching(MatchPredicate<V> predicate) {
+    synchronized (cache) {
+      Iterator<V> it = cache.values().iterator();
+      while (it.hasNext()) {
+        V value = it.next();
+        // Passing the placeholder itself would surface as a ClassCastException inside typed predicates.
+        if (predicate.matches(value == NULL ? null : value)) {
+          it.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * <p>
+   * Clears the cache.
+   * </p>
+   */
+  public void clear() {
+    synchronized (cache) {
+      cache.clear();
+    }
+  }
+
+  /**
+   * Loads the value for {@code key} from the retriever (outside the lock) and stores it. A {@code null}
+   * result is replaced by the {@link #NULL} placeholder, which is also what gets returned, so callers must
+   * unwrap it.
+   */
+  @SuppressWarnings("unchecked") // NULL is never handed out as a V; get() translates it back to null
+  private V getAndCacheValue(K key) throws TasteException {
+    V value = retriever.get(key);
+    if (value == null) {
+      value = (V) NULL;
+    }
+    synchronized (cache) {
+      cache.put(key, value);
+    }
+    return value;
+  }
+
+  @Override
+  public String toString() {
+    return "Cache[retriever:" + retriever + ']';
+  }
+
+  /**
+   * Used by {@link #removeKeysMatching(MatchPredicate)} and {@link #removeValueMatching(MatchPredicate)}
+   * to decide which entries match.
+   */
+  public interface MatchPredicate<T> {
+    boolean matches(T thing);
+  }
+
+}